You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by je...@apache.org on 2018/06/01 05:18:02 UTC
[incubator-nemo] branch master updated: [NEMO-79] Clean up the legacy Task (#24)
This is an automated email from the ASF dual-hosted git repository.
jeongyoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 9941c1c [NEMO-79] Clean up the legacy Task (#24)
9941c1c is described below
commit 9941c1c662603c94769224ee46bcb0830be6ac0f
Author: John Yang <jo...@gmail.com>
AuthorDate: Fri Jun 1 14:17:51 2018 +0900
[NEMO-79] Clean up the legacy Task (#24)
JIRA: NEMO-79: Clean up the legacy Task
Major changes:
* Removes the 'old' Task interface and its implementations, so that the 'new' Task (formerly TaskGroup) becomes the only Task in the codebase
* Specifically, changes the DAG in the 'new' Task from DAG<Task, RuntimeEdge> to DAG<IRVertex, RuntimeEdge>
* Adds SourceVertex#clearInternalStates to clear the huge list of input splits held by BeamBoundedSourceVertex before the vertex is sent to remote executors. Of the two options (clearing the internal states of the existing vertex, or creating a new vertex), I chose to clear the existing vertex's states, which ensures that the same IRVertex object is used consistently across the compiler, the master, and the executors.
Minor changes to note:
* Makes BeamBoundedSourceVertex remember sourceDescription for visualization purposes
* Renames related methods and variables to match the new Task structure
* Updates comments to match the new Task structure
Tests for the changes:
N/A (no new feature was added)
Other comments:
N/A
resolves NEMO-79
---
bin/json2dot.py | 10 +-
.../edu/snu/nemo/common/ir/vertex/IRVertex.java | 3 +-
...SourceVertex.java => InMemorySourceVertex.java} | 29 +-
.../snu/nemo/common/ir/vertex/SourceVertex.java | 9 +
.../nemo/compiler/backend/nemo/NemoBackend.java | 2 +-
.../beam/source/BeamBoundedSourceVertex.java | 11 +-
.../compiler/frontend/spark/core/java/JavaRDD.java | 2 +-
.../source/SparkDatasetBoundedSourceVertex.java | 7 +-
.../source/SparkTextFileBoundedSourceVertex.java | 7 +-
.../optimizer/examples/EmptyComponents.java | 6 +-
.../nemo/runtime/common/RuntimeIdGenerator.java | 49 +--
.../pass/runtime/DataSkewRuntimePass.java | 2 +-
.../common/plan/physical/BoundedSourceTask.java | 51 ---
.../{ScheduledTask.java => ExecutableTask.java} | 59 ++--
.../plan/physical/MetricCollectionBarrierTask.java | 31 --
.../runtime/common/plan/physical/OperatorTask.java | 45 ---
.../runtime/common/plan/physical/PhysicalPlan.java | 25 +-
.../plan/physical/PhysicalPlanGenerator.java | 85 +++--
.../common/plan/physical/PhysicalStage.java | 47 +--
.../nemo/runtime/common/plan/physical/Task.java | 52 ---
.../common/plan/physical/UnboundedSourceTask.java | 31 --
.../snu/nemo/runtime/common/state/TaskState.java | 3 +-
runtime/common/src/main/proto/ControlMessage.proto | 2 +-
.../edu/snu/nemo/runtime/executor/Executor.java | 35 +-
.../snu/nemo/runtime/executor/TaskExecutor.java | 362 ++++++++++-----------
.../nemo/runtime/executor/TaskStateManager.java | 86 +++--
.../runtime/executor/data/BlockManagerWorker.java | 6 +-
.../executor/datatransfer/DataTransferFactory.java | 7 +-
...skDataHandler.java => IRVertexDataHandler.java} | 52 +--
.../snu/nemo/runtime/master/JobStateManager.java | 5 +-
.../edu/snu/nemo/runtime/master/RuntimeMaster.java | 4 +-
.../master/resource/ExecutorRepresenter.java | 14 +-
.../master/scheduler/BatchSingleJobScheduler.java | 46 +--
.../scheduler/CompositeSchedulingPolicy.java | 6 +-
.../ContainerTypeAwareSchedulingPolicy.java | 10 +-
.../master/scheduler/FreeSlotSchedulingPolicy.java | 6 +-
.../master/scheduler/PendingTaskCollection.java | 10 +-
.../scheduler/RoundRobinSchedulingPolicy.java | 6 +-
.../nemo/runtime/master/scheduler/Scheduler.java | 3 -
.../runtime/master/scheduler/SchedulerRunner.java | 6 +-
.../runtime/master/scheduler/SchedulingPolicy.java | 4 +-
.../master/scheduler/SingleJobTaskCollection.java | 26 +-
.../SourceLocationAwareSchedulingPolicy.java | 8 +-
.../ContainerTypeAwareSchedulingPolicyTest.java | 18 +-
.../scheduler/FreeSlotSchedulingPolicyTest.java | 10 +-
.../scheduler/RoundRobinSchedulingPolicyTest.java | 8 +-
.../scheduler/SingleTaskQueueTest.java | 26 +-
.../SourceLocationAwareSchedulingPolicyTest.java | 36 +-
.../runtime/plangenerator/TestPlanGenerator.java | 2 +-
.../snu/nemo/tests/client/ClientEndpointTest.java | 2 +-
.../tests/runtime/executor/TaskExecutorTest.java | 108 +++---
.../runtime/executor/data/BlockStoreTest.java | 60 ++--
.../executor/datatransfer/DataTransferTest.java | 4 +-
.../tests/runtime/master/JobStateManagerTest.java | 4 +-
54 files changed, 642 insertions(+), 906 deletions(-)
diff --git a/bin/json2dot.py b/bin/json2dot.py
index 12f0d48..e4c77bf 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -64,8 +64,8 @@ class PhysicalStageState:
self.id = data['id']
self.state = data['state']
self.tasks = {}
- for task in data['tasks']:
- self.tasks[task['id']] = TaskState(task)
+ for irVertex in data['tasks']:
+ self.tasks[irVertex['id']] = TaskState(irVertex)
@classmethod
def empty(cls):
return cls({'id': None, 'state': None, 'tasks': []})
@@ -252,7 +252,7 @@ class LoopVertex:
class PhysicalStage:
def __init__(self, id, properties, state):
self.id = id
- self.task = DAG(properties['taskDag'], JobState.empty())
+ self.irVertex = DAG(properties['irDag'], JobState.empty())
self.idx = getIdx()
self.state = state
@property
@@ -264,12 +264,12 @@ class PhysicalStage:
dot = 'subgraph cluster_{} {{'.format(self.idx)
dot += 'label = "{}{}\\n\\n{} Task(s):\\n{}";'.format(self.id, state, len(self.state.tasks), self.state.taskStateSummary)
dot += 'color=red; bgcolor="{}";'.format(stateToColor(self.state.state))
- dot += self.task.dot
+ dot += self.irVertex.dot
dot += '}'
return dot
@property
def oneVertex(self):
- return next(iter(self.task.vertices.values())).oneVertex
+ return next(iter(self.irVertex.vertices.values())).oneVertex
@property
def logicalEnd(self):
return 'cluster_{}'.format(self.idx)
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
index ec2e469..df783b4 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
@@ -21,7 +21,8 @@ import edu.snu.nemo.common.dag.Vertex;
import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
/**
- * The top-most wrapper for a user operation in the IR.
+ * The basic unit of operation in a dataflow program, as well as the most important data structure in Nemo.
+ * An IRVertex is created and modified in the compiler, and executed in the runtime.
*/
public abstract class IRVertex extends Vertex {
private final ExecutionPropertyMap executionProperties;
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InitializedSourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
similarity index 73%
rename from common/src/main/java/edu/snu/nemo/common/ir/vertex/InitializedSourceVertex.java
rename to common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
index b64d61b..43fbadc 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InitializedSourceVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InMemorySourceVertex.java
@@ -22,23 +22,23 @@ import java.util.Iterator;
import java.util.List;
/**
- * Source vertex with initial data as object.
- * @param <T> type of initial data.
+ * Source vertex with the data in memory.
+ * @param <T> type of data.
*/
-public final class InitializedSourceVertex<T> extends SourceVertex<T> {
- private final Iterable<T> initializedSourceData;
+public final class InMemorySourceVertex<T> extends SourceVertex<T> {
+ private Iterable<T> initializedSourceData;
/**
* Constructor.
* @param initializedSourceData the initial data object.
*/
- public InitializedSourceVertex(final Iterable<T> initializedSourceData) {
+ public InMemorySourceVertex(final Iterable<T> initializedSourceData) {
this.initializedSourceData = initializedSourceData;
}
@Override
- public InitializedSourceVertex<T> getClone() {
- final InitializedSourceVertex<T> that = new InitializedSourceVertex<>(this.initializedSourceData);
+ public InMemorySourceVertex<T> getClone() {
+ final InMemorySourceVertex<T> that = new InMemorySourceVertex<>(this.initializedSourceData);
this.copyExecutionPropertiesTo(that);
return that;
}
@@ -61,23 +61,28 @@ public final class InitializedSourceVertex<T> extends SourceVertex<T> {
}
}
- readables.add(new InitializedSourceReadable<>(dataForReader));
+ readables.add(new InMemorySourceReadable<>(dataForReader));
}
return readables;
}
+ @Override
+ public void clearInternalStates() {
+ initializedSourceData = null;
+ }
+
/**
- * Readable for initialized source vertex. It simply returns the initialized data.
- * @param <T> type of the initial data.
+ * Simply returns the in-memory data.
+ * @param <T> type of the data.
*/
- private static final class InitializedSourceReadable<T> implements Readable<T> {
+ private static final class InMemorySourceReadable<T> implements Readable<T> {
private final Iterable<T> initializedSourceData;
/**
* Constructor.
* @param initializedSourceData the source data.
*/
- private InitializedSourceReadable(final Iterable<T> initializedSourceData) {
+ private InMemorySourceReadable(final Iterable<T> initializedSourceData) {
this.initializedSourceData = initializedSourceData;
}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
index 1597462..d319b72 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/SourceVertex.java
@@ -34,4 +34,13 @@ public abstract class SourceVertex<O> extends IRVertex {
* @throws Exception if fail to get.
*/
public abstract List<Readable<O>> getReadables(int desiredNumOfSplits) throws Exception;
+
+ /**
+ * Clears internal states, must be called after getReadables().
+ * Concretely, this clears the huge list of input splits held by objects like BeamBoundedSourceVertex before
+ * sending the vertex to remote executors.
+ * Between clearing states of an existing vertex, and creating a new vertex, we've chosen the former approach
+ * to ensure consistent use of the same IRVertex object across the compiler, the master, and the executors.
+ */
+ public abstract void clearInternalStates();
}
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 027378c..4ef453c 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -59,7 +59,7 @@ public final class NemoBackend implements Backend<PhysicalPlan> {
final PhysicalPlanGenerator physicalPlanGenerator) {
final DAG<PhysicalStage, PhysicalStageEdge> physicalStageDAG = irDAG.convert(physicalPlanGenerator);
final PhysicalPlan physicalPlan = new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(),
- physicalStageDAG, physicalPlanGenerator.getTaskIRVertexMap());
+ physicalStageDAG, physicalPlanGenerator.getIdToIRVertex());
return physicalPlan;
}
}
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index f03bc57..1143a1a 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -35,7 +35,8 @@ import org.slf4j.LoggerFactory;
*/
public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
private static final Logger LOG = LoggerFactory.getLogger(BeamBoundedSourceVertex.class.getName());
- private final BoundedSource<O> source;
+ private BoundedSource<O> source;
+ private final String sourceDescription;
/**
* Constructor of BeamBoundedSourceVertex.
@@ -43,6 +44,7 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
*/
public BeamBoundedSourceVertex(final BoundedSource<O> source) {
this.source = source;
+ this.sourceDescription = source.toString();
}
@Override
@@ -63,12 +65,17 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
}
@Override
+ public void clearInternalStates() {
+ source = null;
+ }
+
+ @Override
public String propertiesToJSON() {
final StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append(irVertexPropertiesToString());
sb.append(", \"source\": \"");
- sb.append(source);
+ sb.append(sourceDescription);
sb.append("\"}");
return sb.toString();
}
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
index a8d024c..a3e3fd6 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/core/java/JavaRDD.java
@@ -71,7 +71,7 @@ public final class JavaRDD<T> extends org.apache.spark.api.java.JavaRDD<T> {
final int parallelism) {
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
- final IRVertex initializedSourceVertex = new InitializedSourceVertex<>(initialData);
+ final IRVertex initializedSourceVertex = new InMemorySourceVertex<>(initialData);
initializedSourceVertex.setProperty(ParallelismProperty.of(parallelism));
builder.addVertex(initializedSourceVertex);
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
index 775faf4..6e83b7f 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkDatasetBoundedSourceVertex.java
@@ -30,7 +30,7 @@ import java.util.*;
* @param <T> type of data to read.
*/
public final class SparkDatasetBoundedSourceVertex<T> extends SourceVertex<T> {
- private final List<Readable<T>> readables;
+ private List<Readable<T>> readables;
/**
* Constructor.
@@ -72,6 +72,11 @@ public final class SparkDatasetBoundedSourceVertex<T> extends SourceVertex<T> {
return readables;
}
+ @Override
+ public void clearInternalStates() {
+ readables = null;
+ }
+
/**
* A Readable wrapper for Spark Dataset.
*/
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
index a5e7833..cac2674 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkTextFileBoundedSourceVertex.java
@@ -27,7 +27,7 @@ import java.util.*;
* Bounded source vertex for Spark text file.
*/
public final class SparkTextFileBoundedSourceVertex extends SourceVertex<String> {
- private final List<Readable<String>> readables;
+ private List<Readable<String>> readables;
/**
* Constructor.
@@ -72,6 +72,11 @@ public final class SparkTextFileBoundedSourceVertex extends SourceVertex<String>
return readables;
}
+ @Override
+ public void clearInternalStates() {
+ readables = null;
+ }
+
/**
* A Readable wrapper for Spark text file.
*/
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
index 257b757..34c66fc 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
@@ -72,7 +72,7 @@ public class EmptyComponents {
* @param <T> type of the data.
*/
public static final class EmptySourceVertex<T> extends SourceVertex<T> {
- private final String name;
+ private String name;
/**
* Constructor.
@@ -97,6 +97,10 @@ public class EmptyComponents {
}
@Override
+ public void clearInternalStates() {
+ }
+
+ @Override
public EmptySourceVertex<T> getClone() {
return new EmptySourceVertex<>(this.name);
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
index dd19b27..b53ff34 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
@@ -29,7 +29,6 @@ public final class RuntimeIdGenerator {
private static final String BLOCK_PREFIX = "Block-";
private static final String BLOCK_ID_SPLITTER = "_";
private static final String TASK_INFIX = "-Task-";
- private static final String PHYSICAL_TASK_ID_SPLITTER = "_";
/**
* Private constructor which will not be used.
@@ -37,6 +36,9 @@ public final class RuntimeIdGenerator {
private RuntimeIdGenerator() {
}
+
+ //////////////////////////////////////////////////////////////// Generate IDs
+
/**
* Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan}.
*
@@ -76,36 +78,13 @@ public final class RuntimeIdGenerator {
}
/**
- * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.Task}.
- *
- * @param irVertexId the ID of the IR vertex.
- * @return the generated ID
- */
- public static String generateLogicalTaskId(final String irVertexId) {
- return "Task-" + irVertexId;
- }
-
- /**
- * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.Task}.
- *
- * @param index the index of the physical task.
- * @param logicalTaskId the logical ID of the task.
- * @return the generated ID
- */
- public static String generatePhysicalTaskId(final int index,
- final String logicalTaskId) {
- return logicalTaskId + PHYSICAL_TASK_ID_SPLITTER + index;
- }
-
- /**
- * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTask}.
+ * Generates the ID for a task.
*
* @param index the index of this task.
* @param stageId the ID of the stage.
* @return the generated ID
*/
- public static String generateTaskId(final int index,
- final String stageId) {
+ public static String generateTaskId(final int index, final String stageId) {
return stageId + TASK_INFIX + index;
}
@@ -122,12 +101,12 @@ public final class RuntimeIdGenerator {
* Generates the ID for a block, whose data is the output of a task.
*
* @param runtimeEdgeId of the block
- * @param taskIndex of the block
+ * @param producerTaskIndex of the block
* @return the generated ID
*/
public static String generateBlockId(final String runtimeEdgeId,
- final int taskIndex) {
- return BLOCK_PREFIX + runtimeEdgeId + BLOCK_ID_SPLITTER + taskIndex;
+ final int producerTaskIndex) {
+ return BLOCK_PREFIX + runtimeEdgeId + BLOCK_ID_SPLITTER + producerTaskIndex;
}
/**
@@ -148,6 +127,8 @@ public final class RuntimeIdGenerator {
return "ResourceSpec-" + resourceSpecIdGenerator.getAndIncrement();
}
+ //////////////////////////////////////////////////////////////// Parse IDs
+
/**
* Extracts runtime edge ID from a block ID.
*
@@ -210,14 +191,4 @@ public final class RuntimeIdGenerator {
private static String[] parseTaskId(final String taskId) {
return taskId.split(TASK_INFIX);
}
-
- /**
- * Extracts logical task ID from a physical task ID.
- *
- * @param physicalTaskId the physical task ID to extract.
- * @return the logical task ID.
- */
- public static String getLogicalTaskIdIdFromPhysicalTaskId(final String physicalTaskId) {
- return physicalTaskId.split(PHYSICAL_TASK_ID_SPLITTER)[0];
- }
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index b9d263b..7d3e7f4 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -87,7 +87,7 @@ public final class DataSkewRuntimePass implements RuntimePass<Map<String, List<P
optimizationEdge.setTaskIdxToKeyRange(taskIdxToHashRange);
});
- return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build(), originalPlan.getTaskIRVertexMap());
+ return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build(), originalPlan.getIdToIRVertex());
}
/**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/BoundedSourceTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/BoundedSourceTask.java
deleted file mode 100644
index 94e1332..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/BoundedSourceTask.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-import edu.snu.nemo.common.ir.Readable;
-
-/**
- * BoundedSourceTask.
- * @param <O> the output type.
- */
-public final class BoundedSourceTask<O> extends Task {
- private Readable<O> readable;
-
- /**
- * Constructor.
- * @param taskId id of the task.
- * @param irVertexId id of the IR vertex.
- */
- public BoundedSourceTask(final String taskId,
- final String irVertexId) {
- super(taskId, irVertexId);
- }
-
- /**
- * Sets the readable for this task.
- * @param readableToSet the readable to set.
- */
- public void setReadable(final Readable<O> readableToSet) {
- this.readable = readableToSet;
- }
-
- /**
- * @return the readable of source data.
- */
- public Readable<O> getReadable() {
- return readable;
- }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
similarity index 56%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTask.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
index d5f19a7..e7e37ef 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTask.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
@@ -23,16 +23,9 @@ import java.util.List;
import java.util.Map;
/**
- * A ScheduledTask is a grouping of {@link Task}s that belong to a stage.
- * Executors receive units of ScheduledTasks during job execution,
- * and thus the resource type of all tasks of a ScheduledTask must be identical.
- * A stage contains a list of IDs of Tasks whose length corresponds to stage/operator parallelism.
- *
- * This class includes all information which will be required from the executor after scheduled,
- * including the (serialized) DAG of {@link Task}s,
- * the incoming/outgoing edges to/from the stage the Task belongs to, and so on.
+ * An ExecutableTask is a self-contained executable that can be executed in specific types of containers.
*/
-public final class ScheduledTask implements Serializable {
+public final class ExecutableTask implements Serializable {
private final String jobId;
private final String taskId;
private final int taskIdx;
@@ -41,37 +34,37 @@ public final class ScheduledTask implements Serializable {
private final int attemptIdx;
private final String containerType;
private final byte[] serializedTaskDag;
- private final Map<String, Readable> logicalTaskIdToReadable;
+ private final Map<String, Readable> irVertexIdToReadable;
/**
* Constructor.
*
- * @param jobId the id of the job.
- * @param serializedTaskDag the serialized DAG of the task.
- * @param taskId the ID of the scheduled task.
- * @param taskIncomingEdges the incoming edges of the task.
- * @param taskOutgoingEdges the outgoing edges of the task.
- * @param attemptIdx the attempt index.
- * @param containerType the type of container to execute the task on.
- * @param logicalTaskIdToReadable the map between logical task ID and readable.
+ * @param jobId the id of the job.
+ * @param taskId the ID of the scheduled task.
+ * @param attemptIdx the attempt index.
+ * @param containerType the type of container to execute the task on.
+ * @param serializedIRDag the serialized DAG of the task.
+ * @param taskIncomingEdges the incoming edges of the task.
+ * @param taskOutgoingEdges the outgoing edges of the task.
+ * @param irVertexIdToReadable the map between logical task ID and readable.
*/
- public ScheduledTask(final String jobId,
- final byte[] serializedTaskDag,
- final String taskId,
- final List<PhysicalStageEdge> taskIncomingEdges,
- final List<PhysicalStageEdge> taskOutgoingEdges,
- final int attemptIdx,
- final String containerType,
- final Map<String, Readable> logicalTaskIdToReadable) {
+ public ExecutableTask(final String jobId,
+ final String taskId,
+ final int attemptIdx,
+ final String containerType,
+ final byte[] serializedIRDag,
+ final List<PhysicalStageEdge> taskIncomingEdges,
+ final List<PhysicalStageEdge> taskOutgoingEdges,
+ final Map<String, Readable> irVertexIdToReadable) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
- this.taskIncomingEdges = taskIncomingEdges;
- this.taskOutgoingEdges = taskOutgoingEdges;
this.attemptIdx = attemptIdx;
this.containerType = containerType;
- this.serializedTaskDag = serializedTaskDag;
- this.logicalTaskIdToReadable = logicalTaskIdToReadable;
+ this.serializedTaskDag = serializedIRDag;
+ this.taskIncomingEdges = taskIncomingEdges;
+ this.taskOutgoingEdges = taskOutgoingEdges;
+ this.irVertexIdToReadable = irVertexIdToReadable;
}
/**
@@ -84,7 +77,7 @@ public final class ScheduledTask implements Serializable {
/**
* @return the serialized DAG of the task.
*/
- public byte[] getSerializedTaskDag() {
+ public byte[] getSerializedIRDag() {
return serializedTaskDag;
}
@@ -133,7 +126,7 @@ public final class ScheduledTask implements Serializable {
/**
* @return the map between logical task ID and readable.
*/
- public Map<String, Readable> getLogicalTaskIdToReadable() {
- return logicalTaskIdToReadable;
+ public Map<String, Readable> getIrVertexIdToReadable() {
+ return irVertexIdToReadable;
}
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/MetricCollectionBarrierTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/MetricCollectionBarrierTask.java
deleted file mode 100644
index 249adff..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/MetricCollectionBarrierTask.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-/**
- * MetricCollectionBarrierTask.
- */
-public final class MetricCollectionBarrierTask extends Task {
- /**
- * Constructor.
- * @param taskId id of the task.
- * @param irVertexId id of the IR vertex.
- */
- MetricCollectionBarrierTask(final String taskId,
- final String irVertexId) {
- super(taskId, irVertexId);
- }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/OperatorTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/OperatorTask.java
deleted file mode 100644
index 61fa810..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/OperatorTask.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-import edu.snu.nemo.common.ir.vertex.transform.Transform;
-
-/**
- * OperatorTask.
- */
-public final class OperatorTask extends Task {
- private final Transform transform;
-
- /**
- * Constructor.
- * @param taskId id of the task.
- * @param runtimeVertexId id of the runtime vertex.
- * @param transform transform to perform.
- */
- public OperatorTask(final String taskId,
- final String runtimeVertexId,
- final Transform transform) {
- super(taskId, runtimeVertexId);
- this.transform = transform;
- }
-
- /**
- * @return the transform to perform.
- */
- public Transform getTransform() {
- return transform;
- }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
index 85950fa..b25a0ba 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
@@ -25,26 +25,23 @@ import java.util.Map;
* A job's physical plan, to be executed by the Runtime.
*/
public final class PhysicalPlan implements Serializable {
-
private final String id;
-
private final DAG<PhysicalStage, PhysicalStageEdge> stageDAG;
-
- private final Map<Task, IRVertex> taskIRVertexMap;
+ private final Map<String, IRVertex> idToIRVertex;
/**
* Constructor.
*
* @param id ID of the plan.
* @param stageDAG the DAG of stages.
- * @param taskIRVertexMap map from task to IR vertex.
+ * @param idToIRVertex map from task to IR vertex.
*/
public PhysicalPlan(final String id,
final DAG<PhysicalStage, PhysicalStageEdge> stageDAG,
- final Map<Task, IRVertex> taskIRVertexMap) {
+ final Map<String, IRVertex> idToIRVertex) {
this.id = id;
this.stageDAG = stageDAG;
- this.taskIRVertexMap = taskIRVertexMap;
+ this.idToIRVertex = idToIRVertex;
}
/**
@@ -62,20 +59,10 @@ public final class PhysicalPlan implements Serializable {
}
/**
- * Get an IR vertex of the given task.
- *
- * @param task task to find the IR vertex of.
- * @return the corresponding IR vertex of the given task.
- */
- public IRVertex getIRVertexOf(final Task task) {
- return taskIRVertexMap.get(task);
- }
-
- /**
* @return the map from task to IR vertex.
*/
- public Map<Task, IRVertex> getTaskIRVertexMap() {
- return taskIRVertexMap;
+ public Map<String, IRVertex> getIdToIRVertex() {
+ return idToIRVertex;
}
@Override
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
index 2ae7271..27991e3 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
@@ -24,7 +24,6 @@ import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import edu.snu.nemo.runtime.common.plan.stage.*;
import edu.snu.nemo.common.exception.IllegalVertexOperationException;
@@ -70,6 +69,7 @@ public final class PhysicalPlanGenerator
// for debugging purposes.
dagOfStages.storeJSON(dagDirectory, "plan-logical", "logical execution plan");
// then create tasks and make it into a physical execution plan.
+
return stagesIntoPlan(dagOfStages);
}
@@ -197,16 +197,14 @@ public final class PhysicalPlanGenerator
return dagOfStagesBuilder.build();
}
- // Map that keeps track of the IRVertex of each tasks
- private final Map<Task, IRVertex> taskIRVertexMap = new HashMap<>();
+ // Map from IR vertex ID to IR vertex, populated as each stage's vertices are processed.
+ private final Map<String, IRVertex> idToIRVertex = new HashMap<>();
/**
- * Getter for taskIRVertexMap.
- *
- * @return the taskIRVertexMap.
+ * @return the map from IR vertex ID to IR vertex.
*/
- public Map<Task, IRVertex> getTaskIRVertexMap() {
- return taskIRVertexMap;
+ public Map<String, IRVertex> getIdToIRVertex() {
+ return idToIRVertex;
}
/**
@@ -221,72 +219,68 @@ public final class PhysicalPlanGenerator
final DAGBuilder<PhysicalStage, PhysicalStageEdge> physicalDAGBuilder = new DAGBuilder<>();
for (final Stage stage : dagOfStages.getVertices()) {
- final Map<IRVertex, Task> irVertexTaskMap = new HashMap<>();
final List<IRVertex> stageVertices = stage.getStageInternalDAG().getVertices();
final ExecutionPropertyMap firstVertexProperties = stageVertices.iterator().next().getExecutionProperties();
final int stageParallelism = firstVertexProperties.get(ExecutionProperty.Key.Parallelism);
final String containerType = firstVertexProperties.get(ExecutionProperty.Key.ExecutorPlacement);
- // Only one task DAG will be created and reused.
- final DAGBuilder<Task, RuntimeEdge<Task>> stageInternalDAGBuilder = new DAGBuilder<>();
- // Collect split source readables in advance and bind to each scheduled task to avoid extra source split.
- final List<Map<String, Readable>> logicalTaskIdToReadables = new ArrayList<>(stageParallelism);
+ // Only one DAG will be created and reused.
+ final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>();
+ // Split sources once in advance: for each task index, map the source vertex ID to its Readable.
+ final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
for (int i = 0; i < stageParallelism; i++) {
- logicalTaskIdToReadables.add(new HashMap<>());
+ vertexIdToReadables.add(new HashMap<>());
}
- // Iterate over the vertices contained in this stage to convert to tasks.
+ // Iterate over the vertices contained in this stage
stageVertices.forEach(irVertex -> {
- final Task newTaskToAdd;
+ if (!(irVertex instanceof SourceVertex
+ || irVertex instanceof OperatorVertex
+ || irVertex instanceof MetricCollectionBarrierVertex)) {
+ // Sanity check
+ throw new IllegalStateException(irVertex.toString());
+ }
+
if (irVertex instanceof SourceVertex) {
final SourceVertex sourceVertex = (SourceVertex) irVertex;
+ // Get one Readable per task index from this source vertex.
try {
final List<Readable> readables = sourceVertex.getReadables(stageParallelism);
- final String irVertexId = sourceVertex.getId();
- final String logicalTaskId = RuntimeIdGenerator.generateLogicalTaskId(irVertexId);
for (int i = 0; i < stageParallelism; i++) {
- logicalTaskIdToReadables.get(i).put(logicalTaskId, readables.get(i));
+ vertexIdToReadables.get(i).put(irVertex.getId(), readables.get(i));
}
- newTaskToAdd = new BoundedSourceTask(logicalTaskId, irVertexId);
} catch (Exception e) {
throw new PhysicalPlanGenerationException(e);
}
- } else if (irVertex instanceof OperatorVertex) {
- final OperatorVertex operatorVertex = (OperatorVertex) irVertex;
- final String operatorVertexId = operatorVertex.getId();
- newTaskToAdd = new OperatorTask(RuntimeIdGenerator.generateLogicalTaskId(operatorVertexId),
- operatorVertexId, operatorVertex.getTransform());
-
- } else if (irVertex instanceof MetricCollectionBarrierVertex) {
- final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
- (MetricCollectionBarrierVertex) irVertex;
- final String metricVertexId = metricCollectionBarrierVertex.getId();
- newTaskToAdd = new MetricCollectionBarrierTask(RuntimeIdGenerator.generateLogicalTaskId(metricVertexId),
- metricVertexId);
- } else {
- throw new IllegalVertexOperationException("This vertex type is not supported");
+
+ // Clear internal states (e.g., input splits) so they are not shipped to executors with the vertex.
+ sourceVertex.clearInternalStates();
}
- stageInternalDAGBuilder.addVertex(newTaskToAdd);
- irVertexTaskMap.put(irVertex, newTaskToAdd);
- taskIRVertexMap.put(newTaskToAdd, irVertex);
+
+ stageInternalDAGBuilder.addVertex(irVertex);
+ idToIRVertex.put(irVertex.getId(), irVertex);
});
- // connect internal edges in the task. It suffices to iterate over only the stage internal inEdges.
+ // connect internal edges in the stage. It suffices to iterate over only the stage internal inEdges.
final DAG<IRVertex, IREdge> stageInternalDAG = stage.getStageInternalDAG();
stageInternalDAG.getVertices().forEach(irVertex -> {
final List<IREdge> inEdges = stageInternalDAG.getIncomingEdgesOf(irVertex);
inEdges.forEach(edge ->
- stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>(edge.getId(), edge.getExecutionProperties(),
- irVertexTaskMap.get(edge.getSrc()), irVertexTaskMap.get(edge.getDst()),
- edge.getCoder(), edge.isSideInput())));
+ stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>(
+ edge.getId(),
+ edge.getExecutionProperties(),
+ edge.getSrc(),
+ edge.getDst(),
+ edge.getCoder(),
+ edge.isSideInput())));
});
- // Create the task to add for this stage.
+ // Create the physical stage.
final PhysicalStage physicalStage =
- new PhysicalStage(stage.getId(), stageInternalDAGBuilder.build(),
- stageParallelism, stage.getScheduleGroupIndex(), containerType, logicalTaskIdToReadables);
+ new PhysicalStage(stage.getId(), stageInternalDAGBuilder.buildWithoutSourceSinkCheck(),
+ stageParallelism, stage.getScheduleGroupIndex(), containerType, vertexIdToReadables);
physicalDAGBuilder.addVertex(physicalStage);
runtimeStageIdToPhysicalStageMap.put(stage.getId(), physicalStage);
@@ -300,7 +294,8 @@ public final class PhysicalPlanGenerator
physicalDAGBuilder.connectVertices(new PhysicalStageEdge(stageEdge.getId(),
stageEdge.getExecutionProperties(),
- stageEdge.getSrcVertex(), stageEdge.getDstVertex(),
+ stageEdge.getSrcVertex(),
+ stageEdge.getDstVertex(),
srcStage, dstStage,
stageEdge.getCoder(),
stageEdge.isSideInput()));
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
index ded0a8e..a58cfba 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
@@ -18,6 +18,7 @@ package edu.snu.nemo.runtime.common.plan.physical;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.Vertex;
import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.commons.lang3.SerializationUtils;
@@ -30,50 +31,50 @@ import java.util.Map;
* PhysicalStage.
*/
public final class PhysicalStage extends Vertex {
- private final DAG<Task, RuntimeEdge<Task>> taskDag;
+ private final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag;
private final int parallelism;
private final int scheduleGroupIndex;
private final String containerType;
- private final byte[] serializedTaskDag;
- private final List<Map<String, Readable>> logicalTaskIdToReadables;
+ private final byte[] serializedIRDag;
+ private final List<Map<String, Readable>> vertexIdToReadables;
/**
* Constructor.
*
- * @param stageId ID of the stage.
- * @param taskDag the DAG of the task in this stage.
- * @param parallelism how many tasks will be executed in this stage.
- * @param scheduleGroupIndex the schedule group index.
- * @param containerType the type of container to execute the task on.
- * @param logicalTaskIdToReadables the list of maps between logical task ID and {@link Readable}.
+ * @param stageId ID of the stage.
+ * @param irDag the DAG of IR vertices (connected by runtime edges) in this stage.
+ * @param parallelism how many tasks will be executed in this stage.
+ * @param scheduleGroupIndex the schedule group index.
+ * @param containerType the type of container to execute the task on.
+ * @param vertexIdToReadables the list of maps between vertex ID and {@link Readable}.
*/
public PhysicalStage(final String stageId,
- final DAG<Task, RuntimeEdge<Task>> taskDag,
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag,
final int parallelism,
final int scheduleGroupIndex,
final String containerType,
- final List<Map<String, Readable>> logicalTaskIdToReadables) {
+ final List<Map<String, Readable>> vertexIdToReadables) {
super(stageId);
- this.taskDag = taskDag;
+ this.irDag = irDag;
this.parallelism = parallelism;
this.scheduleGroupIndex = scheduleGroupIndex;
this.containerType = containerType;
- this.serializedTaskDag = SerializationUtils.serialize(taskDag);
- this.logicalTaskIdToReadables = logicalTaskIdToReadables;
+ this.serializedIRDag = SerializationUtils.serialize(irDag);
+ this.vertexIdToReadables = vertexIdToReadables;
}
/**
- * @return the task.
+ * @return the IRVertex DAG.
*/
- public DAG<Task, RuntimeEdge<Task>> getTaskDag() {
- return taskDag;
+ public DAG<IRVertex, RuntimeEdge<IRVertex>> getIRDAG() {
+ return irDag;
}
/**
* @return the serialized DAG of the task.
*/
- public byte[] getSerializedTaskDag() {
- return serializedTaskDag;
+ public byte[] getSerializedIRDAG() {
+ return serializedIRDag;
}
/**
@@ -102,17 +103,17 @@ public final class PhysicalStage extends Vertex {
}
/**
- * @return the list of maps between logical task ID and readable.
+ * @return the list of maps between vertex ID and readables.
*/
- public List<Map<String, Readable>> getLogicalTaskIdToReadables() {
- return logicalTaskIdToReadables;
+ public List<Map<String, Readable>> getVertexIdToReadables() {
+ return vertexIdToReadables;
}
@Override
public String propertiesToJSON() {
final StringBuilder sb = new StringBuilder();
sb.append("{\"scheduleGroupIndex\": ").append(scheduleGroupIndex);
- sb.append(", \"taskDag\": ").append(taskDag);
+ sb.append(", \"irDag\": ").append(irDag);
sb.append(", \"parallelism\": ").append(parallelism);
sb.append(", \"containerType\": \"").append(containerType).append("\"");
sb.append('}');
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/Task.java
deleted file mode 100644
index 8d241e1..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/Task.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-import edu.snu.nemo.common.dag.Vertex;
-
-/**
- * Task.
- * The index value is identical to the Task's index it belongs to.
- */
-public abstract class Task extends Vertex {
- private final String irVertexId;
-
- /**
- * Constructor.
- *
- * @param taskId id of the task.
- * @param irVertexId id for the IR vertex.
- */
- public Task(final String taskId,
- final String irVertexId) {
- super(taskId);
- this.irVertexId = irVertexId;
- }
-
- /**
- * @return the id of the runtime vertex.
- */
- public final String getIrVertexId() {
- return irVertexId;
- }
-
- @Override
- public final String propertiesToJSON() {
- final StringBuilder sb = new StringBuilder();
- sb.append("{\"taskId\": \"").append(getId()).append("\"}");
- return sb.toString();
- }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/UnboundedSourceTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/UnboundedSourceTask.java
deleted file mode 100644
index 51f0d31..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/UnboundedSourceTask.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-/**
- * UnboundedSourceTask.
- */
-public final class UnboundedSourceTask extends Task {
- /**
- * Constructor.
- * @param taskId the id of the task.
- * @param runtimeVertexId id of the runtime vertex.
- */
- public UnboundedSourceTask(final String taskId,
- final String runtimeVertexId) {
- super(taskId, runtimeVertexId);
- }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
index 08a9a66..31f56d5 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
@@ -18,8 +18,7 @@ package edu.snu.nemo.runtime.common.state;
import edu.snu.nemo.common.StateMachine;
/**
- * Represents the states and their transitions of a
- * {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTask}.
+ * Represents the states and their transitions of a task.
*/
public final class TaskState {
private final StateMachine stateMachine;
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index c266bab..343b8e5 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -62,7 +62,7 @@ message TaskStateChangedMsg {
required string executorId = 1;
required string taskId = 2;
required TaskStateFromExecutor state = 3;
- optional string taskPutOnHoldId = 4;
+ optional string vertexPutOnHoldId = 4;
optional RecoverableFailureCause failureCause = 5;
required int32 attemptIdx = 6;
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 8fd5281..7a7b523 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -17,6 +17,7 @@ package edu.snu.nemo.runtime.executor;
import com.google.protobuf.ByteString;
import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.common.exception.IllegalMessageException;
import edu.snu.nemo.common.exception.UnknownFailureCauseException;
@@ -27,8 +28,7 @@ import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.MessageListener;
import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
-import edu.snu.nemo.runtime.common.plan.physical.Task;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import edu.snu.nemo.runtime.executor.data.SerializerManager;
import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
import org.apache.commons.lang3.SerializationUtils;
@@ -87,35 +87,34 @@ public final class Executor {
return executorId;
}
- private synchronized void onTaskReceived(final ScheduledTask scheduledTask) {
+ private synchronized void onTaskReceived(final ExecutableTask executableTask) {
LOG.debug("Executor [{}] received Task [{}] to execute.",
- new Object[]{executorId, scheduledTask.getTaskId()});
- executorService.execute(() -> launchTask(scheduledTask));
+ new Object[]{executorId, executableTask.getTaskId()});
+ executorService.execute(() -> launchTask(executableTask));
}
/**
* Launches the Task, and keeps track of the execution state with taskStateManager.
- * @param scheduledTask to launch.
+ * @param executableTask to launch.
*/
- private void launchTask(final ScheduledTask scheduledTask) {
+ private void launchTask(final ExecutableTask executableTask) {
try {
- final DAG<Task, RuntimeEdge<Task>> taskDag =
- SerializationUtils.deserialize(scheduledTask.getSerializedTaskDag());
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
+ SerializationUtils.deserialize(executableTask.getSerializedIRDag());
final TaskStateManager taskStateManager =
- new TaskStateManager(scheduledTask, taskDag, executorId,
- persistentConnectionToMasterMap, metricMessageSender);
+ new TaskStateManager(executableTask, executorId, persistentConnectionToMasterMap, metricMessageSender);
- scheduledTask.getTaskIncomingEdges()
+ executableTask.getTaskIncomingEdges()
.forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
- scheduledTask.getTaskOutgoingEdges()
+ executableTask.getTaskOutgoingEdges()
.forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
- taskDag.getVertices().forEach(v -> {
- taskDag.getOutgoingEdgesOf(v)
+ irDag.getVertices().forEach(v -> {
+ irDag.getOutgoingEdgesOf(v)
.forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
});
new TaskExecutor(
- scheduledTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
+ executableTask, irDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
} catch (final Exception e) {
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
ControlMessage.Message.newBuilder()
@@ -150,9 +149,9 @@ public final class Executor {
switch (message.getType()) {
case ScheduleTask:
final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
- final ScheduledTask scheduledTask =
+ final ExecutableTask executableTask =
SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
- onTaskReceived(scheduledTask);
+ onTaskReceived(executableTask);
break;
default:
throw new IllegalMessageException(
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
index c68215a..2349ab7 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
@@ -21,9 +21,8 @@ import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.exception.BlockFetchException;
import edu.snu.nemo.common.exception.BlockWriteException;
import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.*;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import edu.snu.nemo.runtime.common.plan.physical.*;
import edu.snu.nemo.runtime.common.state.TaskState;
@@ -41,79 +40,88 @@ import org.slf4j.LoggerFactory;
* Executes a task.
*/
public final class TaskExecutor {
+ // Static variables
private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class.getName());
+ private static final String ITERATORID_PREFIX = "ITERATOR_";
+ private static final AtomicInteger ITERATORID_GENERATOR = new AtomicInteger(0);
- private final DAG<Task, RuntimeEdge<Task>> taskDag;
+ // From ExecutableTask
+ private final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag;
private final String taskId;
private final int taskIdx;
private final TaskStateManager taskStateManager;
private final List<PhysicalStageEdge> stageIncomingEdges;
private final List<PhysicalStageEdge> stageOutgoingEdges;
+ private Map<String, Readable> irVertexIdToReadable;
+
+ // Other parameters
private final DataTransferFactory channelFactory;
private final MetricCollector metricCollector;
- private final List<InputReader> inputReaders;
- private final Map<InputReader, List<TaskDataHandler>> inputReaderToDataHandlersMap;
+ // Data structures
+ private final Map<InputReader, List<IRVertexDataHandler>> inputReaderToDataHandlersMap;
private final Map<String, Iterator> idToSrcIteratorMap;
- private final Map<String, List<TaskDataHandler>> srcIteratorIdToDataHandlersMap;
- private final Map<String, List<TaskDataHandler>> iteratorIdToDataHandlersMap;
+ private final Map<String, List<IRVertexDataHandler>> srcIteratorIdToDataHandlersMap;
+ private final Map<String, List<IRVertexDataHandler>> iteratorIdToDataHandlersMap;
private final LinkedBlockingQueue<Pair<String, DataUtil.IteratorWithNumBytes>> partitionQueue;
- private List<TaskDataHandler> taskDataHandlers;
- private Map<OutputCollectorImpl, List<TaskDataHandler>> outputToChildrenDataHandlersMap;
- private final Set<String> finishedTaskIds;
- private int numPartitions;
- private Map<String, Readable> logicalTaskIdToReadable;
+ private List<IRVertexDataHandler> irVertexDataHandlers;
+ private Map<OutputCollectorImpl, List<IRVertexDataHandler>> outputToChildrenDataHandlersMap;
+ private final Set<String> finishedVertexIds;
// For metrics
private long serBlockSize;
private long encodedBlockSize;
- private boolean isExecutionRequested;
- private String logicalTaskIdPutOnHold;
+ // Misc
+ private boolean isExecuted;
+ private String irVertexIdPutOnHold;
+ private int numPartitions;
- private static final String ITERATORID_PREFIX = "ITERATOR_";
- private static final AtomicInteger ITERATORID_GENERATOR = new AtomicInteger(0);
/**
* Constructor.
- * @param scheduledTask Task with information needed during execution.
- * @param taskDag A DAG of Tasks.
+ * @param executableTask Task with information needed during execution.
+ * @param irVertexDag A DAG of vertices.
* @param taskStateManager State manager for this Task.
* @param channelFactory For reading from/writing to data to other Stages.
* @param metricMessageSender For sending metric with execution stats to Master.
*/
- public TaskExecutor(final ScheduledTask scheduledTask,
- final DAG<Task, RuntimeEdge<Task>> taskDag,
+ public TaskExecutor(final ExecutableTask executableTask,
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
final TaskStateManager taskStateManager,
final DataTransferFactory channelFactory,
final MetricMessageSender metricMessageSender) {
- this.taskDag = taskDag;
- this.taskId = scheduledTask.getTaskId();
- this.taskIdx = scheduledTask.getTaskIdx();
+ // Information from the ExecutableTask.
+ this.irVertexDag = irVertexDag;
+ this.taskId = executableTask.getTaskId();
+ this.taskIdx = executableTask.getTaskIdx();
+ this.stageIncomingEdges = executableTask.getTaskIncomingEdges();
+ this.stageOutgoingEdges = executableTask.getTaskOutgoingEdges();
+ this.irVertexIdToReadable = executableTask.getIrVertexIdToReadable();
+
+ // Other parameters.
this.taskStateManager = taskStateManager;
- this.stageIncomingEdges = scheduledTask.getTaskIncomingEdges();
- this.stageOutgoingEdges = scheduledTask.getTaskOutgoingEdges();
- this.logicalTaskIdToReadable = scheduledTask.getLogicalTaskIdToReadable();
this.channelFactory = channelFactory;
this.metricCollector = new MetricCollector(metricMessageSender);
- this.logicalTaskIdPutOnHold = null;
- this.isExecutionRequested = false;
- this.inputReaders = new ArrayList<>();
+ // Initialize data structures.
this.inputReaderToDataHandlersMap = new ConcurrentHashMap<>();
this.idToSrcIteratorMap = new HashMap<>();
this.srcIteratorIdToDataHandlersMap = new HashMap<>();
this.iteratorIdToDataHandlersMap = new ConcurrentHashMap<>();
this.partitionQueue = new LinkedBlockingQueue<>();
this.outputToChildrenDataHandlersMap = new HashMap<>();
- this.taskDataHandlers = new ArrayList<>();
-
- this.finishedTaskIds = new HashSet<>();
- this.numPartitions = 0;
+ this.irVertexDataHandlers = new ArrayList<>();
+ this.finishedVertexIds = new HashSet<>();
+ // Metrics
this.serBlockSize = 0;
this.encodedBlockSize = 0;
+ // Misc
+ this.isExecuted = false;
+ this.irVertexIdPutOnHold = null;
+ this.numPartitions = 0;
initialize();
}
@@ -124,29 +132,23 @@ public final class TaskExecutor {
* 2) Prepares Transforms if needed.
*/
private void initialize() {
- // Initialize data read of SourceVertex.
- taskDag.getTopologicalSort().stream()
- .filter(task -> task instanceof BoundedSourceTask)
- .forEach(boundedSourceTask -> ((BoundedSourceTask) boundedSourceTask).setReadable(
- logicalTaskIdToReadable.get(boundedSourceTask.getId())));
-
- // Initialize data handlers for each Task.
- taskDag.topologicalDo(task -> taskDataHandlers.add(new TaskDataHandler(task)));
+ // Initialize data handlers for each IRVertex.
+ irVertexDag.topologicalDo(irVertex -> irVertexDataHandlers.add(new IRVertexDataHandler(irVertex)));
// Initialize data transfer.
- // Construct a pointer-based DAG of TaskDataHandlers that are used for data transfer.
+ // Construct a pointer-based DAG of irVertexDataHandlers that are used for data transfer.
// 'Pointer-based' means that it isn't Map/List-based in getting the data structure or parent/children
// to avoid element-wise extra overhead of calculating hash values(HashMap) or iterating Lists.
- taskDag.topologicalDo(task -> {
- final Set<PhysicalStageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(task);
- final Set<PhysicalStageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(task);
- final TaskDataHandler dataHandler = getTaskDataHandler(task);
-
- // Set data handlers of children tasks.
- // This forms a pointer-based DAG of TaskDataHandlers.
- final List<TaskDataHandler> childrenDataHandlers = new ArrayList<>();
- taskDag.getChildren(task.getId()).forEach(child ->
- childrenDataHandlers.add(getTaskDataHandler(child)));
+ irVertexDag.topologicalDo(irVertex -> {
+ final Set<PhysicalStageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(irVertex);
+ final Set<PhysicalStageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(irVertex);
+ final IRVertexDataHandler dataHandler = getIRVertexDataHandler(irVertex);
+
+ // Set data handlers of children irVertices.
+ // This forms a pointer-based DAG of irVertexDataHandlers.
+ final List<IRVertexDataHandler> childrenDataHandlers = new ArrayList<>();
+ irVertexDag.getChildren(irVertex.getId()).forEach(child ->
+ childrenDataHandlers.add(getIRVertexDataHandler(child)));
dataHandler.setChildrenDataHandler(childrenDataHandlers);
// Add InputReaders for inter-stage data transfer
@@ -158,7 +160,6 @@ public final class TaskExecutor {
if (inputReader.isSideInputReader()) {
dataHandler.addSideInputFromOtherStages(inputReader);
} else {
- inputReaders.add(inputReader);
inputReaderToDataHandlersMap.putIfAbsent(inputReader, new ArrayList<>());
inputReaderToDataHandlersMap.get(inputReader).add(dataHandler);
}
@@ -167,29 +168,29 @@ public final class TaskExecutor {
// Add OutputWriters for inter-stage data transfer
outEdgesToOtherStages.forEach(physicalStageEdge -> {
final OutputWriter outputWriter = channelFactory.createWriter(
- task, taskIdx, physicalStageEdge.getDstVertex(), physicalStageEdge);
+ irVertex, taskIdx, physicalStageEdge.getDstVertex(), physicalStageEdge);
dataHandler.addOutputWriter(outputWriter);
});
// Add InputPipes for intra-stage data transfer
- addInputFromThisStage(task, dataHandler);
+ addInputFromThisStage(irVertex, dataHandler);
// Add OutputPipe for intra-stage data transfer
- setOutputCollector(task, dataHandler);
+ setOutputCollector(irVertex, dataHandler);
});
// Prepare Transforms if needed.
- taskDag.topologicalDo(task -> {
- if (task instanceof OperatorTask) {
- final Transform transform = ((OperatorTask) task).getTransform();
+ irVertexDag.topologicalDo(irVertex -> {
+ if (irVertex instanceof OperatorVertex) {
+ final Transform transform = ((OperatorVertex) irVertex).getTransform();
final Map<Transform, Object> sideInputMap = new HashMap<>();
- final TaskDataHandler dataHandler = getTaskDataHandler(task);
+ final IRVertexDataHandler dataHandler = getIRVertexDataHandler(irVertex);
// Check and collect side inputs.
if (!dataHandler.getSideInputFromOtherStages().isEmpty()) {
- sideInputFromOtherStages(task, sideInputMap);
+ sideInputFromOtherStages(irVertex, sideInputMap);
}
if (!dataHandler.getSideInputFromThisStage().isEmpty()) {
- sideInputFromThisStage(task, sideInputMap);
+ sideInputFromThisStage(irVertex, sideInputMap);
}
final Transform.Context transformContext = new ContextImpl(sideInputMap);
@@ -200,43 +201,41 @@ public final class TaskExecutor {
}
/**
- * Collect all inter-stage incoming edges of this task.
+ * Collect all inter-stage incoming edges of this vertex.
*
- * @param task the Task whose inter-stage incoming edges to be collected.
+ * @param irVertex the IRVertex whose inter-stage incoming edges to be collected.
* @return the collected incoming edges.
*/
- private Set<PhysicalStageEdge> getInEdgesFromOtherStages(final Task task) {
+ private Set<PhysicalStageEdge> getInEdgesFromOtherStages(final IRVertex irVertex) {
return stageIncomingEdges.stream().filter(
- stageInEdge -> stageInEdge.getDstVertex().getId().equals(task.getIrVertexId()))
+ stageInEdge -> stageInEdge.getDstVertex().getId().equals(irVertex.getId()))
.collect(Collectors.toSet());
}
/**
- * Collect all inter-stage outgoing edges of this task.
+ * Collect all inter-stage outgoing edges of this vertex.
*
- * @param task the Task whose inter-stage outgoing edges to be collected.
+ * @param irVertex the IRVertex whose inter-stage outgoing edges to be collected.
* @return the collected outgoing edges.
*/
- private Set<PhysicalStageEdge> getOutEdgesToOtherStages(final Task task) {
+ private Set<PhysicalStageEdge> getOutEdgesToOtherStages(final IRVertex irVertex) {
return stageOutgoingEdges.stream().filter(
- stageInEdge -> stageInEdge.getSrcVertex().getId().equals(task.getIrVertexId()))
+ stageInEdge -> stageInEdge.getSrcVertex().getId().equals(irVertex.getId()))
.collect(Collectors.toSet());
}
/**
- * Add input OutputCollectors to each {@link Task}.
- * Input OutputCollector denotes all the OutputCollectors of intra-Stage parent tasks of this task.
+ * Add input OutputCollectors to each {@link IRVertex}.
+ * Input OutputCollector denotes all the OutputCollectors of intra-Stage dependencies.
*
- * @param task the Task to add input OutputCollectors to.
+ * @param irVertex the IRVertex to add input OutputCollectors to.
*/
- private void addInputFromThisStage(final Task task, final TaskDataHandler dataHandler) {
- List<Task> parentTasks = taskDag.getParents(task.getId());
- final String physicalTaskId = getPhysicalTaskId(task.getId());
-
- if (parentTasks != null) {
- parentTasks.forEach(parent -> {
- final OutputCollectorImpl parentOutputCollector = getTaskDataHandler(parent).getOutputCollector();
- if (parentOutputCollector.hasSideInputFor(physicalTaskId)) {
+ private void addInputFromThisStage(final IRVertex irVertex, final IRVertexDataHandler dataHandler) {
+ List<IRVertex> parentVertices = irVertexDag.getParents(irVertex.getId());
+ if (parentVertices != null) {
+ parentVertices.forEach(parent -> {
+ final OutputCollectorImpl parentOutputCollector = getIRVertexDataHandler(parent).getOutputCollector();
+ if (parentOutputCollector.hasSideInputFor(irVertex.getId())) {
dataHandler.addSideInputFromThisStage(parentOutputCollector);
} else {
dataHandler.addInputFromThisStages(parentOutputCollector);
@@ -246,21 +245,15 @@ public final class TaskExecutor {
}
/**
- * Add output outputCollectors to each {@link Task}.
- * Output outputCollector denotes the one and only one outputCollector of this task.
- * Check the outgoing edges that will use this outputCollector,
- * and set this outputCollector as side input if any one of the edges uses this outputCollector as side input.
- *
- * @param task the Task to add output outputCollectors to.
+ * Create the OutputCollector of the given {@link IRVertex}, flagging it as side input if any outgoing edge is a side-input edge.
+ * @param irVertex the IRVertex whose OutputCollector is created.
*/
- private void setOutputCollector(final Task task, final TaskDataHandler dataHandler) {
+ private void setOutputCollector(final IRVertex irVertex, final IRVertexDataHandler dataHandler) {
final OutputCollectorImpl outputCollector = new OutputCollectorImpl();
- final String physicalTaskId = getPhysicalTaskId(task.getId());
-
- taskDag.getOutgoingEdgesOf(task).forEach(outEdge -> {
+ irVertexDag.getOutgoingEdgesOf(irVertex).forEach(outEdge -> {
if (outEdge.isSideInput()) {
outputCollector.setSideInputRuntimeEdge(outEdge);
- outputCollector.setAsSideInputFor(physicalTaskId);
+ outputCollector.setAsSideInputFor(irVertex.getId());
}
});
@@ -268,25 +261,17 @@ public final class TaskExecutor {
}
/**
- * Check that this task has OutputWriter for inter-stage data.
+ * Check whether the given IRVertex has any OutputWriter for inter-stage data.
*
- * @param task the task to check whether it has OutputWriters.
- * @return true if the task has OutputWriters.
+ * @param irVertex the irVertex to check whether it has OutputWriters.
+ * @return true if the irVertex has OutputWriters.
*/
- private boolean hasOutputWriter(final Task task) {
- return !getTaskDataHandler(task).getOutputWriters().isEmpty();
+ private boolean hasOutputWriter(final IRVertex irVertex) {
+ return !getIRVertexDataHandler(irVertex).getOutputWriters().isEmpty();
}
- /**
- * If the given task is MetricCollectionBarrierTask,
- * set task as put on hold and use it to decide Task state when Task finishes.
- *
- * @param task the task to check whether it has OutputWriters.
- * @return true if the task has OutputWriters.
- */
- private void setTaskPutOnHold(final MetricCollectionBarrierTask task) {
- final String physicalTaskId = getPhysicalTaskId(task.getId());
- logicalTaskIdPutOnHold = RuntimeIdGenerator.getLogicalTaskIdIdFromPhysicalTaskId(physicalTaskId);
+ private void setIRVertexPutOnHold(final MetricCollectionBarrierVertex irVertex) {
+ irVertexIdPutOnHold = irVertex.getId();
}
/**
@@ -294,16 +279,15 @@ public final class TaskExecutor {
* As element-wise output write is done and the block is in memory,
* flush the block into the designated data store and commit it.
*
- * @param task the task with OutputWriter to flush and commit output block.
+ * @param irVertex the IRVertex with OutputWriter to flush and commit output block.
*/
- private void writeAndCloseOutputWriters(final Task task) {
- final String physicalTaskId = getPhysicalTaskId(task.getId());
+ private void writeAndCloseOutputWriters(final IRVertex irVertex) {
final List<Long> writtenBytesList = new ArrayList<>();
final Map<String, Object> metric = new HashMap<>();
- metricCollector.beginMeasurement(physicalTaskId, metric);
+ metricCollector.beginMeasurement(irVertex.getId(), metric);
final long writeStartTime = System.currentTimeMillis();
- getTaskDataHandler(task).getOutputWriters().forEach(outputWriter -> {
+ getIRVertexDataHandler(irVertex).getOutputWriters().forEach(outputWriter -> {
outputWriter.close();
final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
writtenBytes.ifPresent(writtenBytesList::add);
@@ -312,21 +296,25 @@ public final class TaskExecutor {
final long writeEndTime = System.currentTimeMillis();
metric.put("OutputWriteTime(ms)", writeEndTime - writeStartTime);
putWrittenBytesMetric(writtenBytesList, metric);
- metricCollector.endMeasurement(physicalTaskId, metric);
+ metricCollector.endMeasurement(irVertex.getId(), metric);
}
/**
* Get input iterator from BoundedSource and bind it with id.
*/
private void prepareInputFromSource() {
- taskDag.topologicalDo(task -> {
- if (task instanceof BoundedSourceTask) {
+ irVertexDag.topologicalDo(irVertex -> {
+ if (irVertex instanceof SourceVertex) {
try {
final String iteratorId = generateIteratorId();
- final Iterator iterator = ((BoundedSourceTask) task).getReadable().read().iterator();
+ final Readable readable = irVertexIdToReadable.get(irVertex.getId());
+ if (readable == null) {
+ throw new RuntimeException(irVertex.toString());
+ }
+ final Iterator iterator = readable.read().iterator();
idToSrcIteratorMap.putIfAbsent(iteratorId, iterator);
srcIteratorIdToDataHandlersMap.putIfAbsent(iteratorId, new ArrayList<>());
- srcIteratorIdToDataHandlersMap.get(iteratorId).add(getTaskDataHandler(task));
+ srcIteratorIdToDataHandlersMap.get(iteratorId).add(getIRVertexDataHandler(irVertex));
} catch (final BlockFetchException ex) {
taskStateManager.onTaskStateChanged(TaskState.State.FAILED_RECOVERABLE,
Optional.empty(), Optional.of(TaskState.RecoverableFailureCause.INPUT_READ_FAILURE));
@@ -374,22 +362,21 @@ public final class TaskExecutor {
}
/**
- * Check whether all tasks in this Task are finished.
+ * Check whether all vertices in this Task are finished.
*
- * @return true if all tasks are finished.
+ * @return true if all vertices are finished.
*/
- private boolean finishedAllTasks() {
+ private boolean finishedAllVertices() {
// Total number of Tasks
- int taskNum = taskDataHandlers.size();
- int finishedTaskNum = finishedTaskIds.size();
-
- return finishedTaskNum == taskNum;
+ int vertexNum = irVertexDataHandlers.size();
+ int finishedVertexNum = finishedVertexIds.size();
+ return finishedVertexNum == vertexNum;
}
/**
- * Initialize the very first map of OutputCollector-children task DAG.
+ * Initialize the first map from each OutputCollector to its children's IRVertexDataHandlers.
* In each map entry, the OutputCollector contains input data to be propagated through
- * the children task DAG.
+ * the DAG of its children IRVertices.
*/
private void initializeOutputToChildrenDataHandlersMap() {
srcIteratorIdToDataHandlersMap.values().forEach(dataHandlers ->
@@ -403,11 +390,11 @@ public final class TaskExecutor {
}
/**
- * Update the map of OutputCollector-children task DAG.
+ * Update the map from each OutputCollector to its children's IRVertexDataHandlers.
*/
private void updateOutputToChildrenDataHandlersMap() {
- Map<OutputCollectorImpl, List<TaskDataHandler>> currentMap = outputToChildrenDataHandlersMap;
- Map<OutputCollectorImpl, List<TaskDataHandler>> updatedMap = new HashMap<>();
+ Map<OutputCollectorImpl, List<IRVertexDataHandler>> currentMap = outputToChildrenDataHandlersMap;
+ Map<OutputCollectorImpl, List<IRVertexDataHandler>> updatedMap = new HashMap<>();
currentMap.values().forEach(dataHandlers ->
dataHandlers.forEach(dataHandler -> {
@@ -419,13 +406,13 @@ public final class TaskExecutor {
}
/**
- * Update the map of OutputCollector-children task DAG.
+ * Close the transform of the given IRVertex if it is an OperatorVertex.
*
- * @param task the Task with the transform to close.
+ * @param irVertex the IRVertex with the transform to close.
*/
- private void closeTransform(final Task task) {
- if (task instanceof OperatorTask) {
- Transform transform = ((OperatorTask) task).getTransform();
+ private void closeTransform(final IRVertex irVertex) {
+ if (irVertex instanceof OperatorVertex) {
+ Transform transform = ((OperatorVertex) irVertex).getTransform();
transform.close();
}
}
@@ -434,11 +421,11 @@ public final class TaskExecutor {
* As a preprocessing of side input data, get inter stage side input
* and form a map of source transform-side input.
*
- * @param task the task which receives side input from other stages.
+ * @param irVertex the IRVertex which receives side input from other stages.
* @param sideInputMap the map of source transform-side input to build.
*/
- private void sideInputFromOtherStages(final Task task, final Map<Transform, Object> sideInputMap) {
- getTaskDataHandler(task).getSideInputFromOtherStages().forEach(sideInputReader -> {
+ private void sideInputFromOtherStages(final IRVertex irVertex, final Map<Transform, Object> sideInputMap) {
+ getIRVertexDataHandler(irVertex).getSideInputFromOtherStages().forEach(sideInputReader -> {
try {
final DataUtil.IteratorWithNumBytes sideInputIterator = sideInputReader.read().get(0).get();
final Object sideInput = getSideInput(sideInputIterator);
@@ -447,7 +434,7 @@ public final class TaskExecutor {
if (inEdge instanceof PhysicalStageEdge) {
srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
} else {
- srcTransform = ((OperatorTask) inEdge.getSrc()).getTransform();
+ srcTransform = ((OperatorVertex) inEdge.getSrc()).getTransform();
}
sideInputMap.put(srcTransform, sideInput);
@@ -477,11 +464,11 @@ public final class TaskExecutor {
* Assumption: intra stage side input denotes a data element initially received
* via side input reader from other stages.
*
- * @param task the task which receives the data element marked as side input.
+ * @param irVertex the IRVertex which receives the data element marked as side input.
* @param sideInputMap the map of source transform-side input to build.
*/
- private void sideInputFromThisStage(final Task task, final Map<Transform, Object> sideInputMap) {
- getTaskDataHandler(task).getSideInputFromThisStage().forEach(input -> {
+ private void sideInputFromThisStage(final IRVertex irVertex, final Map<Transform, Object> sideInputMap) {
+ getIRVertexDataHandler(irVertex).getSideInputFromThisStage().forEach(input -> {
// because sideInput is only 1 element in the outputCollector
Object sideInput = input.remove();
final RuntimeEdge inEdge = input.getSideInputRuntimeEdge();
@@ -489,7 +476,7 @@ public final class TaskExecutor {
if (inEdge instanceof PhysicalStageEdge) {
srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
} else {
- srcTransform = ((OperatorTask) inEdge.getSrc()).getTransform();
+ srcTransform = ((OperatorVertex) inEdge.getSrc()).getTransform();
}
sideInputMap.put(srcTransform, sideInput);
});
@@ -505,11 +492,10 @@ public final class TaskExecutor {
long boundedSrcReadEndTime = 0;
long inputReadStartTime = 0;
long inputReadEndTime = 0;
- if (isExecutionRequested) {
+ if (isExecuted) {
throw new RuntimeException("Task {" + taskId + "} execution called again!");
- } else {
- isExecutionRequested = true;
}
+ isExecuted = true;
taskStateManager.onTaskStateChanged(TaskState.State.EXECUTING, Optional.empty(), Optional.empty());
LOG.info("{} Executing!", taskId);
@@ -523,12 +509,12 @@ public final class TaskExecutor {
inputReadStartTime = System.currentTimeMillis();
prepareInputFromOtherStages();
- // Execute the Task DAG.
+ // Execute the IRVertex DAG.
try {
srcIteratorIdToDataHandlersMap.forEach((srcIteratorId, dataHandlers) -> {
Iterator iterator = idToSrcIteratorMap.get(srcIteratorId);
iterator.forEachRemaining(element -> {
- for (final TaskDataHandler dataHandler : dataHandlers) {
+ for (final IRVertexDataHandler dataHandler : dataHandlers) {
runTask(dataHandler, element);
}
});
@@ -539,9 +525,9 @@ public final class TaskExecutor {
Pair<String, DataUtil.IteratorWithNumBytes> idToIteratorPair = partitionQueue.take();
final String iteratorId = idToIteratorPair.left();
final DataUtil.IteratorWithNumBytes iterator = idToIteratorPair.right();
- List<TaskDataHandler> dataHandlers = iteratorIdToDataHandlersMap.get(iteratorId);
+ List<IRVertexDataHandler> dataHandlers = iteratorIdToDataHandlersMap.get(iteratorId);
iterator.forEachRemaining(element -> {
- for (final TaskDataHandler dataHandler : dataHandlers) {
+ for (final IRVertexDataHandler dataHandler : dataHandlers) {
runTask(dataHandler, element);
}
});
@@ -566,44 +552,44 @@ public final class TaskExecutor {
metric.put("InputReadTime(ms)", inputReadEndTime - inputReadStartTime);
// Process intra-Task data.
- // Intra-Task data comes from outputCollectors of this Task's Tasks.
+ // Intra-Task data comes from outputCollectors of this Task's vertices.
initializeOutputToChildrenDataHandlersMap();
- while (!finishedAllTasks()) {
+ while (!finishedAllVertices()) {
outputToChildrenDataHandlersMap.forEach((outputCollector, childrenDataHandlers) -> {
- // Get the task that has this outputCollector as its output outputCollector
- Task outputCollectorOwnerTask = taskDataHandlers.stream()
+ // Get the vertex whose output collector is this outputCollector
+ final IRVertex outputProducer = irVertexDataHandlers.stream()
.filter(dataHandler -> dataHandler.getOutputCollector() == outputCollector)
- .findFirst().get().getTask();
+ .findFirst().get().getIRVertex();
- // Before consuming the output of outputCollectorOwnerTask as input,
+ // Before consuming the output of outputProducer as input,
// close transform if it is OperatorTransform.
- closeTransform(outputCollectorOwnerTask);
+ closeTransform(outputProducer);
- // Set outputCollectorOwnerTask as finished.
- finishedTaskIds.add(getPhysicalTaskId(outputCollectorOwnerTask.getId()));
+ // Set outputProducer as finished.
+ finishedVertexIds.add(outputProducer.getId());
while (!outputCollector.isEmpty()) {
final Object element = outputCollector.remove();
- // Pass outputCollectorOwnerTask's output to its children tasks recursively.
+ // Pass outputProducer's output to its children vertices recursively.
if (!childrenDataHandlers.isEmpty()) {
- for (final TaskDataHandler childDataHandler : childrenDataHandlers) {
+ for (final IRVertexDataHandler childDataHandler : childrenDataHandlers) {
runTask(childDataHandler, element);
}
}
// Write element-wise to OutputWriters if any and close the OutputWriters.
- if (hasOutputWriter(outputCollectorOwnerTask)) {
+ if (hasOutputWriter(outputProducer)) {
// If outputCollector isn't empty(if closeTransform produced some output),
// write them element-wise to OutputWriters.
List<OutputWriter> outputWritersOfTask =
- getTaskDataHandler(outputCollectorOwnerTask).getOutputWriters();
+ getIRVertexDataHandler(outputProducer).getOutputWriters();
outputWritersOfTask.forEach(outputWriter -> outputWriter.write(element));
}
}
- if (hasOutputWriter(outputCollectorOwnerTask)) {
- writeAndCloseOutputWriters(outputCollectorOwnerTask);
+ if (hasOutputWriter(outputProducer)) {
+ writeAndCloseOutputWriters(outputProducer);
}
});
updateOutputToChildrenDataHandlersMap();
@@ -625,47 +611,47 @@ public final class TaskExecutor {
final boolean available = serBlockSize >= 0;
putReadBytesMetric(available, serBlockSize, encodedBlockSize, metric);
metricCollector.endMeasurement(taskId, metric);
- if (logicalTaskIdPutOnHold == null) {
+ if (irVertexIdPutOnHold == null) {
taskStateManager.onTaskStateChanged(TaskState.State.COMPLETE, Optional.empty(), Optional.empty());
} else {
taskStateManager.onTaskStateChanged(TaskState.State.ON_HOLD,
- Optional.of(logicalTaskIdPutOnHold),
+ Optional.of(irVertexIdPutOnHold),
Optional.empty());
}
LOG.info("{} Complete!", taskId);
}
/**
- * Recursively executes a task with the input data element.
+ * Recursively executes a vertex with the input data element.
*
- * @param dataHandler TaskDataHandler of a task to execute.
+ * @param dataHandler IRVertexDataHandler of a vertex to execute.
* @param dataElement input data element to process.
*/
- private void runTask(final TaskDataHandler dataHandler, final Object dataElement) {
- final Task task = dataHandler.getTask();
+ private void runTask(final IRVertexDataHandler dataHandler, final Object dataElement) {
+ final IRVertex irVertex = dataHandler.getIRVertex();
final OutputCollectorImpl outputCollector = dataHandler.getOutputCollector();
- // Process element-wise depending on the Task type
- if (task instanceof BoundedSourceTask) {
+ // Process element-wise depending on the vertex type
+ if (irVertex instanceof SourceVertex) {
if (dataElement == null) { // null used for Beam VoidCoders
final List<Object> nullForVoidCoder = Collections.singletonList(dataElement);
outputCollector.emit(nullForVoidCoder);
} else {
outputCollector.emit(dataElement);
}
- } else if (task instanceof OperatorTask) {
- final Transform transform = ((OperatorTask) task).getTransform();
+ } else if (irVertex instanceof OperatorVertex) {
+ final Transform transform = ((OperatorVertex) irVertex).getTransform();
transform.onData(dataElement);
- } else if (task instanceof MetricCollectionBarrierTask) {
+ } else if (irVertex instanceof MetricCollectionBarrierVertex) {
if (dataElement == null) { // null used for Beam VoidCoders
final List<Object> nullForVoidCoder = Collections.singletonList(dataElement);
outputCollector.emit(nullForVoidCoder);
} else {
outputCollector.emit(dataElement);
}
- setTaskPutOnHold((MetricCollectionBarrierTask) task);
+ setIRVertexPutOnHold((MetricCollectionBarrierVertex) irVertex);
} else {
- throw new UnsupportedOperationException("This type of Task is not supported");
+ throw new UnsupportedOperationException("This type of IRVertex is not supported");
}
// For the produced output
@@ -673,15 +659,15 @@ public final class TaskExecutor {
final Object element = outputCollector.remove();
// Pass output to its children recursively.
- List<TaskDataHandler> childrenDataHandlers = dataHandler.getChildren();
+ List<IRVertexDataHandler> childrenDataHandlers = dataHandler.getChildren();
if (!childrenDataHandlers.isEmpty()) {
- for (final TaskDataHandler childDataHandler : childrenDataHandlers) {
+ for (final IRVertexDataHandler childDataHandler : childrenDataHandlers) {
runTask(childDataHandler, element);
}
}
// Write element-wise to OutputWriters if any
- if (hasOutputWriter(task)) {
+ if (hasOutputWriter(irVertex)) {
List<OutputWriter> outputWritersOfTask = dataHandler.getOutputWriters();
outputWritersOfTask.forEach(outputWriter -> outputWriter.write(element));
}
@@ -689,16 +675,6 @@ public final class TaskExecutor {
}
/**
- * Get the matching physical task id of the given logical task id.
- *
- * @param logicalTaskId the logical task id.
- * @return the physical task id.
- */
- private String getPhysicalTaskId(final String logicalTaskId) {
- return RuntimeIdGenerator.generatePhysicalTaskId(taskIdx, logicalTaskId);
- }
-
- /**
* Generate a unique iterator id.
*
* @return the iterator id.
@@ -707,9 +683,9 @@ public final class TaskExecutor {
return ITERATORID_PREFIX + ITERATORID_GENERATOR.getAndIncrement();
}
- private TaskDataHandler getTaskDataHandler(final Task task) {
- return taskDataHandlers.stream()
- .filter(dataHandler -> dataHandler.getTask() == task)
+ private IRVertexDataHandler getIRVertexDataHandler(final IRVertex irVertex) {
+ return irVertexDataHandlers.stream()
+ .filter(dataHandler -> dataHandler.getIRVertex() == irVertex)
.findFirst().get();
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index 77d5136..e2a31d6 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -15,16 +15,13 @@
*/
package edu.snu.nemo.runtime.executor;
-import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.exception.UnknownExecutionStateException;
import edu.snu.nemo.common.exception.UnknownFailureCauseException;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
-import edu.snu.nemo.runtime.common.plan.physical.Task;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import java.util.*;
@@ -47,13 +44,12 @@ public final class TaskStateManager {
private final MetricCollector metricCollector;
private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
- public TaskStateManager(final ScheduledTask scheduledTask,
- final DAG<Task, RuntimeEdge<Task>> taskDag,
- final String executorId,
- final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
- final MetricMessageSender metricMessageSender) {
- this.taskId = scheduledTask.getTaskId();
- this.attemptIdx = scheduledTask.getAttemptIdx();
+ public TaskStateManager(final ExecutableTask executableTask,
+ final String executorId,
+ final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
+ final MetricMessageSender metricMessageSender) {
+ this.taskId = executableTask.getTaskId();
+ this.attemptIdx = executableTask.getAttemptIdx();
this.executorId = executorId;
this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
this.metricCollector = new MetricCollector(metricMessageSender);
@@ -62,11 +58,11 @@ public final class TaskStateManager {
/**
* Updates the state of the task.
* @param newState of the task.
- * @param taskPutOnHold the logical ID of the tasks put on hold, empty otherwise.
+ * @param vertexPutOnHold the ID of the vertex put on hold, empty otherwise.
* @param cause only provided as non-empty upon recoverable failures.
*/
public synchronized void onTaskStateChanged(final TaskState.State newState,
- final Optional<String> taskPutOnHold,
+ final Optional<String> vertexPutOnHold,
final Optional<TaskState.RecoverableFailureCause> cause) {
final Map<String, Object> metric = new HashMap<>();
@@ -98,7 +94,7 @@ public final class TaskStateManager {
break;
case ON_HOLD:
LOG.debug("Task ID {} put on hold.", this.taskId);
- notifyTaskStateToMaster(newState, taskPutOnHold, cause);
+ notifyTaskStateToMaster(newState, vertexPutOnHold, cause);
break;
default:
throw new IllegalStateException("Illegal state at this point");
@@ -108,20 +104,20 @@ public final class TaskStateManager {
/**
* Notifies the change in task state to master.
* @param newState of the task.
- * @param taskPutOnHold the logical ID of the tasks put on hold, empty otherwise.
+ * @param vertexPutOnHold the ID of the vertex put on hold, empty otherwise.
* @param cause only provided as non-empty upon recoverable failures.
*/
private void notifyTaskStateToMaster(final TaskState.State newState,
- final Optional<String> taskPutOnHold,
- final Optional<TaskState.RecoverableFailureCause> cause) {
+ final Optional<String> vertexPutOnHold,
+ final Optional<TaskState.RecoverableFailureCause> cause) {
final ControlMessage.TaskStateChangedMsg.Builder msgBuilder =
ControlMessage.TaskStateChangedMsg.newBuilder()
- .setExecutorId(executorId)
- .setTaskId(taskId)
- .setAttemptIdx(attemptIdx)
- .setState(convertState(newState));
- if (taskPutOnHold.isPresent()) {
- msgBuilder.setTaskPutOnHoldId(taskPutOnHold.get());
+ .setExecutorId(executorId)
+ .setTaskId(taskId)
+ .setAttemptIdx(attemptIdx)
+ .setState(convertState(newState));
+ if (vertexPutOnHold.isPresent()) {
+ msgBuilder.setVertexPutOnHoldId(vertexPutOnHold.get());
}
if (cause.isPresent()) {
msgBuilder.setFailureCause(convertFailureCause(cause.get()));
@@ -139,33 +135,33 @@ public final class TaskStateManager {
private ControlMessage.TaskStateFromExecutor convertState(final TaskState.State state) {
switch (state) {
- case READY:
- return ControlMessage.TaskStateFromExecutor.READY;
- case EXECUTING:
- return ControlMessage.TaskStateFromExecutor.EXECUTING;
- case COMPLETE:
- return ControlMessage.TaskStateFromExecutor.COMPLETE;
- case FAILED_RECOVERABLE:
- return ControlMessage.TaskStateFromExecutor.FAILED_RECOVERABLE;
- case FAILED_UNRECOVERABLE:
- return ControlMessage.TaskStateFromExecutor.FAILED_UNRECOVERABLE;
- case ON_HOLD:
- return ControlMessage.TaskStateFromExecutor.ON_HOLD;
- default:
- throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
+ case READY:
+ return ControlMessage.TaskStateFromExecutor.READY;
+ case EXECUTING:
+ return ControlMessage.TaskStateFromExecutor.EXECUTING;
+ case COMPLETE:
+ return ControlMessage.TaskStateFromExecutor.COMPLETE;
+ case FAILED_RECOVERABLE:
+ return ControlMessage.TaskStateFromExecutor.FAILED_RECOVERABLE;
+ case FAILED_UNRECOVERABLE:
+ return ControlMessage.TaskStateFromExecutor.FAILED_UNRECOVERABLE;
+ case ON_HOLD:
+ return ControlMessage.TaskStateFromExecutor.ON_HOLD;
+ default:
+ throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
}
}
private ControlMessage.RecoverableFailureCause convertFailureCause(
- final TaskState.RecoverableFailureCause cause) {
+ final TaskState.RecoverableFailureCause cause) {
switch (cause) {
- case INPUT_READ_FAILURE:
- return ControlMessage.RecoverableFailureCause.InputReadFailure;
- case OUTPUT_WRITE_FAILURE:
- return ControlMessage.RecoverableFailureCause.OutputWriteFailure;
- default:
- throw new UnknownFailureCauseException(
- new Throwable("The failure cause for the recoverable failure is unknown"));
+ case INPUT_READ_FAILURE:
+ return ControlMessage.RecoverableFailureCause.InputReadFailure;
+ case OUTPUT_WRITE_FAILURE:
+ return ControlMessage.RecoverableFailureCause.OutputWriteFailure;
+ default:
+ throw new UnknownFailureCauseException(
+ new Throwable("The failure cause for the recoverable failure is unknown"));
}
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index d7b13f2..9385545 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -159,6 +159,7 @@ public final class BlockManagerWorker {
numSerializedBytes += partition.getNumSerializedBytes();
numEncodedBytes += partition.getNumEncodedBytes();
}
+
return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(innerIterator, numSerializedBytes,
numEncodedBytes));
} catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
@@ -207,13 +208,16 @@ public final class BlockManagerWorker {
.build());
return responseFromMasterFuture;
});
- blockLocationFuture.whenComplete((message, throwable) -> pendingBlockLocationRequest.remove(blockId));
+ blockLocationFuture.whenComplete((message, throwable) -> {
+ pendingBlockLocationRequest.remove(blockId);
+ });
// Using thenCompose so that fetching block data starts after getting response from master.
return blockLocationFuture.thenCompose(responseFromMaster -> {
if (responseFromMaster.getType() != ControlMessage.MessageType.BlockLocationInfo) {
throw new RuntimeException("Response message type mismatch!");
}
+
final ControlMessage.BlockLocationInfoMsg blockLocationInfoMsg =
responseFromMaster.getBlockLocationInfoMsg();
if (!blockLocationInfoMsg.hasOwnerExecutorId()) {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
index 13531c2..7a4068e 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferFactory.java
@@ -18,7 +18,6 @@ package edu.snu.nemo.runtime.executor.datatransfer;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.Task;
import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
import org.apache.reef.tang.annotations.Parameter;
@@ -42,18 +41,18 @@ public final class DataTransferFactory {
/**
* Creates an {@link OutputWriter} between two stages.
*
- * @param srcTask the {@link Task} that outputs the data to be written.
+ * @param srcIRVertex the {@link IRVertex} that outputs the data to be written.
* @param srcTaskIdx the index of the source task.
* @param dstIRVertex the {@link IRVertex} that will take the output data as its input.
* @param runtimeEdge that connects the srcTask to the tasks belonging to dstIRVertex.
* @return the {@link OutputWriter} created.
*/
- public OutputWriter createWriter(final Task srcTask,
+ public OutputWriter createWriter(final IRVertex srcIRVertex,
final int srcTaskIdx,
final IRVertex dstIRVertex,
final RuntimeEdge<?> runtimeEdge) {
return new OutputWriter(hashRangeMultiplier, srcTaskIdx,
- srcTask.getIrVertexId(), dstIRVertex, runtimeEdge, blockManagerWorker);
+ srcIRVertex.getId(), dstIRVertex, runtimeEdge, blockManagerWorker);
}
/**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/IRVertexDataHandler.java
similarity index 72%
rename from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
rename to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/IRVertexDataHandler.java
index 56f5e5b..84b7a8e 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/IRVertexDataHandler.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.executor.datatransfer;
-import edu.snu.nemo.runtime.common.plan.physical.Task;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
import java.util.ArrayList;
import java.util.List;
@@ -26,9 +26,9 @@ import java.util.List;
* As Task input is processed element-wise, Task output element percolates down
* through the DAG of children TaskDataHandlers.
*/
-public final class TaskDataHandler {
- private final Task task;
- private List<TaskDataHandler> children;
+public final class IRVertexDataHandler {
+ private final IRVertex irVertex;
+ private List<IRVertexDataHandler> children;
private final List<OutputCollectorImpl> inputFromThisStage;
private final List<InputReader> sideInputFromOtherStages;
private final List<OutputCollectorImpl> sideInputFromThisStage;
@@ -36,12 +36,12 @@ public final class TaskDataHandler {
private final List<OutputWriter> outputWriters;
/**
- * TaskDataHandler Constructor.
+ * IRVertexDataHandler Constructor.
*
- * @param task Task of this TaskDataHandler.
+ * @param irVertex IRVertex of this IRVertexDataHandler.
*/
- public TaskDataHandler(final Task task) {
- this.task = task;
+ public IRVertexDataHandler(final IRVertex irVertex) {
+ this.irVertex = irVertex;
this.children = new ArrayList<>();
this.inputFromThisStage = new ArrayList<>();
this.sideInputFromOtherStages = new ArrayList<>();
@@ -51,12 +51,12 @@ public final class TaskDataHandler {
}
/**
- * Get the task that owns this TaskDataHandler.
+ * Get the irVertex that owns this IRVertexDataHandler.
*
- * @return task of this TaskDataHandler.
+ * @return irVertex of this IRVertexDataHandler.
*/
- public Task getTask() {
- return task;
+ public IRVertex getIRVertex() {
+ return irVertex;
}
/**
@@ -64,7 +64,7 @@ public final class TaskDataHandler {
*
* @return DAG of children tasks' TaskDataHandlers.
*/
- public List<TaskDataHandler> getChildren() {
+ public List<IRVertexDataHandler> getChildren() {
return children;
}
@@ -89,18 +89,18 @@ public final class TaskDataHandler {
}
/**
- * Get OutputCollector of this task.
+ * Get OutputCollector of this irVertex.
*
- * @return OutputCollector of this task.
+ * @return OutputCollector of this irVertex.
*/
public OutputCollectorImpl getOutputCollector() {
return outputCollector;
}
/**
- * Get OutputWriters of this task.
+ * Get OutputWriters of this irVertex.
*
- * @return OutputWriters of this task.
+ * @return OutputWriters of this irVertex.
*/
public List<OutputWriter> getOutputWriters() {
return outputWriters;
@@ -111,14 +111,14 @@ public final class TaskDataHandler {
*
* @param childrenDataHandler list of children TaskDataHandlers.
*/
- public void setChildrenDataHandler(final List<TaskDataHandler> childrenDataHandler) {
+ public void setChildrenDataHandler(final List<IRVertexDataHandler> childrenDataHandler) {
children = childrenDataHandler;
}
/**
- * Add OutputCollector of a parent task that will provide intra-stage input.
+ * Add OutputCollector of a parent irVertex that will provide intra-stage input.
*
- * @param input OutputCollector of a parent task.
+ * @param input OutputCollector of a parent irVertex.
*/
public void addInputFromThisStages(final OutputCollectorImpl input) {
inputFromThisStage.add(input);
@@ -134,27 +134,27 @@ public final class TaskDataHandler {
}
/**
- * Add OutputCollector of a parent task that will provide intra-stage side input.
+ * Add OutputCollector of a parent irVertex that will provide intra-stage side input.
*
- * @param ocAsSideInput OutputCollector of a parent task with side input.
+ * @param ocAsSideInput OutputCollector of a parent irVertex with side input.
*/
public void addSideInputFromThisStage(final OutputCollectorImpl ocAsSideInput) {
sideInputFromThisStage.add(ocAsSideInput);
}
/**
- * Set OutputCollector of this task.
+ * Set OutputCollector of this irVertex.
*
- * @param oc OutputCollector of this task.
+ * @param oc OutputCollector of this irVertex.
*/
public void setOutputCollector(final OutputCollectorImpl oc) {
outputCollector = oc;
}
/**
- * Add OutputWriter of this task.
+ * Add OutputWriter of this irVertex.
*
- * @param outputWriter OutputWriter of this task.
+ * @param outputWriter OutputWriter of this irVertex.
*/
public void addOutputWriter(final OutputWriter outputWriter) {
outputWriters.add(outputWriter);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index 80ab1ea..49401aa 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -20,6 +20,7 @@ import edu.snu.nemo.common.exception.SchedulingException;
import edu.snu.nemo.common.exception.UnknownExecutionStateException;
import edu.snu.nemo.common.StateMachine;
import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.common.metric.MetricDataBuilder;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@ -162,9 +163,9 @@ public final class JobStateManager {
// Initialize states for blocks of stage internal edges
taskIdsForStage.forEach(taskId -> {
- final DAG<Task, RuntimeEdge<Task>> taskInternalDag = physicalStage.getTaskDag();
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> taskInternalDag = physicalStage.getIRDAG();
taskInternalDag.getVertices().forEach(task -> {
- final List<RuntimeEdge<Task>> internalOutgoingEdges = taskInternalDag.getOutgoingEdgesOf(task);
+ final List<RuntimeEdge<IRVertex>> internalOutgoingEdges = taskInternalDag.getOutgoingEdgesOf(task);
internalOutgoingEdges.forEach(taskRuntimeEdge -> {
final int srcTaskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
final String blockId = RuntimeIdGenerator.generateBlockId(taskRuntimeEdge.getId(), srcTaskIdx);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 8d22424..51c505d 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -119,7 +119,7 @@ public final class RuntimeMaster {
public Pair<JobStateManager, ScheduledExecutorService> execute(final PhysicalPlan plan,
final int maxScheduleAttempt) {
final Callable<Pair<JobStateManager, ScheduledExecutorService>> jobExecutionCallable = () -> {
- this.irVertices.addAll(plan.getTaskIRVertexMap().values());
+ this.irVertices.addAll(plan.getIdToIRVertex().values());
try {
final JobStateManager jobStateManager =
new JobStateManager(plan, blockManagerMaster, metricMessageHandler, maxScheduleAttempt);
@@ -268,7 +268,7 @@ public final class RuntimeMaster {
taskStateChangedMsg.getTaskId(),
taskStateChangedMsg.getAttemptIdx(),
convertTaskState(taskStateChangedMsg.getState()),
- taskStateChangedMsg.getTaskPutOnHoldId(),
+ taskStateChangedMsg.getVertexPutOnHoldId(),
convertFailureCause(taskStateChangedMsg.getFailureCause()));
break;
case ExecutorFailed:
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index caf1606..8536746 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.reef.driver.context.ActiveContext;
@@ -95,17 +95,17 @@ public final class ExecutorRepresenter {
/**
* Marks the Task as running, and sends scheduling message to the executor.
- * @param scheduledTask
+ * @param executableTask the task to mark as running and send to the executor.
*/
- public void onTaskScheduled(final ScheduledTask scheduledTask) {
- runningTasks.add(scheduledTask.getTaskId());
- runningTaskToAttempt.put(scheduledTask.getTaskId(), scheduledTask.getAttemptIdx());
- failedTasks.remove(scheduledTask.getTaskId());
+ public void onTaskScheduled(final ExecutableTask executableTask) {
+ runningTasks.add(executableTask.getTaskId());
+ runningTaskToAttempt.put(executableTask.getTaskId(), executableTask.getAttemptIdx());
+ failedTasks.remove(executableTask.getTaskId());
serializationExecutorService.submit(new Runnable() {
@Override
public void run() {
- final byte[] serialized = SerializationUtils.serialize(scheduledTask);
+ final byte[] serialized = SerializationUtils.serialize(executableTask);
sendControlMessage(
ControlMessage.Message.newBuilder()
.setId(RuntimeIdGenerator.generateMessageId())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index a74ff10..f32a441 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -19,6 +19,7 @@ import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
@@ -136,14 +137,14 @@ public final class BatchSingleJobScheduler implements Scheduler {
* @param taskId whose state has changed
* @param taskAttemptIndex of the task whose state has changed
* @param newState the state to change to
- * @param taskPutOnHold the ID of task that are put on hold. It is null otherwise.
+ * @param vertexPutOnHold the ID of the vertex that is put on hold, or null if no vertex is put on hold.
*/
@Override
public void onTaskStateChanged(final String executorId,
final String taskId,
final int taskAttemptIndex,
final TaskState.State newState,
- @Nullable final String taskPutOnHold,
+ @Nullable final String vertexPutOnHold,
final TaskState.RecoverableFailureCause failureCause) {
final int currentTaskAttemptIndex = jobStateManager.getCurrentAttemptIndexForTask(taskId);
if (taskAttemptIndex == currentTaskAttemptIndex) {
@@ -158,7 +159,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
break;
case ON_HOLD:
jobStateManager.onTaskStateChanged(taskId, newState);
- onTaskExecutionOnHold(executorId, taskId, taskPutOnHold);
+ onTaskExecutionOnHold(executorId, taskId, vertexPutOnHold);
break;
case FAILED_UNRECOVERABLE:
throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The job failed on Task #")
@@ -392,7 +393,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
LOG.info("Scheduling Stage {}", stageToSchedule.getId());
// each readable and source task will be bounded in executor.
- final List<Map<String, Readable>> logicalTaskIdToReadables = stageToSchedule.getLogicalTaskIdToReadables();
+ final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
taskIdsToSchedule.forEach(taskId -> {
blockManagerMaster.onProducerTaskScheduled(taskId);
@@ -400,23 +401,27 @@ public final class BatchSingleJobScheduler implements Scheduler {
final int attemptIdx = jobStateManager.getCurrentAttemptIndexForTask(taskId);
LOG.debug("Enqueueing {}", taskId);
- pendingTaskCollection.add(new ScheduledTask(physicalPlan.getId(),
- stageToSchedule.getSerializedTaskDag(), taskId, stageIncomingEdges, stageOutgoingEdges, attemptIdx,
- stageToSchedule.getContainerType(), logicalTaskIdToReadables.get(taskIdx)));
+ pendingTaskCollection.add(new ExecutableTask(
+ physicalPlan.getId(),
+ taskId,
+ attemptIdx,
+ stageToSchedule.getContainerType(),
+ stageToSchedule.getSerializedIRDAG(),
+ stageIncomingEdges,
+ stageOutgoingEdges,
+ vertexIdToReadables.get(taskIdx)));
});
schedulerRunner.onATaskAvailable();
}
/**
- * Gets the DAG of a task from it's ID.
- *
- * @param taskId the ID of the task to get.
- * @return the DAG of the task.
+ * @param taskId the ID of the task whose stage's IR DAG is looked up.
+ * @return the IR DAG of the stage that contains the given task.
*/
- private DAG<Task, RuntimeEdge<Task>> getTaskDagById(final String taskId) {
+ private DAG<IRVertex, RuntimeEdge<IRVertex>> getVertexDagById(final String taskId) {
for (final PhysicalStage physicalStage : physicalPlan.getStageDAG().getVertices()) {
if (physicalStage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
- return physicalStage.getTaskDag();
+ return physicalStage.getIRDAG();
}
}
throw new RuntimeException(new Throwable("This taskId does not exist in the plan"));
@@ -471,13 +476,13 @@ public final class BatchSingleJobScheduler implements Scheduler {
/**
* Action for after task execution is put on hold.
- * @param executorId the ID of the executor.
- * @param taskId the ID of the task.
- * @param taskPutOnHold the ID of task that is put on hold.
+ * @param executorId the ID of the executor.
+ * @param taskId the ID of the task.
+ * @param vertexPutOnHold the ID of the vertex that is put on hold.
*/
private void onTaskExecutionOnHold(final String executorId,
final String taskId,
- final String taskPutOnHold) {
+ final String vertexPutOnHold) {
LOG.info("{} put on hold in {}", new Object[]{taskId, executorId});
executorRegistry.updateExecutor(executorId, (executor, state) -> {
executor.onTaskExecutionComplete(taskId);
@@ -491,15 +496,14 @@ public final class BatchSingleJobScheduler implements Scheduler {
if (stageComplete) {
// get optimization vertex from the task.
final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
- getTaskDagById(taskId).getVertices().stream() // get tasks list
- .filter(task -> task.getId().equals(taskPutOnHold)) // find it
- .map(physicalPlan::getIRVertexOf) // get the corresponding IRVertex, the MetricCollectionBarrierVertex
+ getVertexDagById(taskId).getVertices().stream() // get vertex list
+ .filter(irVertex -> irVertex.getId().equals(vertexPutOnHold)) // find it
.filter(irVertex -> irVertex instanceof MetricCollectionBarrierVertex)
.distinct()
.map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // convert types
.findFirst().orElseThrow(() -> new RuntimeException(ON_HOLD.name() // get it
+ " called with failed task ids by some other task than "
- + MetricCollectionBarrierTask.class.getSimpleName()));
+ + MetricCollectionBarrierVertex.class.getSimpleName()));
// and we will use this vertex to perform metric collection and dynamic optimization.
pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
index 1c38e35..ab5081c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import javax.inject.Inject;
@@ -46,10 +46,10 @@ public final class CompositeSchedulingPolicy implements SchedulingPolicy {
@Override
public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final ScheduledTask scheduledTask) {
+ final ExecutableTask executableTask) {
Set<ExecutorRepresenter> candidates = executorRepresenterSet;
for (final SchedulingPolicy schedulingPolicy : schedulingPolicies) {
- candidates = schedulingPolicy.filterExecutorRepresenters(candidates, scheduledTask);
+ candidates = schedulingPolicy.filterExecutorRepresenters(candidates, executableTask);
}
return candidates;
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
index d64fffb..f23a1f0 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import javax.inject.Inject;
@@ -37,20 +37,20 @@ public final class ContainerTypeAwareSchedulingPolicy implements SchedulingPolic
/**
* @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the container type.
* If the container type of target Task is NONE, it will return the original set.
- * @param scheduledTask {@link ScheduledTask} to be scheduled.
+ * @param executableTask {@link ExecutableTask} to be scheduled.
* @return filtered Set of {@link ExecutorRepresenter}.
*/
@Override
public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final ScheduledTask scheduledTask) {
+ final ExecutableTask executableTask) {
- if (scheduledTask.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
+ if (executableTask.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
return executorRepresenterSet;
}
final Set<ExecutorRepresenter> candidateExecutors =
executorRepresenterSet.stream()
- .filter(executor -> executor.getContainerType().equals(scheduledTask.getContainerType()))
+ .filter(executor -> executor.getContainerType().equals(executableTask.getContainerType()))
.collect(Collectors.toSet());
return candidateExecutors;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
index 2f598d2..e72f486 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import javax.inject.Inject;
@@ -35,12 +35,12 @@ public final class FreeSlotSchedulingPolicy implements SchedulingPolicy {
/**
* @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the free slot of executors.
* Executors that do not have any free slots will be filtered by this policy.
- * @param scheduledTask {@link ScheduledTask} to be scheduled.
+ * @param executableTask {@link ExecutableTask} to be scheduled.
* @return filtered Set of {@link ExecutorRepresenter}.
*/
@Override
public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final ScheduledTask scheduledTask) {
+ final ExecutableTask executableTask) {
final Set<ExecutorRepresenter> candidateExecutors =
executorRepresenterSet.stream()
.filter(executor -> executor.getRunningTasks().size() < executor.getExecutorCapacity())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
index d6696e9..361307f 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import net.jcip.annotations.ThreadSafe;
import org.apache.reef.annotations.audience.DriverSide;
@@ -38,9 +38,9 @@ public interface PendingTaskCollection {
/**
* Adds a Task to this collection.
- * @param scheduledTask to add.
+ * @param executableTask to add.
*/
- void add(final ScheduledTask scheduledTask);
+ void add(final ExecutableTask executableTask);
/**
* Removes the specified Task to be scheduled.
@@ -49,14 +49,14 @@ public interface PendingTaskCollection {
* @throws NoSuchElementException if the specified Task is not in the queue,
* or removing this Task breaks scheduling order
*/
- ScheduledTask remove(final String taskId) throws NoSuchElementException;
+ ExecutableTask remove(final String taskId) throws NoSuchElementException;
/**
* Peeks stage that can be scheduled according to job dependency priority.
* Changes to the queue must not reflected to the returned collection to avoid concurrent modification.
* @return stage that can be scheduled, or {@link Optional#empty()} if the queue is empty
*/
- Optional<Collection<ScheduledTask>> peekSchedulableStage();
+ Optional<Collection<ExecutableTask>> peekSchedulableStage();
/**
* Registers a job to this queue in case the queue needs to understand the topology of the job DAG.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index 61077fa..a232a79 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
@@ -47,12 +47,12 @@ public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
/**
* @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by round robin behaviour.
- * @param scheduledTask {@link ScheduledTask} to be scheduled.
+ * @param executableTask {@link ExecutableTask} to be scheduled.
* @return filtered Set of {@link ExecutorRepresenter}.
*/
@Override
public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final ScheduledTask scheduledTask) {
+ final ExecutableTask executableTask) {
final OptionalInt minOccupancy =
executorRepresenterSet.stream()
.map(executor -> executor.getRunningTasks().size())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index f9bb73a..d34eb56 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -30,9 +30,6 @@ import javax.annotation.Nullable;
* RMT and ST meet only at two points: {@link ExecutorRegistry}, and {@link PendingTaskCollection},
* which are synchronized(ThreadSafe).
* Other scheduler-related classes that are accessed by only one of the two threads are not synchronized(NotThreadSafe).
- *
- * Receives jobs to execute and schedules
- * {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTask} to executors.
*/
@DriverSide
@DefaultImplementation(BatchSingleJobScheduler.class)
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index fa98e39..f411c1d 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import edu.snu.nemo.runtime.common.state.JobState;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.JobStateManager;
@@ -112,7 +112,7 @@ public final class SchedulerRunner {
}
void doScheduleStage() {
- final Collection<ScheduledTask> stageToSchedule = pendingTaskCollection.peekSchedulableStage().orElse(null);
+ final Collection<ExecutableTask> stageToSchedule = pendingTaskCollection.peekSchedulableStage().orElse(null);
if (stageToSchedule == null) {
// Task queue is empty
LOG.debug("PendingTaskCollection is empty. Awaiting for more Tasks...");
@@ -120,7 +120,7 @@ public final class SchedulerRunner {
}
final AtomicInteger numScheduledTasks = new AtomicInteger(0); // to be incremented in lambda
- for (final ScheduledTask schedulableTask : stageToSchedule) {
+ for (final ExecutableTask schedulableTask : stageToSchedule) {
final JobStateManager jobStateManager = jobStateManagers.get(schedulableTask.getJobId());
LOG.debug("Trying to schedule {}...", schedulableTask.getTaskId());
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index ab9e251..1e71882 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.tang.annotations.DefaultImplementation;
@@ -32,5 +32,5 @@ import java.util.Set;
@DefaultImplementation(CompositeSchedulingPolicy.class)
public interface SchedulingPolicy {
Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final ScheduledTask scheduledTask);
+ final ExecutableTask executableTask);
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index a5f5736..0375e01 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import net.jcip.annotations.ThreadSafe;
import org.apache.reef.annotations.audience.DriverSide;
@@ -41,7 +41,7 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
/**
* Pending Tasks awaiting to be scheduled for each stage.
*/
- private final ConcurrentMap<String, Map<String, ScheduledTask>> stageIdToPendingTasks;
+ private final ConcurrentMap<String, Map<String, ExecutableTask>> stageIdToPendingTasks;
/**
* Stages with Tasks that have not yet been scheduled.
@@ -55,17 +55,17 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
}
@Override
- public synchronized void add(final ScheduledTask scheduledTask) {
- final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(scheduledTask.getTaskId());
+ public synchronized void add(final ExecutableTask executableTask) {
+ final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(executableTask.getTaskId());
stageIdToPendingTasks.compute(stageId, (s, taskIdToTask) -> {
if (taskIdToTask == null) {
- final Map<String, ScheduledTask> taskIdToTaskMap = new HashMap<>();
- taskIdToTaskMap.put(scheduledTask.getTaskId(), scheduledTask);
- updateSchedulableStages(stageId, scheduledTask.getContainerType());
+ final Map<String, ExecutableTask> taskIdToTaskMap = new HashMap<>();
+ taskIdToTaskMap.put(executableTask.getTaskId(), executableTask);
+ updateSchedulableStages(stageId, executableTask.getContainerType());
return taskIdToTaskMap;
} else {
- taskIdToTask.put(scheduledTask.getTaskId(), scheduledTask);
+ taskIdToTask.put(executableTask.getTaskId(), executableTask);
return taskIdToTask;
}
});
@@ -81,18 +81,18 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
* (i.e. does not belong to the collection from {@link #peekSchedulableStage()}.
*/
@Override
- public synchronized ScheduledTask remove(final String taskId) throws NoSuchElementException {
+ public synchronized ExecutableTask remove(final String taskId) throws NoSuchElementException {
final String stageId = schedulableStages.peekFirst();
if (stageId == null) {
throw new NoSuchElementException("No schedulable stage in Task queue");
}
- final Map<String, ScheduledTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
+ final Map<String, ExecutableTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
if (pendingTasksForStage == null) {
throw new RuntimeException(String.format("Stage %s not found in Task queue", stageId));
}
- final ScheduledTask taskToSchedule = pendingTasksForStage.remove(taskId);
+ final ExecutableTask taskToSchedule = pendingTasksForStage.remove(taskId);
if (taskToSchedule == null) {
throw new NoSuchElementException(String.format("Task %s not found in Task queue", taskId));
}
@@ -115,13 +115,13 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
* or {@link Optional#empty} if the queue is empty
*/
@Override
- public synchronized Optional<Collection<ScheduledTask>> peekSchedulableStage() {
+ public synchronized Optional<Collection<ExecutableTask>> peekSchedulableStage() {
final String stageId = schedulableStages.peekFirst();
if (stageId == null) {
return Optional.empty();
}
- final Map<String, ScheduledTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
+ final Map<String, ExecutableTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
if (pendingTasksForStage == null) {
throw new RuntimeException(String.format("Stage %s not found in stageIdToPendingTasks map", stageId));
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index 5432f0b..f7056bf 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import org.slf4j.Logger;
@@ -59,15 +59,15 @@ public final class SourceLocationAwareSchedulingPolicy implements SchedulingPoli
/**
* @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by source location.
* If there is no source locations, will return original set.
- * @param scheduledTask {@link ScheduledTask} to be scheduled.
+ * @param executableTask {@link ExecutableTask} to be scheduled.
* @return filtered Set of {@link ExecutorRepresenter}.
*/
@Override
public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final ScheduledTask scheduledTask) {
+ final ExecutableTask executableTask) {
final Set<String> sourceLocations;
try {
- sourceLocations = getSourceLocations(scheduledTask.getLogicalTaskIdToReadable().values());
+ sourceLocations = getSourceLocations(executableTask.getIrVertexIdToReadable().values());
} catch (final UnsupportedOperationException e) {
return executorRepresenterSet;
} catch (final Exception e) {
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index 07d36a2..3150475 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -16,10 +16,8 @@
package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.ContainerTypeAwareSchedulingPolicy;
-import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -34,7 +32,7 @@ import static org.mockito.Mockito.*;
* Tests {@link ContainerTypeAwareSchedulingPolicy}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class})
+@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
public final class ContainerTypeAwareSchedulingPolicyTest {
private static ExecutorRepresenter mockExecutorRepresenter(final String containerType) {
@@ -50,24 +48,24 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
final ExecutorRepresenter a1 = mockExecutorRepresenter(ExecutorPlacementProperty.RESERVED);
final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
- final ScheduledTask scheduledTask1 = mock(ScheduledTask.class);
- when(scheduledTask1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
+ final ExecutableTask executableTask1 = mock(ExecutableTask.class);
+ when(executableTask1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2));
final Set<ExecutorRepresenter> candidateExecutors1 =
- schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, scheduledTask1);
+ schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, executableTask1);
final Set<ExecutorRepresenter> expectedExecutors1 = new HashSet<>(Arrays.asList(a1));
assertEquals(expectedExecutors1, candidateExecutors1);
- final ScheduledTask scheduledTask2 = mock(ScheduledTask.class);
- when(scheduledTask2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
+ final ExecutableTask executableTask2 = mock(ExecutableTask.class);
+ when(executableTask2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2));
final Set<ExecutorRepresenter> candidateExecutors2 =
- schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, scheduledTask2);
+ schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, executableTask2);
final Set<ExecutorRepresenter> expectedExecutors2 = new HashSet<>(Arrays.asList(a0, a1, a2));
assertEquals(expectedExecutors2, candidateExecutors2);
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
index cb100f0..0071b9f 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
@@ -15,10 +15,8 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.FreeSlotSchedulingPolicy;
-import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -34,7 +32,7 @@ import static org.mockito.Mockito.*;
* Tests {@link FreeSlotSchedulingPolicy}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class})
+@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
public final class FreeSlotSchedulingPolicyTest {
private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks,
@@ -53,12 +51,12 @@ public final class FreeSlotSchedulingPolicyTest {
final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
- final ScheduledTask scheduledTask = mock(ScheduledTask.class);
+ final ExecutableTask executableTask = mock(ExecutableTask.class);
final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1));
final Set<ExecutorRepresenter> candidateExecutors =
- schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, scheduledTask);
+ schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, executableTask);
final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a1));
assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
index 10cced0..5edd885 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.*;
* Tests {@link RoundRobinSchedulingPolicy}
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class})
+@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
public final class RoundRobinSchedulingPolicyTest {
private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks) {
@@ -50,12 +50,12 @@ public final class RoundRobinSchedulingPolicyTest {
final ExecutorRepresenter a1 = mockExecutorRepresenter(2);
final ExecutorRepresenter a2 = mockExecutorRepresenter(2);
- final ScheduledTask scheduledTask = mock(ScheduledTask.class);
+ final ExecutableTask executableTask = mock(ExecutableTask.class);
final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1, a2));
final Set<ExecutorRepresenter> candidateExecutors =
- schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, scheduledTask);
+ schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, executableTask);
final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0));
assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
index 5aecc3d..0ae5998 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
@@ -83,7 +83,7 @@ public final class SingleTaskQueueTest {
executorService.submit(() -> {
try {
assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
- final ScheduledTask dequeuedTask = dequeue();
+ final ExecutableTask dequeuedTask = dequeue();
assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
dagOf2Stages.get(1).getId());
@@ -135,7 +135,7 @@ public final class SingleTaskQueueTest {
executorService.submit(() -> {
try {
assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
- final ScheduledTask dequeuedTask = dequeue();
+ final ExecutableTask dequeuedTask = dequeue();
assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
dagOf2Stages.get(0).getId());
@@ -211,9 +211,15 @@ public final class SingleTaskQueueTest {
*/
private void scheduleStage(final PhysicalStage stage) {
stage.getTaskIds().forEach(taskId ->
- pendingTaskPriorityQueue.add(new ScheduledTask(
- "TestPlan", stage.getSerializedTaskDag(), taskId, Collections.emptyList(),
- Collections.emptyList(), 0, stage.getContainerType(), Collections.emptyMap())));
+ pendingTaskPriorityQueue.add(new ExecutableTask(
+ "TestPlan",
+ taskId,
+ 0,
+ stage.getContainerType(),
+ stage.getSerializedIRDAG(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyMap())));
}
/**
@@ -221,17 +227,17 @@ public final class SingleTaskQueueTest {
* @return the stage name of the dequeued task.
*/
private String dequeueAndGetStageId() {
- final ScheduledTask scheduledTask = dequeue();
- return RuntimeIdGenerator.getStageIdFromTaskId(scheduledTask.getTaskId());
+ final ExecutableTask executableTask = dequeue();
+ return RuntimeIdGenerator.getStageIdFromTaskId(executableTask.getTaskId());
}
/**
* Dequeues a scheduled task from the task priority queue.
* @return the Task dequeued
*/
- private ScheduledTask dequeue() {
- final Collection<ScheduledTask> scheduledTasks
+ private ExecutableTask dequeue() {
+ final Collection<ExecutableTask> executableTasks
= pendingTaskPriorityQueue.peekSchedulableStage().get();
- return pendingTaskPriorityQueue.remove(scheduledTasks.iterator().next().getTaskId());
+ return pendingTaskPriorityQueue.remove(executableTasks.iterator().next().getTaskId());
}
}
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 87aa123..65f9d98 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
+import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.junit.Test;
@@ -33,7 +33,7 @@ import static org.mockito.Mockito.*;
* Test cases for {@link SourceLocationAwareSchedulingPolicy}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class, Readable.class})
+@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class, Readable.class})
public final class SourceLocationAwareSchedulingPolicyTest {
private static final String SITE_0 = "SEOUL";
private static final String SITE_1 = "JINJU";
@@ -46,7 +46,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
}
/**
- * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link ScheduledTask} when
+ * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link ExecutableTask} when
* there are no executors in appropriate location(s).
*/
@Test
@@ -54,7 +54,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
// Prepare test scenario
- final ScheduledTask task = CreateScheduledTask.withReadablesWithSourceLocations(
+ final ExecutableTask task = CreateExecutableTask.withReadablesWithSourceLocations(
Collections.singletonList(Collections.singletonList(SITE_0)));
final ExecutorRepresenter e0 = mockExecutorRepresenter(SITE_1);
final ExecutorRepresenter e1 = mockExecutorRepresenter(SITE_1);
@@ -70,19 +70,19 @@ public final class SourceLocationAwareSchedulingPolicyTest {
public void testSourceLocationAwareSchedulingWithMultiSource() {
final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
// Prepare test scenario
- final ScheduledTask task0 = CreateScheduledTask.withReadablesWithSourceLocations(
+ final ExecutableTask task0 = CreateExecutableTask.withReadablesWithSourceLocations(
Collections.singletonList(Collections.singletonList(SITE_1)));
- final ScheduledTask task1 = CreateScheduledTask.withReadablesWithSourceLocations(
+ final ExecutableTask task1 = CreateExecutableTask.withReadablesWithSourceLocations(
Collections.singletonList(Arrays.asList(SITE_0, SITE_1, SITE_2)));
- final ScheduledTask task2 = CreateScheduledTask.withReadablesWithSourceLocations(
+ final ExecutableTask task2 = CreateExecutableTask.withReadablesWithSourceLocations(
Arrays.asList(Collections.singletonList(SITE_0), Collections.singletonList(SITE_1),
Arrays.asList(SITE_1, SITE_2)));
- final ScheduledTask task3 = CreateScheduledTask.withReadablesWithSourceLocations(
+ final ExecutableTask task3 = CreateExecutableTask.withReadablesWithSourceLocations(
Arrays.asList(Collections.singletonList(SITE_1), Collections.singletonList(SITE_0),
Arrays.asList(SITE_0, SITE_2)));
final ExecutorRepresenter e = mockExecutorRepresenter(SITE_1);
- for (final ScheduledTask task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
+ for (final ExecutableTask task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
assertEquals(new HashSet<>(Collections.singletonList(e)), schedulingPolicy.filterExecutorRepresenters(
new HashSet<>(Collections.singletonList(e)), task));
}
@@ -90,23 +90,23 @@ public final class SourceLocationAwareSchedulingPolicyTest {
/**
- * Utility for creating {@link ScheduledTask}.
+ * Utility for creating {@link ExecutableTask}.
*/
- private static final class CreateScheduledTask {
+ private static final class CreateExecutableTask {
private static final AtomicInteger taskIndex = new AtomicInteger(0);
private static final AtomicInteger intraTaskIndex = new AtomicInteger(0);
- private static ScheduledTask doCreate(final Collection<Readable> readables) {
- final ScheduledTask mockInstance = mock(ScheduledTask.class);
+ private static ExecutableTask doCreate(final Collection<Readable> readables) {
+ final ExecutableTask mockInstance = mock(ExecutableTask.class);
final Map<String, Readable> readableMap = new HashMap<>();
readables.forEach(readable -> readableMap.put(String.format("TASK-%d", intraTaskIndex.getAndIncrement()),
readable));
when(mockInstance.getTaskId()).thenReturn(String.format("T-%d", taskIndex.getAndIncrement()));
- when(mockInstance.getLogicalTaskIdToReadable()).thenReturn(readableMap);
+ when(mockInstance.getIrVertexIdToReadable()).thenReturn(readableMap);
return mockInstance;
}
- static ScheduledTask withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
+ static ExecutableTask withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
try {
final List<Readable> readables = new ArrayList<>();
for (final List<String> locations : sourceLocation) {
@@ -120,7 +120,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
}
}
- static ScheduledTask withReadablesWithoutSourceLocations(final int numReadables) {
+ static ExecutableTask withReadablesWithoutSourceLocations(final int numReadables) {
try {
final List<Readable> readables = new ArrayList<>();
for (int i = 0; i < numReadables; i++) {
@@ -134,7 +134,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
}
}
- static ScheduledTask withReadablesWhichThrowException(final int numReadables) {
+ static ExecutableTask withReadablesWhichThrowException(final int numReadables) {
try {
final List<Readable> readables = new ArrayList<>();
for (int i = 0; i < numReadables; i++) {
@@ -148,7 +148,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
}
}
- static ScheduledTask withoutReadables() {
+ static ExecutableTask withoutReadables() {
return doCreate(Collections.emptyList());
}
}
diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index d856dd0..631f837 100644
--- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -101,7 +101,7 @@ public final class TestPlanGenerator {
final Policy policy) throws Exception {
final DAG<IRVertex, IREdge> optimized = CompiletimeOptimizer.optimize(irDAG, policy, EMPTY_DAG_DIRECTORY);
final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR);
- return new PhysicalPlan("Plan", physicalDAG, PLAN_GENERATOR.getTaskIRVertexMap());
+ return new PhysicalPlan("Plan", physicalDAG, PLAN_GENERATOR.getIdToIRVertex());
}
/**
diff --git a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
index 51b753e..f257926 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
@@ -82,7 +82,7 @@ public class ClientEndpointTest {
injector.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
final BlockManagerMaster pmm = injector.getInstance(BlockManagerMaster.class);
final JobStateManager jobStateManager = new JobStateManager(
- new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()),
+ new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
pmm, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
final DriverEndpoint driverEndpoint = new DriverEndpoint(jobStateManager, clientEndpoint);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
index affcbed..a6df1d4 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
@@ -20,10 +20,12 @@ import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import edu.snu.nemo.runtime.common.plan.physical.*;
@@ -63,7 +65,7 @@ public final class TaskExecutorTest {
private static final String CONTAINER_TYPE = "CONTAINER_TYPE";
private static final int SOURCE_PARALLELISM = 5;
private List elements;
- private Map<String, List<Object>> taskIdToOutputData;
+ private Map<String, List<Object>> vertexIdToOutputData;
private DataTransferFactory dataTransferFactory;
private TaskStateManager taskStateManager;
private MetricMessageSender metricMessageSender;
@@ -76,7 +78,7 @@ public final class TaskExecutorTest {
taskStateManager = mock(TaskStateManager.class);
// Mock a DataTransferFactory.
- taskIdToOutputData = new HashMap<>();
+ vertexIdToOutputData = new HashMap<>();
dataTransferFactory = mock(DataTransferFactory.class);
when(dataTransferFactory.createReader(anyInt(), any(), any())).then(new InterStageReaderAnswer());
when(dataTransferFactory.createWriter(any(), anyInt(), any(), any())).then(new WriterAnswer());
@@ -88,17 +90,13 @@ public final class TaskExecutorTest {
}
/**
- * Test the {@link BoundedSourceTask} processing in {@link TaskExecutor}.
+ * Test the {@link edu.snu.nemo.common.ir.vertex.SourceVertex} processing in {@link TaskExecutor}.
*/
- @Test(timeout=2000)
- public void testSourceTask() throws Exception {
- final IRVertex sourceIRVertex = new SimpleIRVertex();
- final String sourceIrVertexId = sourceIRVertex.getId();
-
- final String sourceTaskId = RuntimeIdGenerator.generateLogicalTaskId("Source_IR_Vertex");
+ @Test(timeout=5000)
+ public void testSourceVertex() throws Exception {
+ final IRVertex sourceIRVertex = new EmptyComponents.EmptySourceVertex("empty");
final String stageId = RuntimeIdGenerator.generateStageId(0);
- final BoundedSourceTask<Integer> boundedSourceTask = new BoundedSourceTask<>(sourceTaskId, sourceIrVertexId);
final Readable readable = new Readable() {
@Override
public Iterable read() throws Exception {
@@ -109,30 +107,37 @@ public final class TaskExecutorTest {
throw new UnsupportedOperationException();
}
};
- final Map<String, Readable> logicalIdToReadable = new HashMap<>();
- logicalIdToReadable.put(sourceTaskId, readable);
+ final Map<String, Readable> vertexIdToReadable = new HashMap<>();
+ vertexIdToReadable.put(sourceIRVertex.getId(), readable);
- final DAG<Task, RuntimeEdge<Task>> taskDag =
- new DAGBuilder<Task, RuntimeEdge<Task>>().addVertex(boundedSourceTask).build();
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
+ new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().addVertex(sourceIRVertex).buildWithoutSourceSinkCheck();
final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
when(stageOutEdge.getSrcVertex()).thenReturn(sourceIRVertex);
final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
- final ScheduledTask scheduledTask =
- new ScheduledTask("testSourceTask", new byte[0], taskId, Collections.emptyList(),
- Collections.singletonList(stageOutEdge), 0, CONTAINER_TYPE, logicalIdToReadable);
+ final ExecutableTask executableTask =
+ new ExecutableTask(
+ "testSourceVertex",
+ taskId,
+ 0,
+ CONTAINER_TYPE,
+ new byte[0],
+ Collections.emptyList(),
+ Collections.singletonList(stageOutEdge),
+ vertexIdToReadable);
// Execute the task.
final TaskExecutor taskExecutor = new TaskExecutor(
- scheduledTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+ executableTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
taskExecutor.execute();
// Check the output.
- assertEquals(100, taskIdToOutputData.get(sourceTaskId).size());
- assertEquals(elements.get(0), taskIdToOutputData.get(sourceTaskId).get(0));
+ assertEquals(100, vertexIdToOutputData.get(sourceIRVertex.getId()).size());
+ assertEquals(elements.get(0), vertexIdToOutputData.get(sourceIRVertex.getId()).get(0));
}
/**
- * Test the {@link OperatorTask} processing in {@link TaskExecutor}.
+ * Test the {@link edu.snu.nemo.common.ir.vertex.OperatorVertex} processing in {@link TaskExecutor}.
*
* The DAG of the task to test will looks like:
* operator task 1 -> operator task 2
@@ -142,47 +147,46 @@ public final class TaskExecutorTest {
* Because of this, the operator task 1 will process multiple partitions and emit data in multiple times also.
* On the other hand, operator task 2 will receive the output data once and produce a single output.
*/
- @Test//(timeout=2000)
- public void testOperatorTask() throws Exception {
- final IRVertex operatorIRVertex1 = new SimpleIRVertex();
- final IRVertex operatorIRVertex2 = new SimpleIRVertex();
- final String operatorIRVertexId1 = operatorIRVertex1.getId();
- final String operatorIRVertexId2 = operatorIRVertex2.getId();
+ @Test(timeout=5000)
+ public void testOperatorVertex() throws Exception {
+ final IRVertex operatorIRVertex1 = new OperatorVertex(new SimpleTransform());
+ final IRVertex operatorIRVertex2 = new OperatorVertex(new SimpleTransform());
final String runtimeIREdgeId = "Runtime edge between operator tasks";
- final String operatorTaskId1 = RuntimeIdGenerator.generateLogicalTaskId("Operator_vertex_1");
- final String operatorTaskId2 = RuntimeIdGenerator.generateLogicalTaskId("Operator_vertex_2");
final String stageId = RuntimeIdGenerator.generateStageId(1);
- final OperatorTask operatorTask1 =
- new OperatorTask(operatorTaskId1, operatorIRVertexId1, new SimpleTransform());
- final OperatorTask operatorTask2 =
- new OperatorTask(operatorTaskId2, operatorIRVertexId2, new SimpleTransform());
final Coder coder = Coder.DUMMY_CODER;
ExecutionPropertyMap edgeProperties = new ExecutionPropertyMap(runtimeIREdgeId);
edgeProperties.put(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
- final DAG<Task, RuntimeEdge<Task>> taskDag = new DAGBuilder<Task, RuntimeEdge<Task>>()
- .addVertex(operatorTask1)
- .addVertex(operatorTask2)
- .connectVertices(new RuntimeEdge<Task>(
- runtimeIREdgeId, edgeProperties, operatorTask1, operatorTask2, coder))
- .build();
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
+ .addVertex(operatorIRVertex1)
+ .addVertex(operatorIRVertex2)
+ .connectVertices(new RuntimeEdge<IRVertex>(
+ runtimeIREdgeId, edgeProperties, operatorIRVertex1, operatorIRVertex2, coder))
+ .buildWithoutSourceSinkCheck();
final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
final PhysicalStageEdge stageInEdge = mock(PhysicalStageEdge.class);
when(stageInEdge.getDstVertex()).thenReturn(operatorIRVertex1);
final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
when(stageOutEdge.getSrcVertex()).thenReturn(operatorIRVertex2);
- final ScheduledTask scheduledTask =
- new ScheduledTask("testSourceTask", new byte[0], taskId, Collections.singletonList(stageInEdge),
- Collections.singletonList(stageOutEdge), 0, CONTAINER_TYPE, Collections.emptyMap());
+ final ExecutableTask executableTask =
+ new ExecutableTask(
+ "testSourceVertex",
+ taskId,
+ 0,
+ CONTAINER_TYPE,
+ new byte[0],
+ Collections.singletonList(stageInEdge),
+ Collections.singletonList(stageOutEdge),
+ Collections.emptyMap());
// Execute the task.
final TaskExecutor taskExecutor = new TaskExecutor(
- scheduledTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+ executableTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
taskExecutor.execute();
// Check the output.
- assertEquals(100, taskIdToOutputData.get(operatorTaskId2).size());
+ assertEquals(100, vertexIdToOutputData.get(operatorIRVertex2.getId()).size());
}
/**
@@ -234,15 +238,15 @@ public final class TaskExecutorTest {
@Override
public OutputWriter answer(final InvocationOnMock invocationOnMock) throws Throwable {
final Object[] args = invocationOnMock.getArguments();
- final Task dstTask = (Task) args[0];
+ final IRVertex vertex = (IRVertex) args[0];
final OutputWriter outputWriter = mock(OutputWriter.class);
doAnswer(new Answer() {
@Override
public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
final Object[] args = invocationOnMock.getArguments();
final Object dataToWrite = args[0];
- taskIdToOutputData.computeIfAbsent(dstTask.getId(), emptyTaskId -> new ArrayList<>());
- taskIdToOutputData.get(dstTask.getId()).add(dataToWrite);
+ vertexIdToOutputData.computeIfAbsent(vertex.getId(), emptyTaskId -> new ArrayList<>());
+ vertexIdToOutputData.get(vertex.getId()).add(dataToWrite);
return null;
}
}).when(outputWriter).write(any());
@@ -251,16 +255,6 @@ public final class TaskExecutorTest {
}
/**
- * Simple {@link IRVertex} for testing.
- */
- private class SimpleIRVertex extends IRVertex {
- @Override
- public IRVertex getClone() {
- return null; // Not used.
- }
- }
-
- /**
* Simple {@link Transform} for testing.
* @param <T> input/output type.
*/
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
index cb9ee78..cfdeb2f 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
@@ -29,7 +29,6 @@ import edu.snu.nemo.runtime.common.state.BlockState;
import edu.snu.nemo.runtime.executor.data.*;
import edu.snu.nemo.runtime.executor.data.block.Block;
import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
-import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
import edu.snu.nemo.runtime.executor.data.streamchainer.CompressionStreamChainer;
import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
import edu.snu.nemo.runtime.executor.data.stores.*;
@@ -81,8 +80,8 @@ public final class BlockStoreTest {
private BlockManagerMaster blockManagerMaster;
private LocalMessageDispatcher messageDispatcher;
// Variables for shuffle test
- private static final int NUM_WRITE_TASKS = 3;
- private static final int NUM_READ_TASKS = 3;
+ private static final int NUM_WRITE_VERTICES = 3;
+ private static final int NUM_READ_VERTICES = 3;
private static final int DATA_SIZE = 1000;
private List<String> blockIdList;
private List<List<NonSerializedPartition<Integer>>> partitionsPerBlock;
@@ -115,20 +114,18 @@ public final class BlockStoreTest {
when(serializerManager.getSerializer(any())).thenReturn(SERIALIZER);
// Following part is for for the shuffle test.
- final List<String> writeTaskIdList = new ArrayList<>(NUM_WRITE_TASKS);
- final List<String> readTaskIdList = new ArrayList<>(NUM_READ_TASKS);
- blockIdList = new ArrayList<>(NUM_WRITE_TASKS);
- partitionsPerBlock = new ArrayList<>(NUM_WRITE_TASKS);
+ final List<String> writeVertexIdList = new ArrayList<>(NUM_WRITE_VERTICES);
+ final List<String> readTaskIdList = new ArrayList<>(NUM_READ_VERTICES);
+ blockIdList = new ArrayList<>(NUM_WRITE_VERTICES);
+ partitionsPerBlock = new ArrayList<>(NUM_WRITE_VERTICES);
// Generates the ids of the tasks to be used.
- IntStream.range(0, NUM_WRITE_TASKS).forEach(
- number -> writeTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("Write_IR_vertex")));
- IntStream.range(0, NUM_READ_TASKS).forEach(
- number -> readTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("Read_IR_vertex")));
+ IntStream.range(0, NUM_WRITE_VERTICES).forEach(number -> writeVertexIdList.add("Write_IR_vertex"));
+ IntStream.range(0, NUM_READ_VERTICES).forEach(number -> readTaskIdList.add("Read_IR_vertex"));
// Generates the ids and the data of the blocks to be used.
final String shuffleEdge = RuntimeIdGenerator.generateRuntimeEdgeId("shuffle_edge");
- IntStream.range(0, NUM_WRITE_TASKS).forEach(writeTaskIdx -> {
+ IntStream.range(0, NUM_WRITE_VERTICES).forEach(writeTaskIdx -> {
// Create a block for each writer task.
final String blockId = RuntimeIdGenerator.generateBlockId(shuffleEdge, writeTaskIdx);
blockIdList.add(blockId);
@@ -137,27 +134,26 @@ public final class BlockStoreTest {
blockId, BlockState.State.SCHEDULED, null);
// Create blocks for this block.
- final List<NonSerializedPartition<Integer>> partitionsForBlock = new ArrayList<>(NUM_READ_TASKS);
+ final List<NonSerializedPartition<Integer>> partitionsForBlock = new ArrayList<>(NUM_READ_VERTICES);
partitionsPerBlock.add(partitionsForBlock);
- IntStream.range(0, NUM_READ_TASKS).forEach(readTaskIdx -> {
- final int partitionsCount = writeTaskIdx * NUM_READ_TASKS + readTaskIdx;
+ IntStream.range(0, NUM_READ_VERTICES).forEach(readTaskIdx -> {
+ final int partitionsCount = writeTaskIdx * NUM_READ_VERTICES + readTaskIdx;
partitionsForBlock.add(new NonSerializedPartition(
readTaskIdx, getRangedNumList(partitionsCount * DATA_SIZE, (partitionsCount + 1) * DATA_SIZE), -1, -1));
});
});
// Following part is for the concurrent read test.
- final String writeTaskId = RuntimeIdGenerator.generateLogicalTaskId("conc_write_IR_vertex");
+ final String writeTaskId = "conc_write_IR_vertex";
final List<String> concReadTaskIdList = new ArrayList<>(NUM_CONC_READ_TASKS);
final String concEdge = RuntimeIdGenerator.generateRuntimeEdgeId("conc_read_edge");
// Generates the ids and the data to be used.
- concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, NUM_WRITE_TASKS + NUM_READ_TASKS + 1);
+ concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1);
blockManagerMaster.initializeState(concBlockId, "unused");
blockManagerMaster.onBlockStateChanged(
concBlockId, BlockState.State.SCHEDULED, null);
- IntStream.range(0, NUM_CONC_READ_TASKS).forEach(
- number -> concReadTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("conc_read_IR_vertex")));
+ IntStream.range(0, NUM_CONC_READ_TASKS).forEach(number -> concReadTaskIdList.add("conc_read_IR_vertex"));
concBlockPartition = new NonSerializedPartition(0, getRangedNumList(0, CONC_READ_DATA_SIZE), -1, -1);
// Following part is for the shuffle in hash range test
@@ -170,16 +166,14 @@ public final class BlockStoreTest {
expectedDataInRange = new ArrayList<>(NUM_READ_HASH_TASKS);
// Generates the ids of the tasks to be used.
- IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(
- number -> writeHashTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("hash_write_IR_vertex")));
- IntStream.range(0, NUM_READ_HASH_TASKS).forEach(
- number -> readHashTaskIdList.add(RuntimeIdGenerator.generateLogicalTaskId("hash_read_IR_vertex")));
+ IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(number -> writeHashTaskIdList.add("hash_write_IR_vertex"));
+ IntStream.range(0, NUM_READ_HASH_TASKS).forEach(number -> readHashTaskIdList.add("hash_read_IR_vertex"));
final String hashEdge = RuntimeIdGenerator.generateRuntimeEdgeId("hash_edge");
// Generates the ids and the data of the blocks to be used.
IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(writeTaskIdx -> {
final String blockId = RuntimeIdGenerator.generateBlockId(
- hashEdge, NUM_WRITE_TASKS + NUM_READ_TASKS + 1 + writeTaskIdx);
+ hashEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1 + writeTaskIdx);
hashedBlockIdList.add(blockId);
blockManagerMaster.initializeState(blockId, "Unused");
blockManagerMaster.onBlockStateChanged(
@@ -307,14 +301,14 @@ public final class BlockStoreTest {
*/
private void shuffle(final BlockStore writerSideStore,
final BlockStore readerSideStore) {
- final ExecutorService writeExecutor = Executors.newFixedThreadPool(NUM_WRITE_TASKS);
- final ExecutorService readExecutor = Executors.newFixedThreadPool(NUM_READ_TASKS);
- final List<Future<Boolean>> writeFutureList = new ArrayList<>(NUM_WRITE_TASKS);
- final List<Future<Boolean>> readFutureList = new ArrayList<>(NUM_READ_TASKS);
+ final ExecutorService writeExecutor = Executors.newFixedThreadPool(NUM_WRITE_VERTICES);
+ final ExecutorService readExecutor = Executors.newFixedThreadPool(NUM_READ_VERTICES);
+ final List<Future<Boolean>> writeFutureList = new ArrayList<>(NUM_WRITE_VERTICES);
+ final List<Future<Boolean>> readFutureList = new ArrayList<>(NUM_READ_VERTICES);
final long startNano = System.nanoTime();
// Write concurrently
- IntStream.range(0, NUM_WRITE_TASKS).forEach(writeTaskIdx ->
+ IntStream.range(0, NUM_WRITE_VERTICES).forEach(writeTaskIdx ->
writeFutureList.add(writeExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
@@ -338,7 +332,7 @@ public final class BlockStoreTest {
})));
// Wait each writer to success
- IntStream.range(0, NUM_WRITE_TASKS).forEach(writer -> {
+ IntStream.range(0, NUM_WRITE_VERTICES).forEach(writer -> {
try {
assertTrue(writeFutureList.get(writer).get());
} catch (final Exception e) {
@@ -348,12 +342,12 @@ public final class BlockStoreTest {
final long writeEndNano = System.nanoTime();
// Read concurrently and check whether the result is equal to the input
- IntStream.range(0, NUM_READ_TASKS).forEach(readTaskIdx ->
+ IntStream.range(0, NUM_READ_VERTICES).forEach(readTaskIdx ->
readFutureList.add(readExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() {
try {
- for (int writeTaskIdx = 0; writeTaskIdx < NUM_WRITE_TASKS; writeTaskIdx++) {
+ for (int writeTaskIdx = 0; writeTaskIdx < NUM_WRITE_VERTICES; writeTaskIdx++) {
readResultCheck(blockIdList.get(writeTaskIdx), HashRange.of(readTaskIdx, readTaskIdx + 1),
readerSideStore, partitionsPerBlock.get(writeTaskIdx).get(readTaskIdx).getData());
}
@@ -366,7 +360,7 @@ public final class BlockStoreTest {
})));
// Wait each reader to success
- IntStream.range(0, NUM_READ_TASKS).forEach(reader -> {
+ IntStream.range(0, NUM_READ_VERTICES).forEach(reader -> {
try {
assertTrue(readFutureList.get(reader).get());
} catch (final Exception e) {
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 998faf5..61ef9b0 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -39,7 +39,6 @@ import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.physical.Task;
import edu.snu.nemo.runtime.executor.Executor;
import edu.snu.nemo.runtime.executor.MetricManagerWorker;
import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
@@ -74,7 +73,6 @@ import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import java.awt.*;
import java.io.File;
import java.io.IOException;
import java.util.*;
@@ -544,7 +542,7 @@ public final class DataTransferTest {
}
private PhysicalStage setupStages(final String stageId) {
- final DAG<Task, RuntimeEdge<Task>> emptyDag = new DAGBuilder<Task, RuntimeEdge<Task>>().build();
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> emptyDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().build();
return new PhysicalStage(stageId, emptyDag, PARALLELISM_TEN, 0, "Not_used", Collections.emptyList());
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
index bdc62e5..5af2476 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
@@ -125,7 +125,7 @@ public final class JobStateManagerTest {
new TestPolicy(), "");
final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
final JobStateManager jobStateManager = new JobStateManager(
- new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()),
+ new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
assertEquals(jobStateManager.getJobId(), "TestPlan");
@@ -164,7 +164,7 @@ public final class JobStateManagerTest {
final DAG<IRVertex, IREdge> irDAG = irDAGBuilder.build();
final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
final JobStateManager jobStateManager = new JobStateManager(
- new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()),
+ new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
assertFalse(jobStateManager.checkJobTermination());
--
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.