You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by yu...@apache.org on 2015/12/22 08:06:50 UTC
reef git commit: [REEF-1075] Create Tasklet abstraction to enable
one-to-many mapping of "Future-like" object to TaskletIds
Repository: reef
Updated Branches:
refs/heads/master f10bbf462 -> d6b316925
[REEF-1075] Create Tasklet abstraction to enable one-to-many mapping of "Future-like" object to TaskletIds
This addressed the issue by
* Adding VortexFutureDelegate to map a single result/failure to multiple tasklets.
* Moving the logic of calling back to the Future into VortexMaster.
JIRA:
[REEF-1075](https://issues.apache.org/jira/browse/REEF-1075)
Pull request:
This closes #738
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d6b31692
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d6b31692
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d6b31692
Branch: refs/heads/master
Commit: d6b31692589addfa8dccf7e70892b8eebabc8093
Parents: f10bbf4
Author: Andrew Chung <af...@apache.org>
Authored: Wed Dec 16 11:27:33 2015 -0800
Committer: Yunseong Lee <yu...@apache.org>
Committed: Tue Dec 22 14:54:52 2015 +0800
----------------------------------------------------------------------
.../reef-vortex/src/main/avro/WorkerReport.avsc | 34 ++++++-
.../apache/reef/vortex/api/VortexFuture.java | 46 +++++++--
.../common/TaskletAggregationFailureReport.java | 65 ++++++++++++
.../common/TaskletAggregationResultReport.java | 71 +++++++++++++
.../vortex/common/TaskletFailureReport.java | 28 +++---
.../reef/vortex/common/TaskletReport.java | 4 +-
.../reef/vortex/common/TaskletResultReport.java | 27 +++--
.../reef/vortex/common/VortexAvroUtils.java | 61 ++++++++++-
.../vortex/common/VortexFutureDelegate.java | 61 +++++++++++
.../reef/vortex/driver/DefaultVortexMaster.java | 100 +++++++++++++++----
.../vortex/driver/FirstFitSchedulingPolicy.java | 26 +----
.../vortex/driver/RandomSchedulingPolicy.java | 19 +---
.../reef/vortex/driver/RunningWorkers.java | 60 +----------
.../reef/vortex/driver/SchedulingPolicy.java | 16 +--
.../org/apache/reef/vortex/driver/Tasklet.java | 35 ++-----
.../apache/reef/vortex/driver/VortexDriver.java | 30 +-----
.../apache/reef/vortex/driver/VortexMaster.java | 15 +--
.../reef/vortex/driver/VortexWorkerManager.java | 27 ++---
.../reef/vortex/evaluator/VortexWorker.java | 7 +-
.../vortex/driver/DefaultVortexMasterTest.java | 26 +++--
.../reef/vortex/driver/RunningWorkersTest.java | 14 ++-
.../org/apache/reef/vortex/driver/TestUtil.java | 70 ++++++++++++-
22 files changed, 567 insertions(+), 275 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
index 24110fe..5d7395d 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/WorkerReport.avsc
@@ -22,6 +22,15 @@
"type": "record",
"name": "AvroTaskletResultReport",
"fields": [
+ {"name": "taskletId", "type": "int"},
+ {"name": "serializedOutput", "type": "bytes"}
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.vortex.common.avro",
+ "type": "record",
+ "name": "AvroTaskletAggregationResultReport",
+ "fields": [
{
"name": "taskletIds",
"type":
@@ -46,6 +55,15 @@
"type": "record",
"name": "AvroTaskletFailureReport",
"fields": [
+ {"name": "taskletId", "type": "int"},
+ {"name": "serializedException", "type": "bytes"}
+ ]
+ },
+ {
+ "namespace": "org.apache.reef.vortex.common.avro",
+ "type": "record",
+ "name": "AvroTaskletAggregationFailureReport",
+ "fields": [
{
"name": "taskletIds",
"type":
@@ -68,12 +86,24 @@
{
"type": "enum",
"name": "AvroReportType",
- "symbols": ["TaskletResult", "TaskletCancelled", "TaskletFailure"]
+ "symbols": [
+ "TaskletResult",
+ "TaskletAggregationResult",
+ "TaskletCancelled",
+ "TaskletFailure",
+ "TaskletAggregationFailure"
+ ]
}
},
{
"name": "taskletReport",
- "type": ["AvroTaskletResultReport", "AvroTaskletCancelledReport", "AvroTaskletFailureReport"]
+ "type": [
+ "AvroTaskletResultReport",
+ "AvroTaskletAggregationResultReport",
+ "AvroTaskletCancelledReport",
+ "AvroTaskletFailureReport",
+ "AvroTaskletAggregationFailureReport"
+ ]
}
]
},
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
index 7f4fbb7..5e7f9a2 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexFuture.java
@@ -21,8 +21,11 @@ package org.apache.reef.vortex.api;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.util.Optional;
+import org.apache.reef.vortex.common.VortexFutureDelegate;
import org.apache.reef.vortex.driver.VortexMaster;
+import java.io.Serializable;
+import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
@@ -32,7 +35,8 @@ import java.util.logging.Logger;
* The interface between user code and submitted task.
*/
@Unstable
-public final class VortexFuture<TOutput> implements Future<TOutput> {
+public final class VortexFuture<TOutput extends Serializable>
+ implements Future<TOutput>, VortexFutureDelegate<TOutput> {
private static final Logger LOG = Logger.getLogger(VortexFuture.class.getName());
// userResult starts out as null. If not null => variable is set and tasklet returned.
@@ -173,9 +177,13 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
}
/**
- * Called by VortexMaster to let the user know that the task completed.
+ * Called by VortexMaster to let the user know that the Tasklet completed.
*/
- public void completed(final TOutput result) {
+ @Private
+ @Override
+ public void completed(final int pTaskletId, final TOutput result) {
+ assert taskletId == pTaskletId;
+
this.userResult = Optional.ofNullable(result);
if (callbackHandler != null) {
executor.execute(new Runnable() {
@@ -189,10 +197,22 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
}
/**
- * Called by VortexMaster to let the user know that the task threw an exception.
+ * VortexMaster should never call this.
+ */
+ @Private
+ @Override
+ public void aggregationCompleted(final List<Integer> taskletIds, final TOutput result) {
+ throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated.");
+ }
+
+ /**
+ * Called by VortexMaster to let the user know that the Tasklet threw an exception.
*/
@Private
- public void threwException(final Exception exception) {
+ @Override
+ public void threwException(final int pTaskletId, final Exception exception) {
+ assert taskletId == pTaskletId;
+
this.userException = exception;
if (callbackHandler != null) {
executor.execute(new Runnable() {
@@ -206,10 +226,22 @@ public final class VortexFuture<TOutput> implements Future<TOutput> {
}
/**
- * Called by VortexMaster to let the user know that the task was cancelled.
+ * VortexMaster should never call this.
*/
@Private
- public void cancelled() {
+ @Override
+ public void aggregationThrewException(final List<Integer> taskletIds, final Exception exception) {
+ throw new RuntimeException("Functions not associated with AggregationFunctions cannot be aggregated");
+ }
+
+ /**
+ * Called by VortexMaster to let the user know that the Tasklet was cancelled.
+ */
+ @Private
+ @Override
+ public void cancelled(final int pTaskletId) {
+ assert taskletId == pTaskletId;
+
this.cancelled.set(true);
if (callbackHandler != null) {
executor.execute(new Runnable() {
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java
new file mode 100644
index 0000000..e6a4c82
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationFailureReport.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Report of a tasklet exception on aggregation.
+ */
+@Unstable
+public final class TaskletAggregationFailureReport implements TaskletReport {
+ private final List<Integer> taskletIds;
+ private final Exception exception;
+
+ /**
+ * @param taskletIds of the failed tasklet(s).
+ * @param exception that caused the tasklet failure.
+ */
+ public TaskletAggregationFailureReport(final List<Integer> taskletIds, final Exception exception) {
+ this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds));
+ this.exception = exception;
+ }
+
+ /**
+ * @return the type of this TaskletReport.
+ */
+ @Override
+ public TaskletReportType getType() {
+ return TaskletReportType.TaskletAggregationFailure;
+ }
+
+ /**
+ * @return the taskletIds that failed on aggregation.
+ */
+ public List<Integer> getTaskletIds() {
+ return taskletIds;
+ }
+
+ /**
+ * @return the exception that caused the tasklet aggregation failure.
+ */
+ public Exception getException() {
+ return exception;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
new file mode 100644
index 0000000..ce4a015
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationResultReport.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Report of a Tasklet aggregation execution result.
+ */
+@Private
+@DriverSide
+@Unstable
+public final class TaskletAggregationResultReport<TOutput extends Serializable> implements TaskletReport {
+ private final List<Integer> taskletIds;
+ private final TOutput result;
+
+ /**
+ * @param taskletIds of the tasklets.
+ * @param result of the tasklet execution.
+ */
+ public TaskletAggregationResultReport(final List<Integer> taskletIds, final TOutput result) {
+ this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds));
+ this.result = result;
+ }
+
+ /**
+ * @return the type of this TaskletReport.
+ */
+ @Override
+ public TaskletReportType getType() {
+ return TaskletReportType.TaskletAggregationResult;
+ }
+
+ /**
+ * @return the TaskletId(s) of this TaskletReport
+ */
+ public List<Integer> getTaskletIds() {
+ return taskletIds;
+ }
+
+ /**
+ * @return the result of the Tasklet aggregation execution.
+ */
+ public TOutput getResult() {
+ return result;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
index 487a3d2..5c0b2de 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletFailureReport.java
@@ -19,25 +19,25 @@
package org.apache.reef.vortex.common;
import org.apache.reef.annotations.Unstable;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
/**
- * Report of a tasklet exception.
+ * Report of a Tasklet exception.
*/
@Unstable
+@Private
+@DriverSide
public final class TaskletFailureReport implements TaskletReport {
- private final List<Integer> taskletIds;
+ private final int taskletId;
private final Exception exception;
/**
- * @param taskletIds of the failed tasklet(s).
+ * @param taskletId of the failed Tasklet.
* @param exception that caused the tasklet failure.
*/
- public TaskletFailureReport(final List<Integer> taskletIds, final Exception exception) {
- this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds));
+ public TaskletFailureReport(final int taskletId, final Exception exception) {
+ this.taskletId = taskletId;
this.exception = exception;
}
@@ -50,16 +50,14 @@ public final class TaskletFailureReport implements TaskletReport {
}
/**
- * Returns multiple TaskletIds if an aggregation of Tasklets fail.
- * Returns a single TaskletId if a Tasklet fails.
- * @return the taskletId(s) of this TaskletReport.
+ * @return the taskletId of this TaskletReport.
*/
- public List<Integer> getTaskletIds() {
- return taskletIds;
+ public int getTaskletId() {
+ return taskletId;
}
/**
- * @return the exception that caused the tasklet failure.
+ * @return the exception that caused the Tasklet failure.
*/
public Exception getException() {
return exception;
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
index 7e083eb..6392b23 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletReport.java
@@ -36,8 +36,10 @@ public interface TaskletReport extends Serializable {
*/
enum TaskletReportType {
TaskletResult,
+ TaskletAggregationResult,
TaskletCancelled,
- TaskletFailure
+ TaskletFailure,
+ TaskletAggregationFailure
}
/**
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
index 08e8d06..8e3bac3 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletResultReport.java
@@ -19,26 +19,27 @@
package org.apache.reef.vortex.common;
import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
/**
* Report of a tasklet execution result.
*/
@Unstable
+@Private
+@DriverSide
public final class TaskletResultReport<TOutput extends Serializable> implements TaskletReport {
- private final List<Integer> taskletIds;
+ private final int taskletId;
private final TOutput result;
/**
- * @param taskletIds of the tasklets.
- * @param result of the tasklet execution.
+ * @param taskletId of the Tasklet.
+ * @param result of the Tasklet execution.
*/
- public TaskletResultReport(final List<Integer> taskletIds, final TOutput result) {
- this.taskletIds = Collections.unmodifiableList(new ArrayList<>(taskletIds));
+ public TaskletResultReport(final int taskletId, final TOutput result) {
+ this.taskletId = taskletId;
this.result = result;
}
@@ -51,16 +52,14 @@ public final class TaskletResultReport<TOutput extends Serializable> implements
}
/**
- * Returns multiple TaskletIds if the result is from an Aggregation.
- * Returns a single TaskletId if the result is from a single Tasklet.
- * @return the TaskletId(s) of this TaskletReport
+ * @return the TaskletId of this TaskletReport
*/
- public List<Integer> getTaskletIds() {
- return taskletIds;
+ public int getTaskletId() {
+ return taskletId;
}
/**
- * @return the result of the tasklet execution.
+ * @return the result of the Tasklet execution.
*/
public TOutput getResult() {
return result;
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
index e4f747d..cc3cced 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
@@ -22,6 +22,9 @@ import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.lang.SerializationUtils;
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
import org.apache.reef.vortex.api.VortexFunction;
import org.apache.reef.vortex.common.avro.*;
@@ -35,6 +38,9 @@ import java.util.List;
/**
* Serialize and deserialize Vortex message to/from byte array.
*/
+@Private
+@DriverSide
+@Unstable
public final class VortexAvroUtils {
/**
* Serialize VortexRequest to byte array.
@@ -101,11 +107,26 @@ public final class VortexAvroUtils {
.setReportType(AvroReportType.TaskletResult)
.setTaskletReport(
AvroTaskletResultReport.newBuilder()
- .setTaskletIds(taskletResultReport.getTaskletIds())
+ .setTaskletId(taskletResultReport.getTaskletId())
.setSerializedOutput(ByteBuffer.wrap(serializedOutput))
.build())
.build();
break;
+ case TaskletAggregationResult:
+ final TaskletAggregationResultReport taskletAggregationResultReport =
+ (TaskletAggregationResultReport) taskletReport;
+ // TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
+ final byte[] serializedAggregationOutput =
+ SerializationUtils.serialize(taskletAggregationResultReport.getResult());
+ avroTaskletReport = AvroTaskletReport.newBuilder()
+ .setReportType(AvroReportType.TaskletAggregationResult)
+ .setTaskletReport(
+ AvroTaskletAggregationResultReport.newBuilder()
+ .setTaskletIds(taskletAggregationResultReport.getTaskletIds())
+ .setSerializedOutput(ByteBuffer.wrap(serializedAggregationOutput))
+ .build())
+ .build();
+ break;
case TaskletCancelled:
final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport;
avroTaskletReport = AvroTaskletReport.newBuilder()
@@ -123,11 +144,25 @@ public final class VortexAvroUtils {
.setReportType(AvroReportType.TaskletFailure)
.setTaskletReport(
AvroTaskletFailureReport.newBuilder()
- .setTaskletIds(taskletFailureReport.getTaskletIds())
+ .setTaskletId(taskletFailureReport.getTaskletId())
.setSerializedException(ByteBuffer.wrap(serializedException))
.build())
.build();
break;
+ case TaskletAggregationFailure:
+ final TaskletAggregationFailureReport taskletAggregationFailureReport =
+ (TaskletAggregationFailureReport) taskletReport;
+ final byte[] serializedAggregationException =
+ SerializationUtils.serialize(taskletAggregationFailureReport.getException());
+ avroTaskletReport = AvroTaskletReport.newBuilder()
+ .setReportType(AvroReportType.TaskletAggregationFailure)
+ .setTaskletReport(
+ AvroTaskletAggregationFailureReport.newBuilder()
+ .setTaskletIds(taskletAggregationFailureReport.getTaskletIds())
+ .setSerializedException(ByteBuffer.wrap(serializedAggregationException))
+ .build())
+ .build();
+ break;
default:
throw new RuntimeException("Undefined message type");
}
@@ -197,7 +232,16 @@ public final class VortexAvroUtils {
// TODO[REEF-1005]: Allow custom codecs for input/output data in Vortex.
final Serializable output =
(Serializable) SerializationUtils.deserialize(taskletResultReport.getSerializedOutput().array());
- taskletReport = new TaskletResultReport<>(taskletResultReport.getTaskletIds(), output);
+ taskletReport = new TaskletResultReport<>(taskletResultReport.getTaskletId(), output);
+ break;
+ case TaskletAggregationResult:
+ final AvroTaskletAggregationResultReport taskletAggregationResultReport =
+ (AvroTaskletAggregationResultReport)avroTaskletReport.getTaskletReport();
+ final Serializable aggregationOutput =
+ (Serializable) SerializationUtils.deserialize(
+ taskletAggregationResultReport.getSerializedOutput().array());
+ taskletReport =
+ new TaskletAggregationResultReport<>(taskletAggregationResultReport.getTaskletIds(), aggregationOutput);
break;
case TaskletCancelled:
final AvroTaskletCancelledReport taskletCancelledReport =
@@ -209,7 +253,16 @@ public final class VortexAvroUtils {
(AvroTaskletFailureReport)avroTaskletReport.getTaskletReport();
final Exception exception =
(Exception) SerializationUtils.deserialize(taskletFailureReport.getSerializedException().array());
- taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletIds(), exception);
+ taskletReport = new TaskletFailureReport(taskletFailureReport.getTaskletId(), exception);
+ break;
+ case TaskletAggregationFailure:
+ final AvroTaskletAggregationFailureReport taskletAggregationFailureReport =
+ (AvroTaskletAggregationFailureReport)avroTaskletReport.getTaskletReport();
+ final Exception aggregationException =
+ (Exception) SerializationUtils.deserialize(
+ taskletAggregationFailureReport.getSerializedException().array());
+ taskletReport =
+ new TaskletAggregationFailureReport(taskletAggregationFailureReport.getTaskletIds(), aggregationException);
break;
default:
throw new RuntimeException("Undefined TaskletReport type");
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
new file mode 100644
index 0000000..55f3cf5
--- /dev/null
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexFutureDelegate.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.common;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Exposes functions to be called by the {@link org.apache.reef.vortex.driver.VortexMaster}
+ * to note that a list of Tasklets associated with a Future has completed.
+ */
+@Unstable
+@DriverSide
+@Private
+public interface VortexFutureDelegate<TOutput extends Serializable> {
+
+ /**
+ * A Tasklet associated with the future has completed with a result.
+ */
+ void completed(final int taskletId, final TOutput result);
+
+ /**
+ * The list of aggregated Tasklets associated with the Future that have completed with a result.
+ */
+ void aggregationCompleted(final List<Integer> taskletIds, final TOutput result);
+
+ /**
+ * A Tasklet associated with the Future has thrown an Exception.
+ */
+ void threwException(final int taskletId, final Exception exception);
+
+ /**
+ * The list of Tasklets associated with the Future that have thrown an Exception.
+ */
+ void aggregationThrewException(final List<Integer> taskletIds, final Exception exception);
+
+ /**
+ * A Tasklet associated with the Future has been cancelled.
+ */
+ void cancelled(final int taskletId);
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
index cb62049..80c3cb0 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -25,6 +25,7 @@ import org.apache.reef.util.Optional;
import org.apache.reef.vortex.api.FutureCallback;
import org.apache.reef.vortex.api.VortexFunction;
import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.common.*;
import javax.inject.Inject;
import java.io.Serializable;
@@ -40,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
@ThreadSafe
@DriverSide
final class DefaultVortexMaster implements VortexMaster {
+ private final Map<Integer, VortexFutureDelegate> taskletFutureMap = new HashMap<>();
private final AtomicInteger taskletIdCounter = new AtomicInteger();
private final RunningWorkers runningWorkers;
private final PendingTasklets pendingTasklets;
@@ -74,7 +76,9 @@ final class DefaultVortexMaster implements VortexMaster {
}
final Tasklet tasklet = new Tasklet<>(id, function, input, vortexFuture);
+ putDelegate(Collections.singletonList(tasklet), vortexFuture);
this.pendingTasklets.addLast(tasklet);
+
return vortexFuture;
}
@@ -107,37 +111,97 @@ final class DefaultVortexMaster implements VortexMaster {
}
}
- /**
- * Notify task completion to runningWorkers.
- */
@Override
- public void taskletCompleted(final String workerId,
- final int taskletId,
- final Serializable result) {
- runningWorkers.completeTasklet(workerId, taskletId, result);
+ public void workerReported(final String workerId, final WorkerReport workerReport) {
+ for (final TaskletReport taskletReport : workerReport.getTaskletReports()) {
+ switch (taskletReport.getType()) {
+ case TaskletResult:
+ final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport;
+
+ final int resultTaskletId = taskletResultReport.getTaskletId();
+ final List<Integer> singletonResultTaskletId = Collections.singletonList(resultTaskletId);
+ runningWorkers.doneTasklets(workerId, singletonResultTaskletId);
+ fetchDelegate(singletonResultTaskletId).completed(resultTaskletId, taskletResultReport.getResult());
+
+ break;
+ case TaskletAggregationResult:
+ final TaskletAggregationResultReport taskletAggregationResultReport =
+ (TaskletAggregationResultReport) taskletReport;
+
+ final List<Integer> aggregatedTaskletIds = taskletAggregationResultReport.getTaskletIds();
+ runningWorkers.doneTasklets(workerId, aggregatedTaskletIds);
+ fetchDelegate(aggregatedTaskletIds).aggregationCompleted(
+ aggregatedTaskletIds, taskletAggregationResultReport.getResult());
+
+ break;
+ case TaskletCancelled:
+ final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport;
+ final List<Integer> cancelledIdToList = Collections.singletonList(taskletCancelledReport.getTaskletId());
+ runningWorkers.doneTasklets(workerId, cancelledIdToList);
+ fetchDelegate(cancelledIdToList).cancelled(taskletCancelledReport.getTaskletId());
+
+ break;
+ case TaskletFailure:
+ final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport;
+
+ final int failureTaskletId = taskletFailureReport.getTaskletId();
+ final List<Integer> singletonFailedTaskletId = Collections.singletonList(failureTaskletId);
+ runningWorkers.doneTasklets(workerId, singletonFailedTaskletId);
+ fetchDelegate(singletonFailedTaskletId).threwException(failureTaskletId, taskletFailureReport.getException());
+
+ break;
+ case TaskletAggregationFailure:
+ final TaskletAggregationFailureReport taskletAggregationFailureReport =
+ (TaskletAggregationFailureReport) taskletReport;
+
+ final List<Integer> aggregationFailedTaskletIds = taskletAggregationFailureReport.getTaskletIds();
+ runningWorkers.doneTasklets(workerId, aggregationFailedTaskletIds);
+ fetchDelegate(aggregationFailedTaskletIds).aggregationThrewException(aggregationFailedTaskletIds,
+ taskletAggregationFailureReport.getException());
+ break;
+ default:
+ throw new RuntimeException("Unknown Report");
+ }
+ }
}
/**
- * Notify task failure to runningWorkers.
+ * Terminate the job.
*/
@Override
- public void taskletErrored(final String workerId, final int taskletId, final Exception exception) {
- runningWorkers.errorTasklet(workerId, taskletId, exception);
+ public void terminate() {
+ runningWorkers.terminate();
}
/**
- * Notify tasklet cancellation to runningWorkers.
+ * Puts a delegate to associate with a Tasklet.
*/
- @Override
- public void taskletCancelled(final String workerId, final int taskletId) {
- runningWorkers.taskletCancelled(workerId, taskletId);
+ private synchronized void putDelegate(final List<Tasklet> tasklets, final VortexFutureDelegate delegate) {
+ for (final Tasklet tasklet : tasklets) {
+ taskletFutureMap.put(tasklet.getId(), delegate);
+ }
}
/**
- * Terminate the job.
+ * Fetches a delegate that maps to the list of Tasklets.
*/
- @Override
- public void terminate() {
- runningWorkers.terminate();
+ private synchronized VortexFutureDelegate fetchDelegate(final List<Integer> taskletIds) {
+ VortexFutureDelegate delegate = null;
+ for (final int taskletId : taskletIds) {
+ final VortexFutureDelegate currDelegate = taskletFutureMap.remove(taskletId);
+ if (currDelegate == null) {
+ // TODO[JIRA REEF-500]: Consider duplicate tasklets.
+ throw new RuntimeException("Tasklet should only be removed once.");
+ }
+
+ if (delegate == null) {
+ delegate = currDelegate;
+ } else {
+ assert delegate == currDelegate;
+ }
+ }
+
+ return delegate;
}
+
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
index 8b28eab..489751f 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/FirstFitSchedulingPolicy.java
@@ -125,33 +125,17 @@ class FirstFitSchedulingPolicy implements SchedulingPolicy {
/**
* @param vortexWorker that the tasklet completed in
- * @param tasklet completed
+ * @param tasklets completed
*/
@Override
- public void taskletCompleted(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
+ public void taskletsDone(final VortexWorkerManager vortexWorker, final List<Tasklet> tasklets) {
final String workerId = vortexWorker.getId();
- removeTasklet(workerId);
+ removeTasklet(workerId, tasklets);
}
- /**
- * @param vortexWorker that the tasklet failed in
- * @param tasklet failed
- */
- @Override
- public void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
- final String workerId = vortexWorker.getId();
- removeTasklet(workerId);
- }
-
- @Override
- public void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
- final String workerId = vortexWorker.getId();
- removeTasklet(workerId);
- }
-
- private void removeTasklet(final String workerId) {
+ private void removeTasklet(final String workerId, final List<Tasklet> tasklets) {
if (idLoadMap.containsKey(workerId)) {
- idLoadMap.put(workerId, Math.max(0, idLoadMap.get(workerId) - 1));
+ idLoadMap.put(workerId, Math.max(0, idLoadMap.get(workerId) - tasklets.size()));
}
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
index ffa2fbf..76dc5ca 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RandomSchedulingPolicy.java
@@ -90,24 +90,7 @@ class RandomSchedulingPolicy implements SchedulingPolicy {
* Do nothing.
*/
@Override
- public void taskletCompleted(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
+ public void taskletsDone(final VortexWorkerManager vortexWorker, final List<Tasklet> tasklets) {
// Do nothing
}
-
- /**
- * Do nothing.
- */
- @Override
- public void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
- // Do nothing
- }
-
- /**
- * Do nothing.
- */
- @Override
- public void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
- // Do nothing
- }
-
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
index 7b05c0c..a1fb96f 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
@@ -25,7 +25,6 @@ import org.apache.reef.util.Optional;
import javax.inject.Inject;
-import java.io.Serializable;
import java.util.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -184,69 +183,18 @@ final class RunningWorkers {
* Parameter: Same arguments can come in multiple times.
* (e.g. preemption message coming before tasklet completion message multiple times)
*/
- void completeTasklet(final String workerId,
- final int taskletId,
- final Serializable result) {
+ void doneTasklets(final String workerId, final List<Integer> taskletIds) {
lock.lock();
try {
if (!terminated && runningWorkers.containsKey(workerId)) { // Preemption can come before
final VortexWorkerManager worker = this.runningWorkers.get(workerId);
- final Tasklet tasklet = worker.taskletCompleted(taskletId, result);
- this.schedulingPolicy.taskletCompleted(worker, tasklet);
+ final List<Tasklet> tasklets = worker.taskletsDone(taskletIds);
+ this.schedulingPolicy.taskletsDone(worker, tasklets);
- // Notify (possibly) waiting scheduler
- noWorkerOrResource.signal();
-
- taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak.
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Concurrency: Called by multiple threads.
- * Parameter: Same arguments can come in multiple times.
- * (e.g. preemption message coming before tasklet error message multiple times)
- */
- void errorTasklet(final String workerId,
- final int taskletId,
- final Exception exception) {
- lock.lock();
- try {
- if (!terminated && runningWorkers.containsKey(workerId)) { // Preemption can come before
- final VortexWorkerManager worker = this.runningWorkers.get(workerId);
- final Tasklet tasklet = worker.taskletThrewException(taskletId, exception);
- this.schedulingPolicy.taskletFailed(worker, tasklet);
-
- // Notify (possibly) waiting scheduler
- noWorkerOrResource.signal();
-
- taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak.
- }
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Concurrency: Called by multiple threads.
- * Parameter: Same arguments can come in multiple times.
- * (e.g. preemption message coming before tasklet error message multiple times)
- */
- void taskletCancelled(final String workerId,
- final int taskletId) {
- lock.lock();
- try {
- if (!terminated && runningWorkers.containsKey(workerId)) { // Preemption can come before
- final VortexWorkerManager worker = this.runningWorkers.get(workerId);
- final Tasklet tasklet = worker.taskletCancelled(taskletId);
- this.schedulingPolicy.taskletCancelled(worker, tasklet);
+ taskletsToCancel.removeAll(taskletIds); // cleanup to prevent memory leak.
// Notify (possibly) waiting scheduler
noWorkerOrResource.signal();
-
- taskletsToCancel.remove(taskletId); // cleanup to prevent memory leak.
}
} finally {
lock.unlock();
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
index a058f6f..bccc0ea 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/SchedulingPolicy.java
@@ -21,6 +21,8 @@ package org.apache.reef.vortex.driver;
import org.apache.reef.tang.annotations.DefaultImplementation;
import org.apache.reef.util.Optional;
+import java.util.List;
+
/**
* For choosing which worker to schedule the tasklet onto.
*/
@@ -49,17 +51,7 @@ interface SchedulingPolicy {
void taskletLaunched(final VortexWorkerManager vortexWorker, final Tasklet tasklet);
/**
- * Tasklet completed.
- */
- void taskletCompleted(final VortexWorkerManager vortexWorker, final Tasklet tasklet);
-
- /**
- * Tasklet failed.
- */
- void taskletFailed(final VortexWorkerManager vortexWorker, final Tasklet tasklet);
-
- /**
- * Tasklet cancelled.
+ * Tasklets completed.
*/
- void taskletCancelled(final VortexWorkerManager vortexWorker, final Tasklet tasklet);
+ void taskletsDone(final VortexWorkerManager vortexWorker, final List<Tasklet> tasklets);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
index b0138d0..24db3cb 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/Tasklet.java
@@ -20,7 +20,7 @@ package org.apache.reef.vortex.driver;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.vortex.api.VortexFunction;
-import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.common.VortexFutureDelegate;
import java.io.Serializable;
@@ -32,16 +32,16 @@ class Tasklet<TInput extends Serializable, TOutput extends Serializable> {
private final int taskletId;
private final VortexFunction<TInput, TOutput> userTask;
private final TInput input;
- private final VortexFuture<TOutput> vortexFuture;
+ private final VortexFutureDelegate delegate;
Tasklet(final int taskletId,
final VortexFunction<TInput, TOutput> userTask,
final TInput input,
- final VortexFuture<TOutput> vortexFuture) {
+ final VortexFutureDelegate delegate) {
this.taskletId = taskletId;
this.userTask = userTask;
this.input = input;
- this.vortexFuture = vortexFuture;
+ this.delegate = delegate;
}
/**
@@ -66,31 +66,10 @@ class Tasklet<TInput extends Serializable, TOutput extends Serializable> {
}
/**
- * Called by VortexMaster to let the user know that the task completed.
+ * Called by {@link RunningWorkers} to cancel the Tasklet before launch.
*/
- void completed(final TOutput result) {
- vortexFuture.completed(result);
- }
-
- /**
- * Called by VortexMaster to let the user know that the task threw an exception.
- */
- void threwException(final Exception exception) {
- vortexFuture.threwException(exception);
- }
-
- /**
- * Called by VortexMaster to let the user know that the task has been cancelled.
- */
- void cancelled(){
- vortexFuture.cancelled();
- }
-
- /**
- * For tests.
- */
- boolean isCompleted() {
- return vortexFuture.isDone();
+ void cancelled() {
+ delegate.cancelled(taskletId);
}
/**
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
index 993b32f..f581f99 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexDriver.java
@@ -37,7 +37,6 @@ import org.apache.reef.wake.impl.ThreadPoolStage;
import org.apache.reef.wake.time.event.StartTime;
import javax.inject.Inject;
-import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -153,34 +152,7 @@ final class VortexDriver {
public void onNext(final TaskMessage taskMessage) {
final String workerId = taskMessage.getId();
final WorkerReport workerReport = VortexAvroUtils.toWorkerReport(taskMessage.get());
-
- // TODO[JIRA REEF-942]: Fix when aggregation is allowed.
- assert workerReport.getTaskletReports().size() == 1;
-
- final TaskletReport taskletReport = workerReport.getTaskletReports().get(0);
- switch (taskletReport.getType()) {
- case TaskletResult:
- final TaskletResultReport taskletResultReport = (TaskletResultReport) taskletReport;
-
- // TODO[JIRA REEF-942]: Fix when aggregation is allowed.
- final List<Integer> resultTaskletIds = taskletResultReport.getTaskletIds();
-
- assert resultTaskletIds.size() == 1;
- vortexMaster.taskletCompleted(workerId, resultTaskletIds.get(0),
- taskletResultReport.getResult());
- break;
- case TaskletCancelled:
- final TaskletCancelledReport taskletCancelledReport = (TaskletCancelledReport) taskletReport;
- vortexMaster.taskletCancelled(workerId, taskletCancelledReport.getTaskletId());
- break;
- case TaskletFailure:
- final TaskletFailureReport taskletFailureReport = (TaskletFailureReport) taskletReport;
- vortexMaster.taskletErrored(workerId, taskletFailureReport.getTaskletIds().get(0),
- taskletFailureReport.getException());
- break;
- default:
- throw new RuntimeException("Unknown Report");
- }
+ vortexMaster.workerReported(workerId, workerReport);
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
index 38bbcc5..becf3f9 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
@@ -25,6 +25,7 @@ import org.apache.reef.util.Optional;
import org.apache.reef.vortex.api.FutureCallback;
import org.apache.reef.vortex.api.VortexFunction;
import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.common.WorkerReport;
import java.io.Serializable;
@@ -62,19 +63,9 @@ public interface VortexMaster {
void workerPreempted(final String id);
/**
- * Call this when a Tasklet is completed.
+ * Call this when a worker has reported back.
*/
- void taskletCompleted(final String workerId, final int taskletId, final Serializable result);
-
- /**
- * Call this when a Tasklet errored.
- */
- void taskletErrored(final String workerId, final int taskletId, final Exception exception);
-
- /**
- * Call this when a Tasklet is cancelled and the cancellation is honored.
- */
- void taskletCancelled(final String workerId, final int taskletId);
+ void workerReported(final String workerId, final WorkerReport workerReport);
/**
* Release all resources and shut down.
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
index 76d6cec..ffba985 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
@@ -25,8 +25,7 @@ import org.apache.reef.vortex.common.TaskletCancellationRequest;
import org.apache.reef.vortex.common.TaskletExecutionRequest;
import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
+import java.util.*;
/**
* Representation of a VortexWorkerManager in Driver.
@@ -57,25 +56,13 @@ class VortexWorkerManager {
vortexRequestor.send(reefTask, cancellationRequest);
}
- <TOutput extends Serializable> Tasklet taskletCompleted(final Integer taskletId, final TOutput result) {
- final Tasklet<?, TOutput> tasklet = runningTasklets.remove(taskletId);
- assert tasklet != null; // Tasklet should complete/error only once
- tasklet.completed(result);
- return tasklet;
- }
-
- Tasklet taskletThrewException(final Integer taskletId, final Exception exception) {
- final Tasklet tasklet = runningTasklets.remove(taskletId);
- assert tasklet != null; // Tasklet should complete/error only once
- tasklet.threwException(exception);
- return tasklet;
- }
+ List<Tasklet> taskletsDone(final List<Integer> taskletIds) {
+ final List<Tasklet> taskletList = new ArrayList<>();
+ for (final int taskletId : taskletIds) {
+ taskletList.add(runningTasklets.remove(taskletId));
+ }
- Tasklet taskletCancelled(final Integer taskletId) {
- final Tasklet tasklet = runningTasklets.remove(taskletId);
- assert tasklet != null; // Tasklet should finish only once.
- tasklet.cancelled();
- return tasklet;
+ return Collections.unmodifiableList(taskletList);
}
Collection<Tasklet> removed() {
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
index 26d4287..920f1a9 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
+++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
@@ -36,7 +36,6 @@ import org.apache.reef.wake.EventHandler;
import javax.inject.Inject;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.logging.Level;
@@ -111,8 +110,7 @@ public final class VortexWorker implements Task, TaskMessageSource {
// Command Executor: Execute the command
final Serializable result = taskletExecutionRequest.execute();
final TaskletReport taskletReport =
- new TaskletResultReport<>(Collections.singletonList(
- taskletExecutionRequest.getTaskletId()), result);
+ new TaskletResultReport<>(taskletExecutionRequest.getTaskletId(), result);
taskletReports.add(taskletReport);
} catch (final InterruptedException ex) {
// Assumes that user's thread follows convention that cancelled Futures
@@ -124,8 +122,7 @@ public final class VortexWorker implements Task, TaskMessageSource {
} catch (Exception e) {
// Command Executor: Tasklet throws an exception
final TaskletReport taskletReport =
- new TaskletFailureReport(Collections.singletonList(
- taskletExecutionRequest.getTaskletId()), e);
+ new TaskletFailureReport(taskletExecutionRequest.getTaskletId(), e);
taskletReports.add(taskletReport);
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
index 6c2162a..325d3d4 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/DefaultVortexMasterTest.java
@@ -22,6 +22,10 @@ import org.apache.reef.util.Optional;
import org.apache.reef.vortex.api.FutureCallback;
import org.apache.reef.vortex.api.VortexFunction;
import org.apache.reef.vortex.api.VortexFuture;
+import org.apache.reef.vortex.common.TaskletFailureReport;
+import org.apache.reef.vortex.common.TaskletReport;
+import org.apache.reef.vortex.common.TaskletResultReport;
+import org.apache.reef.vortex.common.WorkerReport;
import org.junit.Test;
import java.util.*;
@@ -71,7 +75,9 @@ public class DefaultVortexMasterTest {
final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1);
for (final int taskletId : taskletIds) {
- vortexMaster.taskletCompleted(vortexWorkerManager1.getId(), taskletId, null);
+ final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null);
+ vortexMaster.workerReported(
+ vortexWorkerManager1.getId(), new WorkerReport(Collections.singletonList(taskletReport)));
}
assertTrue("The VortexFuture should be done", future.isDone());
@@ -106,9 +112,12 @@ public class DefaultVortexMasterTest {
final ArrayList<Integer> taskletIds2 = launchTasklets(runningWorkers, pendingTasklets, 1);
assertEquals("Both lists need to contain the same single tasklet id", taskletIds1, taskletIds2);
+
// Completed?
for (final int taskletId : taskletIds2) {
- vortexMaster.taskletCompleted(vortexWorkerManager2.getId(), taskletId, null);
+ final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null);
+ vortexMaster.workerReported(
+ vortexWorkerManager2.getId(), new WorkerReport(Collections.singletonList(taskletReport)));
}
assertTrue("The VortexFuture should be done", future.isDone());
}
@@ -157,7 +166,9 @@ public class DefaultVortexMasterTest {
for (final int taskletId : taskletIds2) {
final String workerId = runningWorkers.getWhereTaskletWasScheduledTo(taskletId);
assertNotNull("The tasklet must have been scheduled", workerId);
- vortexMaster.taskletCompleted(workerId, taskletId, null);
+ final TaskletReport taskletReport = new TaskletResultReport<>(taskletId, null);
+ vortexMaster.workerReported(
+ workerId, new WorkerReport(Collections.singletonList(taskletReport)));
}
for (final VortexFuture vortexFuture : vortexFutures) {
assertTrue("The VortexFuture should be done", vortexFuture.isDone());
@@ -196,8 +207,11 @@ public class DefaultVortexMasterTest {
final VortexFuture future = vortexMaster.enqueueTasklet(vortexFunction, null, Optional.of(testCallbackHandler));
final ArrayList<Integer> taskletIds = launchTasklets(runningWorkers, pendingTasklets, 1);
+
for (final int taskletId : taskletIds) {
- vortexMaster.taskletErrored(vortexWorkerManager1.getId(), taskletId, new RuntimeException("Test exception"));
+ final TaskletReport taskletReport = new TaskletFailureReport(taskletId, new RuntimeException("Test exception."));
+ vortexMaster.workerReported(
+ vortexWorkerManager1.getId(), new WorkerReport(Collections.singletonList(taskletReport)));
}
assertTrue("The VortexFuture should be done", future.isDone());
@@ -246,9 +260,9 @@ public class DefaultVortexMasterTest {
private VortexFuture createTaskletCancellationFuture(final RunningWorkers runningWorkers,
final PendingTasklets pendingTasklets) {
final VortexFunction vortexFunction = testUtil.newInfiniteLoopFunction();
- final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker();
-
final DefaultVortexMaster vortexMaster = new DefaultVortexMaster(runningWorkers, pendingTasklets, 5);
+ final VortexWorkerManager vortexWorkerManager1 = testUtil.newWorker(vortexMaster);
+
// Allocate worker & tasklet and schedule
vortexMaster.workerAllocated(vortexWorkerManager1);
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
index 7cd0847..76416cb 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/RunningWorkersTest.java
@@ -20,7 +20,9 @@ package org.apache.reef.vortex.driver;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -30,8 +32,9 @@ import static org.junit.Assert.assertTrue;
* Test Possible Race Conditions.
*/
public class RunningWorkersTest {
- private final RunningWorkers runningWorkers = new RunningWorkers(new RandomSchedulingPolicy());
private final TestUtil testUtil = new TestUtil();
+ private final TestUtil.TestSchedulingPolicy schedulingPolicy = testUtil.newSchedulingPolicy();
+ private final RunningWorkers runningWorkers = new RunningWorkers(schedulingPolicy);
/**
* Test executor preemption -> executor allocation.
@@ -58,7 +61,12 @@ public class RunningWorkersTest {
final Collection<Tasklet> tasklets = runningWorkers.removeWorker(vortexWorkerManager.getId()).get();
assertEquals("Only 1 Tasklet must have been running", 1, tasklets.size());
assertTrue("This Tasklet must have been running", tasklets.contains(tasklet));
- runningWorkers.completeTasklet(vortexWorkerManager.getId(), tasklet.getId(), null);
- assertFalse("Tasklet must not have been completed", tasklet.isCompleted());
+ final List<Integer> taskletIds = new ArrayList<>();
+ for (final Tasklet taskletIter : tasklets) {
+ taskletIds.add(taskletIter.getId());
+ }
+
+ runningWorkers.doneTasklets(vortexWorkerManager.getId(), taskletIds);
+ assertFalse("Tasklet must not have been completed", schedulingPolicy.taskletIsDone(tasklet.getId()));
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/d6b31692/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
index 8704b0b..ab93a08 100644
--- a/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
+++ b/lang/java/reef-applications/reef-vortex/src/test/java/org/apache/reef/vortex/driver/TestUtil.java
@@ -19,14 +19,18 @@
package org.apache.reef.vortex.driver;
import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.util.Optional;
import org.apache.reef.vortex.api.VortexFunction;
import org.apache.reef.vortex.api.VortexFuture;
-import org.apache.reef.vortex.common.TaskletCancellationRequest;
-import org.apache.reef.vortex.common.VortexRequest;
+import org.apache.reef.vortex.common.*;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,9 +50,16 @@ public final class TestUtil {
private final VortexMaster vortexMaster = mock(VortexMaster.class);
/**
- * @return a new mocked worker.
+ * @return a new mocked worker, with a mocked {@link VortexMaster}.
*/
public VortexWorkerManager newWorker() {
+ return newWorker(vortexMaster);
+ }
+
+ /**
+ * @return a new mocked worker, with the {@link VortexMaster} passed in.
+ */
+ public VortexWorkerManager newWorker(final VortexMaster master) {
final RunningTask reefTask = mock(RunningTask.class);
when(reefTask.getId()).thenReturn("worker" + String.valueOf(workerId.getAndIncrement()));
final VortexRequestor vortexRequestor = mock(VortexRequestor.class);
@@ -58,7 +69,8 @@ public final class TestUtil {
public Object answer(final InvocationOnMock invocation) throws Throwable {
final VortexRequest request = (VortexRequest)invocation.getArguments()[1];
if (request instanceof TaskletCancellationRequest) {
- workerManager.taskletCancelled(request.getTaskletId());
+ final TaskletReport cancelReport = new TaskletCancelledReport(request.getTaskletId());
+ master.workerReported(workerManager.getId(), new WorkerReport(Collections.singleton(cancelReport)));
}
return null;
@@ -89,6 +101,13 @@ public final class TestUtil {
}
/**
+ * @return a queryable {@link org.apache.reef.vortex.driver.TestUtil.TestSchedulingPolicy}
+ */
+ public TestSchedulingPolicy newSchedulingPolicy() {
+ return new TestSchedulingPolicy();
+ }
+
+ /**
* @return a new dummy function.
*/
public VortexFunction newInfiniteLoopFunction() {
@@ -116,4 +135,47 @@ public final class TestUtil {
}
};
}
+
+ static final class TestSchedulingPolicy implements SchedulingPolicy {
+ private final SchedulingPolicy policy = new RandomSchedulingPolicy();
+ private final Set<Integer> doneTasklets = new HashSet<>();
+
+ private TestSchedulingPolicy() {
+ }
+
+ @Override
+ public Optional<String> trySchedule(final Tasklet tasklet) {
+ return policy.trySchedule(tasklet);
+ }
+
+ @Override
+ public void workerAdded(final VortexWorkerManager vortexWorker) {
+ policy.workerAdded(vortexWorker);
+ }
+
+ @Override
+ public void workerRemoved(final VortexWorkerManager vortexWorker) {
+ policy.workerRemoved(vortexWorker);
+ }
+
+ @Override
+ public void taskletLaunched(final VortexWorkerManager vortexWorker, final Tasklet tasklet) {
+ policy.taskletLaunched(vortexWorker, tasklet);
+ }
+
+ @Override
+ public void taskletsDone(final VortexWorkerManager vortexWorker, final List<Tasklet> tasklets) {
+ policy.taskletsDone(vortexWorker, tasklets);
+ for (final Tasklet t : tasklets) {
+ doneTasklets.add(t.getId());
+ }
+ }
+
+ /**
+ * @return true if Tasklet with taskletId is done, false otherwise.
+ */
+ public boolean taskletIsDone(final int taskletId) {
+ return doneTasklets.contains(taskletId);
+ }
+ }
}