You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by yu...@apache.org on 2015/11/22 07:42:39 UTC
incubator-reef git commit: [REEF-505] Callback features for VortexFuture
Repository: incubator-reef
Updated Branches:
refs/heads/master 98dce5735 -> 8c934243f
[REEF-505] Callback features for VortexFuture
This addressed the issue by
* Implementing callbacks on successful VortexTasklet completion.
* Adding unit tests that consider result verification via the callback.
JIRA:
[REEF-505](https://issues.apache.org/jira/browse/REEF-505)
Pull Request:
This closes #665
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/8c934243
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/8c934243
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/8c934243
Branch: refs/heads/master
Commit: 8c934243f605c7b19e203423e5d3e0a397905f31
Parents: 98dce57
Author: Andrew Chung <af...@gmail.com>
Authored: Thu Nov 19 23:06:25 2015 -0800
Committer: Yunseong Lee <yu...@apache.org>
Committed: Sun Nov 22 14:39:53 2015 +0800
----------------------------------------------------------------------
.../apache/reef/vortex/api/VortexFuture.java | 21 +++++-
.../reef/vortex/api/VortexThreadPool.java | 18 ++++-
.../reef/vortex/driver/DefaultVortexMaster.java | 12 +++-
.../apache/reef/vortex/driver/VortexMaster.java | 7 +-
.../vortex/driver/DefaultVortexMasterTest.java | 32 +++++++--
.../org/apache/reef/vortex/driver/TestUtil.java | 12 ++++
.../vortex/addone/AddOneCallbackTestStart.java | 76 ++++++++++++++++++++
.../applications/vortex/addone/AddOneTest.java | 12 ++++
8 files changed, 177 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
index e855279..26ff509 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
@@ -19,18 +19,34 @@
package org.apache.reef.vortex.api;
import org.apache.reef.annotations.Unstable;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.ThreadPoolStage;
import java.util.concurrent.*;
/**
* The interface between user code and submitted task.
- * TODO[REEF-505]: Callback features for VortexFuture.
*/
@Unstable
public final class VortexFuture<TOutput> implements Future<TOutput> {
private TOutput userResult;
private Exception userException;
private final CountDownLatch countDownLatch = new CountDownLatch(1);
+ private final ThreadPoolStage<TOutput> stage;
+
+ /**
+ * Creates a {@link VortexFuture}.
+ */
+ public VortexFuture() {
+ stage = null;
+ }
+
+ /**
+ * Creates a {@link VortexFuture} with a callback.
+ */
+ public VortexFuture(final EventHandler<TOutput> callbackHandler) {
+ stage = new ThreadPoolStage<>(callbackHandler, 1);
+ }
/**
* TODO[REEF-502]: Support Vortex Tasklet(s) cancellation by user.
@@ -93,6 +109,9 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
*/
public void completed(final TOutput result) {
this.userResult = result;
+ if (stage != null) {
+ stage.onNext(userResult);
+ }
this.countDownLatch.countDown();
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
index 25c5c90..221f852 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
@@ -19,7 +19,9 @@
package org.apache.reef.vortex.api;
import org.apache.reef.annotations.Unstable;
+import org.apache.reef.util.Optional;
import org.apache.reef.vortex.driver.VortexMaster;
+import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
import java.io.Serializable;
@@ -45,6 +47,20 @@ public final class VortexThreadPool {
*/
public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput>
submit(final VortexFunction<TInput, TOutput> function, final TInput input) {
- return vortexMaster.enqueueTasklet(function, input);
+ return vortexMaster.enqueueTasklet(function, input, Optional.<EventHandler<TOutput>>empty());
+ }
+
+ /**
+ * @param function to run on Vortex
+ * @param input of the function
+ * @param callback of the function
+ * @param <TInput> input type
+ * @param <TOutput> output type
+ * @return VortexFuture for tracking execution progress
+ */
+ public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput>
+ submit(final VortexFunction<TInput, TOutput> function, final TInput input,
+ final EventHandler<TOutput> callback) {
+ return vortexMaster.enqueueTasklet(function, input, Optional.of(callback));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
index 206f7e3..a2b6161 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -23,6 +23,7 @@ import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.util.Optional;
import org.apache.reef.vortex.api.VortexFunction;
import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
import java.io.Serializable;
@@ -55,9 +56,16 @@ final class DefaultVortexMaster implements VortexMaster {
*/
@Override
public <TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput>
- enqueueTasklet(final VortexFunction<TInput, TOutput> function, final TInput input) {
+ enqueueTasklet(final VortexFunction<TInput, TOutput> function, final TInput input,
+ final Optional<EventHandler<TOutput>> callback) {
// TODO[REEF-500]: Simple duplicate Vortex Tasklet launch.
- final VortexFuture<TOutput> vortexFuture = new VortexFuture<>();
+ final VortexFuture<TOutput> vortexFuture;
+ if (callback.isPresent()) {
+ vortexFuture = new VortexFuture<>(callback.get());
+ } else {
+ vortexFuture = new VortexFuture<>();
+ }
+
this.pendingTasklets.addLast(new Tasklet<>(taskletIdCounter.getAndIncrement(), function, input, vortexFuture));
return vortexFuture;
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
index c9da30d..c7c3e79 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
@@ -21,8 +21,10 @@ package org.apache.reef.vortex.driver;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.util.Optional;
import org.apache.reef.vortex.api.VortexFunction;
import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.wake.EventHandler;
import java.io.Serializable;
@@ -35,10 +37,11 @@ import java.io.Serializable;
@DefaultImplementation(DefaultVortexMaster.class)
public interface VortexMaster {
/**
- * Submit a new Tasklet to be run sometime in the future.
+ * Submit a new Tasklet to be run sometime in the future, with an optional callback function on the result.
*/
<TInput extends Serializable, TOutput extends Serializable> VortexFuture<TOutput>
- enqueueTasklet(final VortexFunction<TInput, TOutput> vortexFunction, final TInput input);
+ enqueueTasklet(final VortexFunction<TInput, TOutput> vortexFunction, final TInput input,
+ final Optional<EventHandler<TOutput>> callback);
/**
* Call this when a new worker is up and running.
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
index c5c0b30..ff3ae92 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
@@ -18,13 +18,15 @@
*/
package org.apache.reef.vortex.driver;
+import org.apache.reef.util.Optional;
import org.apache.reef.vortex.api.VortexFunction;
import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.wake.EventHandler;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.*;
@@ -39,20 +41,34 @@ public class DefaultVortexMasterTest {
*/
@Test(timeout = 10000)
public void testSingleTaskletNoFailure() throws Exception {
- final VortexFunction vortexFunction = testUtil.newFunction();
+ final VortexFunction vortexFunction = testUtil.newIntegerFunction();
final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker();
final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy());
final PendingTasklets pendingTasklets = new PendingTasklets();
final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets);
+ final AtomicBoolean callbackReceived = new AtomicBoolean(false);
+ final CountDownLatch latch = new CountDownLatch(1);
+
vortexMaster.workerAllocated(vortexWorkerManager1);
- final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null);
+
+ final EventHandler<Integer> testCallbackHandler = new EventHandler<Integer>() {
+ @Override
+ public void onNext(final Integer value) {
+ callbackReceived.set(true);
+ latch.countDown();
+ }};
+
+ final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null, Optional.of(testCallbackHandler));
+
final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1);
for (final int taskletId : taskletIds) {
vortexMaster.taskletCompleted(vortexWorkerManager1.getId(), taskletId, null);
}
assertTrue("The VortexFuture should be done", future.isDone());
+ latch.await();
+ assertTrue("Callback should have been received", callbackReceived.get());
}
/**
@@ -69,7 +85,8 @@ public class DefaultVortexMasterTest {
// Allocate worker & tasklet and schedule
vortexMaster.workerAllocated(vortexWorkerManager1);
- final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null);
+ final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null,
+ Optional.<EventHandler<Integer>>empty());
final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, pendingTasklets, 1);
// Preemption!
@@ -111,7 +128,8 @@ public class DefaultVortexMasterTest {
// Schedule tasklets
final int numOfTasklets = 100;
for (int i = 0; i < numOfTasklets; i++) {
- vortexFutures.add(vortexMaster.enqueueTasklet(testUtil.newFunction(), null));
+ vortexFutures.add(vortexMaster.enqueueTasklet(testUtil.newFunction(), null,
+ Optional.<EventHandler<Integer>>empty()));
}
final ArrayList<Integer> taskletIds1 = launchTasklets(runningWorkers, pendingTasklets, numOfTasklets);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
index 5a7da97..80ee597 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
@@ -63,4 +63,16 @@ public class TestUtil {
}
};
}
+
+ /**
+ * @return a dummy integer-integer function.
+ */
+ public VortexFunction<Integer, Integer> newIntegerFunction() {
+ return new VortexFunction<Integer, Integer>() {
+ @Override
+ public Integer call(final Integer integer) throws Exception {
+ return 1;
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java
new file mode 100644
index 0000000..3ab4f5f
--- /dev/null
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneCallbackTestStart.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.tests.applications.vortex.addone;
+
+import io.netty.util.internal.ConcurrentSet;
+import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.api.VortexStart;
+import org.apache.reef.vortex.api.VortexThreadPool;
+import org.apache.reef.wake.EventHandler;
+import org.junit.Assert;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Test correctness of a simple vector calculation on Vortex, checking results with callbacks.
+ */
+public final class AddOneCallbackTestStart implements VortexStart {
+ @Inject
+ private AddOneCallbackTestStart() {
+ }
+
+ /**
+ * Test correctness of a simple vector calculation on Vortex, checking results with callbacks.
+ */
+ @Override
+ public void start(final VortexThreadPool vortexThreadPool) {
+ final Vector<Integer> inputVector = new Vector<>();
+ final int expectedCallbacks = 1000;
+ final CountDownLatch latch = new CountDownLatch(expectedCallbacks);
+ final ConcurrentSet<Integer> outputSet = new ConcurrentSet<>();
+ for (int i = 0; i < expectedCallbacks; i++) {
+ inputVector.add(i);
+ }
+
+ final List<VortexFuture<Integer>> futures = new ArrayList<>();
+ final AddOneFunction addOneFunction = new AddOneFunction();
+
+ for (final int i : inputVector) {
+ futures.add(vortexThreadPool.submit(addOneFunction, i, new EventHandler<Integer>() {
+ @Override
+ public void onNext(final Integer value) {
+ outputSet.add(value - 1);
+ latch.countDown();
+ }
+ }));
+ }
+
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+
+ Assert.assertTrue(outputSet.containsAll(inputVector));
+ Assert.assertTrue(inputVector.containsAll(outputSet));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/8c934243/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
index 76b4618..f6d0098 100644
--- a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
+++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/addone/AddOneTest.java
@@ -60,4 +60,16 @@ public final class AddOneTest {
final LauncherStatus status = this.testEnvironment.run(conf);
Assert.assertTrue("Job state after execution: " + status, status.isSuccess());
}
+
+ /**
+ * Run the AddOne test with a callback registered such that we check the results in the callback instead of
+ * using {@link org.apache.reef.vortex.api.VortexFuture#get()}.
+ */
+ @Test
+ public void testVortexAddOneCallback() {
+ final Configuration conf =
+ VortexConfHelper.getVortexConf("TEST_Vortex_AddOneCallbackTest", AddOneCallbackTestStart.class, 2, 64, 4, 2000);
+ final LauncherStatus status = this.testEnvironment.run(conf);
+ Assert.assertTrue("Job state after execution: " + status, status.isSuccess());
+ }
}