You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by yu...@apache.org on 2015/11/22 07:42:39 UTC

incubator-reef git commit: [REEF-505] Callback features for VortexFuture

Repository: incubator-reef
Updated Branches:
  refs/heads/master 98dce5735 -> 8c934243f


[REEF-505] Callback features for VortexFuture

This addressed the issue by
  * Implementing callbacks on successful VortexTasklet completion.
  * Adding unit tests that consider result verification via the callback.

JIRA:
  [REEF-505](https://issues.apache.org/jira/browse/REEF-505)

Pull Request:
  This closes #665


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8c934243
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8c934243
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8c934243

Branch: refs/heads/master
Commit: 8c934243f605c7b19e203423e5d3e0a397905f31
Parents: 98dce57
Author: Andrew Chung <af...@gmail.com>
Authored: Thu Nov 19 23:06:25 2015 -0800
Committer: Yunseong Lee <yu...@apache.org>
Committed: Sun Nov 22 14:39:53 2015 +0800

----------------------------------------------------------------------
 .../apache/reef/vortex/api/VortexFuture.java    | 21 +++++-
 .../reef/vortex/api/VortexThreadPool.java       | 18 ++++-
 .../reef/vortex/driver/DefaultVortexMaster.java | 12 +++-
 .../apache/reef/vortex/driver/VortexMaster.java |  7 +-
 .../vortex/driver/DefaultVortexMasterTest.java  | 32 +++++++--
 .../org/apache/reef/vortex/driver/TestUtil.java | 12 ++++
 .../vortex/addone/AddOneCallbackTestStart.java  | 76 ++++++++++++++++++++
 .../applications/vortex/addone/AddOneTest.java  | 12 ++++
 8 files changed, 177 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
index e855279..26ff509 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
@@ -19,18 +19,34 @@
 package org.apache.reef.vortex.api;
 
 import org.apache.reef.annotations.Unstable;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.ThreadPoolStage;
 
 import java.util.concurrent.*;
 
 /**
  * The interface between user code and submitted task.
- * TODO[REEF-505]: Callback features for VortexFuture.
  */
 @Unstable
 public final class VortexFuture<TOutput> implements Future<TOutput> {
   private TOutput userResult;
   private Exception userException;
   private final CountDownLatch countDownLatch = new CountDownLatch(1);
+  private final ThreadPoolStage<TOutput> stage;
+
+  /**
+   * Creates a {@link VortexFuture}.
+   */
+  public VortexFuture() {
+    stage = null;
+  }
+
+  /**
+   * Creates a {@link VortexFuture} with a callback.
+   */
+  public VortexFuture(final EventHandler<TOutput> callbackHandler) {
+    stage = new ThreadPoolStage<>(callbackHandler, 1);
+  }
 
   /**
    * TODO[REEF-502]: Support Vortex Tasklet(s) cancellation by user.
@@ -93,6 +109,9 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
    */
   public void completed(final TOutput result) {
     this.userResult = result;
+    if (stage != null) {
+      stage.onNext(userResult);
+    }
     this.countDownLatch.countDown();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
index 25c5c90..221f852 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
@@ -19,7 +19,9 @@
 package org.apache.reef.vortex.api;
 
 import org.apache.reef.annotations.Unstable;
+import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.driver.VortexMaster;
+import org.apache.reef.wake.EventHandler;
 
 import javax.inject.Inject;
 import java.io.Serializable;
@@ -45,6 +47,20 @@ public final class VortexThreadPool {
    */
   public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput>
       submit(final VortexFunction<TInput, TOutput> function, final TInput input) {
-    return vortexMaster.enqueueTasklet(function, input);
+    return vortexMaster.enqueueTasklet(function, input, Optional.<EventHandler<TOutput>>empty());
+  }
+
+  /**
+   * @param function to run on Vortex
+   * @param input of the function
+   * @param callback of the function
+   * @param <TInput> input type
+   * @param <TOutput> output type
+   * @return VortexFuture for tracking execution progress
+   */
+  public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput>
+      submit(final VortexFunction<TInput, TOutput> function, final TInput input,
+             final EventHandler<TOutput> callback) {
+    return vortexMaster.enqueueTasklet(function, input, Optional.of(callback));
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
index 206f7e3..a2b6161 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -23,6 +23,7 @@ import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.wake.EventHandler;
 
 import javax.inject.Inject;
 import java.io.Serializable;
@@ -55,9 +56,16 @@ final class DefaultVortexMaster implements VortexMaster {
    */
   @Override
   public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput>
-      enqueueTasklet(final VortexFunction<TInput, TOutput> function, final TInput input) {
+      enqueueTasklet(final VortexFunction<TInput, TOutput> function, final TInput input,
+                     final Optional<EventHandler<TOutput>> callback) {
     // TODO[REEF-500]: Simple duplicate Vortex Tasklet launch.
-    final VortexFuture<TOutput> vortexFuture = new VortexFuture<>();
+    final VortexFuture<TOutput> vortexFuture;
+    if (callback.isPresent()) {
+      vortexFuture = new VortexFuture<>(callback.get());
+    } else {
+      vortexFuture = new VortexFuture<>();
+    }
+
     this.pendingTasklets.addLast(new Tasklet<>(taskletIdCounter.getAndIncrement(), function, input, vortexFuture));
     return vortexFuture;
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
index c9da30d..c7c3e79 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
@@ -21,8 +21,10 @@ package org.apache.reef.vortex.driver;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.wake.EventHandler;
 
 import java.io.Serializable;
 
@@ -35,10 +37,11 @@ import java.io.Serializable;
 @DefaultImplementation(DefaultVortexMaster.class)
 public interface VortexMaster {
   /**
-   * Submit a new Tasklet to be run sometime in the future.
+   * Submit a new Tasklet to be run sometime in the future, with an optional callback function on the result.
    */
   <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput>
-      enqueueTasklet(final VortexFunction<TInput, TOutput> vortexFunction, final TInput input);
+      enqueueTasklet(final VortexFunction<TInput, TOutput> vortexFunction, final TInput input,
+                     final Optional<EventHandler<TOutput>> callback);
 
   /**
    * Call this when a new worker is up and running.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
index c5c0b30..ff3ae92 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
@@ -18,13 +18,15 @@
  */
 package org.apache.reef.vortex.driver;
 
+import org.apache.reef.util.Optional;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.wake.EventHandler;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.*;
 
@@ -39,20 +41,34 @@ public class DefaultVortexMasterTest {
    */
   @Test(timeout = 10000)
   public void testSingleTaskletNoFailure() throws Exception {
-    final VortexFunction vortexFunction = testUtil.newFunction();
+    final VortexFunction vortexFunction = testUtil.newIntegerFunction();
     final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker();
     final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy());
     final PendingTasklets pendingTasklets = new PendingTasklets();
     final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets);
 
+    final AtomicBoolean callbackReceived = new AtomicBoolean(false);
+    final CountDownLatch latch = new CountDownLatch(1);
+
     vortexMaster.workerAllocated(vortexWorkerManager1);
-    final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null);
+
+    final EventHandler<Integer> testCallbackHandler = new EventHandler<Integer>() {
+      @Override
+      public void onNext(final Integer value) {
+        callbackReceived.set(true);
+        latch.countDown();
+      }};
+
+    final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null, Optional.of(testCallbackHandler));
+
     final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1);
     for (final int taskletId : taskletIds) {
       vortexMaster.taskletCompleted(vortexWorkerManager1.getId(), taskletId, null);
     }
 
     assertTrue("The VortexFuture should be done", future.isDone());
+    latch.await();
+    assertTrue("Callback should have been received", callbackReceived.get());
   }
 
   /**
@@ -69,7 +85,8 @@ public class DefaultVortexMasterTest {
 
     // Allocate worker & tasklet and schedule
     vortexMaster.workerAllocated(vortexWorkerManager1);
-    final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null);
+    final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null,
+        Optional.<EventHandler<Integer>>empty());
     final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, pendingTasklets, 1);
 
     // Preemption!
@@ -111,7 +128,8 @@ public class DefaultVortexMasterTest {
     // Schedule tasklets
     final int numOfTasklets = 100;
     for (int i = 0; i < numOfTasklets; i++) {
-      vortexFutures.add(vortexMaster.enqueueTasklet(testUtil.newFunction(), null));
+      vortexFutures.add(vortexMaster.enqueueTasklet(testUtil.newFunction(), null,
+          Optional.<EventHandler<Integer>>empty()));
     }
     final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, pendingTasklets, numOfTasklets);
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
index 5a7da97..80ee597 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
@@ -63,4 +63,16 @@ public class TestUtil {
       }
     };
   }
+
+  /**
+   * @return a dummy integer-integer function.
+   */
+  public VortexFunction<Integer, Integer> newIntegerFunction() {
+    return new VortexFunction<Integer, Integer>() {
+      @Override
+      public Integer call(final Integer integer) throws Exception {
+        return 1;
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java
new file mode 100644
index 0000000..3ab4f5f
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.applications.vortex.addone;
+
+import io.netty.util.internal.ConcurrentSet;
+import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.api.VortexStart;
+import org.apache.reef.vortex.api.VortexThreadPool;
+import org.apache.reef.wake.EventHandler;
+import org.junit.Assert;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Test correctness of a simple vector calculation on Vortex, checking results with callbacks.
+ */
+public final class AddOneCallbackTestStart implements VortexStart {
+  @Inject
+  private AddOneCallbackTestStart() {
+  }
+
+  /**
+   * Test correctness of a simple vector calculation on Vortex, checking results with callbacks.
+   */
+  @Override
+  public void start(final VortexThreadPool vortexThreadPool) {
+    final Vector<Integer> inputVector = new Vector<>();
+    final int expectedCallbacks = 1000;
+    final CountDownLatch latch = new CountDownLatch(expectedCallbacks);
+    final ConcurrentSet<Integer> outputSet = new ConcurrentSet<>();
+    for (int i = 0; i < expectedCallbacks; i++) {
+      inputVector.add(i);
+    }
+
+    final List<VortexFuture<Integer>> futures = new ArrayList<>();
+    final AddOneFunction addOneFunction = new AddOneFunction();
+
+    for (final int i : inputVector) {
+      futures.add(vortexThreadPool.submit(addOneFunction, i, new EventHandler<Integer>() {
+        @Override
+        public void onNext(final Integer value) {
+          outputSet.add(value - 1);
+          latch.countDown();
+        }
+      }));
+    }
+
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+
+    Assert.assertTrue(outputSet.containsAll(inputVector));
+    Assert.assertTrue(inputVector.containsAll(outputSet));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
index 76b4618..f6d0098 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
@@ -60,4 +60,16 @@ public final class AddOneTest {
     final LauncherStatus status = this.testEnvironment.run(conf);
     Assert.assertTrue("Job state after execution: " + status, status.isSuccess());
   }
+
+  /**
+   * Run the AddOne test with a callback registered such that we check the results in the callback instead of
+   * using {@link org.apache.reef.vortex.api.VortexFuture#get()}.
+   */
+  @Test
+  public void testVortexAddOneCallback() {
+    final Configuration conf =
+        VortexConfHelper.getVortexConf("TEST_Vortex_AddOneCallbackTest", AddOneCallbackTestStart.class, 2, 64, 4, 2000);
+    final LauncherStatus status = this.testEnvironment.run(conf);
+    Assert.assertTrue("Job state after execution: " + status, status.isSuccess());
+  }
 }