You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by je...@apache.org on 2018/06/05 08:03:01 UTC
[incubator-nemo] branch master updated: [NEMO-78] Rename PhysicalStage to Stage (#26)
This is an automated email from the ASF dual-hosted git repository.
jeongyoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new d418580 [NEMO-78] Rename PhysicalStage to Stage (#26)
d418580 is described below
commit d4185809cfc5814ed7620f4389989fc5cfa8e049
Author: John Yang <jo...@gmail.com>
AuthorDate: Tue Jun 5 17:02:56 2018 +0900
[NEMO-78] Rename PhysicalStage to Stage (#26)
JIRA: NEMO-78: Rename PhysicalStage to Stage
Major changes:
* Remove the existing Stage that was only being used in the compiler-backend
* Rename the existing PhysicalStage to Stage
* Directly translate IRDAGs into Stage DAGs used in the runtime
* Rename all related variables
Minor changes to note:
* Also in json2dot.py, 'physicalStages' is renamed to 'stages', and that appears to break our online DAG visualizer. Needs @seojangho's review on this.
* Remove the disaggregation test, which is redundant with the pado test, and change integration test parameters to use resources more efficiently
Tests for the changes:
* No new tests, as no new feature was added
Other comments:
N/A
resolves NEMO-78
---
bin/json2dot.py | 18 +-
.../nemo/compiler/backend/nemo/NemoBackend.java | 12 +-
.../optimizer/examples/EmptyComponents.java | 7 +-
.../snu/nemo/examples/beam/MapReduceITCase.java | 8 -
.../resources/beam_sample_executor_resources.json | 11 +-
.../resources/spark_sample_executor_resources.json | 9 +-
.../nemo/runtime/common/RuntimeIdGenerator.java | 16 +-
.../eventhandler/DynamicOptimizationEvent.java | 2 +-
.../DynamicOptimizationEventHandler.java | 2 +-
.../eventhandler/UpdatePhysicalPlanEvent.java | 2 +-
.../runtime/common/optimizer/RuntimeOptimizer.java | 2 +-
.../pass/runtime/DataSkewRuntimePass.java | 16 +-
.../common/optimizer/pass/runtime/RuntimePass.java | 2 +-
.../common/plan/{physical => }/ExecutableTask.java | 14 +-
.../common/plan/{physical => }/PhysicalPlan.java | 8 +-
.../plan/{physical => }/PhysicalPlanGenerator.java | 195 ++++++++-------------
.../{physical/PhysicalStage.java => Stage.java} | 19 +-
.../common/plan/{stage => }/StageBuilder.java | 71 ++++----
.../PhysicalStageEdge.java => StageEdge.java} | 28 +--
.../common/plan/{stage => }/StageEdgeBuilder.java | 7 +-
.../snu/nemo/runtime/common/plan/stage/Stage.java | 68 -------
.../nemo/runtime/common/plan/stage/StageEdge.java | 81 ---------
.../snu/nemo/runtime/common/state/JobState.java | 2 +-
.../snu/nemo/runtime/common/state/StageState.java | 2 +-
.../edu/snu/nemo/driver/UserApplicationRunner.java | 2 +-
.../edu/snu/nemo/runtime/executor/Executor.java | 2 +-
.../snu/nemo/runtime/executor/TaskExecutor.java | 31 ++--
.../nemo/runtime/executor/TaskStateManager.java | 2 +-
.../runtime/executor/datatransfer/InputReader.java | 6 +-
.../snu/nemo/runtime/master/JobStateManager.java | 32 ++--
.../edu/snu/nemo/runtime/master/RuntimeMaster.java | 2 +-
.../UpdatePhysicalPlanEventHandler.java | 2 +-
.../master/resource/ExecutorRepresenter.java | 2 +-
.../master/scheduler/BatchSingleJobScheduler.java | 57 +++---
.../scheduler/CompositeSchedulingPolicy.java | 2 +-
.../ContainerTypeAwareSchedulingPolicy.java | 2 +-
.../master/scheduler/FreeSlotSchedulingPolicy.java | 2 +-
.../master/scheduler/PendingTaskCollection.java | 4 +-
.../scheduler/RoundRobinSchedulingPolicy.java | 2 +-
.../nemo/runtime/master/scheduler/Scheduler.java | 2 +-
.../runtime/master/scheduler/SchedulerRunner.java | 2 +-
.../runtime/master/scheduler/SchedulingPolicy.java | 2 +-
.../master/scheduler/SingleJobTaskCollection.java | 16 +-
.../SourceLocationAwareSchedulingPolicy.java | 2 +-
.../runtime/master/BlockManagerMasterTest.java | 4 +-
.../nemo/runtime/master/JobStateManagerTest.java | 17 +-
.../scheduler/BatchSingleJobSchedulerTest.java | 26 +--
.../ContainerTypeAwareSchedulingPolicyTest.java | 2 +-
.../master/scheduler/FaultToleranceTest.java | 19 +-
.../scheduler/FreeSlotSchedulingPolicyTest.java | 2 +-
.../scheduler/RoundRobinSchedulingPolicyTest.java | 2 +-
.../master/scheduler/SchedulerTestUtil.java | 10 +-
.../master/scheduler/SingleTaskQueueTest.java | 14 +-
.../SourceLocationAwareSchedulingPolicyTest.java | 2 +-
.../runtime/plangenerator/TestPlanGenerator.java | 10 +-
.../snu/nemo/tests/client/ClientEndpointTest.java | 10 +-
.../compiler/backend/nemo/NemoBackendTest.java | 4 +-
.../runtime/common/plan/DAGConverterTest.java | 24 ++-
.../tests/runtime/executor/TaskExecutorTest.java | 11 +-
.../runtime/executor/data/BlockStoreTest.java | 6 +-
.../executor/datatransfer/DataTransferTest.java | 24 +--
61 files changed, 374 insertions(+), 589 deletions(-)
diff --git a/bin/json2dot.py b/bin/json2dot.py
index e4c77bf..85538e7 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -48,18 +48,18 @@ class JobState:
def __init__(self, data):
self.id = data['jobId']
self.stages = {}
- for stage in data['physicalStages']:
- self.stages[stage['id']] = PhysicalStageState(stage)
+ for stage in data['stages']:
+ self.stages[stage['id']] = StageState(stage)
@classmethod
def empty(cls):
- return cls({'jobId': None, 'physicalStages': []})
+ return cls({'jobId': None, 'stages': []})
def get(self, id):
try:
return self.stages[id]
except:
- return PhysicalStageState.empty()
+ return StageState.empty()
-class PhysicalStageState:
+class StageState:
def __init__(self, data):
self.id = data['id']
self.state = data['state']
@@ -114,7 +114,7 @@ class DAG:
def Vertex(id, properties, state):
try:
- return PhysicalStage(id, properties, state)
+ return Stage(id, properties, state)
except:
pass
try:
@@ -249,7 +249,7 @@ class LoopVertex:
vertexId = list(filter(lambda v: edgeId in self.incoming[v], self.incoming))[0]
return self.dag.vertices[vertexId]
-class PhysicalStage:
+class Stage:
def __init__(self, id, properties, state):
self.id = id
self.irVertex = DAG(properties['irDag'], JobState.empty())
@@ -276,7 +276,7 @@ class PhysicalStage:
def Edge(src, dst, properties):
try:
- return PhysicalStageEdge(src, dst, properties)
+ return StageEdge(src, dst, properties)
except:
pass
try:
@@ -325,7 +325,7 @@ class IREdge:
return '{} -> {} [ltail = {}, lhead = {}, label = <{}>];'.format(src.oneVertex.idx,
dst.oneVertex.idx, src.logicalEnd, dst.logicalEnd, label)
-class PhysicalStageEdge:
+class StageEdge:
def __init__(self, src, dst, properties):
self.src = src
self.dst = dst
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 4ef453c..27b98ea 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -20,10 +20,10 @@ import edu.snu.nemo.compiler.backend.Backend;
import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import org.apache.reef.tang.Tang;
/**
@@ -57,9 +57,9 @@ public final class NemoBackend implements Backend<PhysicalPlan> {
*/
public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
final PhysicalPlanGenerator physicalPlanGenerator) {
- final DAG<PhysicalStage, PhysicalStageEdge> physicalStageDAG = irDAG.convert(physicalPlanGenerator);
+ final DAG<Stage, StageEdge> stageDAG = irDAG.convert(physicalPlanGenerator);
final PhysicalPlan physicalPlan = new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(),
- physicalStageDAG, physicalPlanGenerator.getIdToIRVertex());
+ stageDAG, physicalPlanGenerator.getIdToIRVertex());
return physicalPlan;
}
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
index 34c66fc..7fd0d82 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
@@ -21,7 +21,6 @@ import edu.snu.nemo.common.ir.vertex.SourceVertex;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
/**
@@ -93,7 +92,11 @@ public class EmptyComponents {
@Override
public List<Readable<T>> getReadables(final int desirednumOfSplits) {
- return Arrays.asList(new EmptyReadable<>());
+ final List list = new ArrayList(desirednumOfSplits);
+ for (int i = 0; i < desirednumOfSplits; i++) {
+ list.add(new EmptyReadable<>());
+ }
+ return list;
}
@Override
diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
index 44b0504..d4f4432 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
@@ -77,14 +77,6 @@ public final class MapReduceITCase {
}
@Test (timeout = TIMEOUT)
- public void testDisagg() throws Exception {
- JobLauncher.main(builder
- .addJobId(MapReduceITCase.class.getSimpleName() + "_disagg")
- .addOptimizationPolicy(DisaggregationPolicyParallelismFive.class.getCanonicalName())
- .build());
- }
-
- @Test (timeout = TIMEOUT)
public void testPado() throws Exception {
JobLauncher.main(builder
.addJobId(MapReduceITCase.class.getSimpleName() + "_pado")
diff --git a/examples/resources/beam_sample_executor_resources.json b/examples/resources/beam_sample_executor_resources.json
index 1f7f973..ced110c 100644
--- a/examples/resources/beam_sample_executor_resources.json
+++ b/examples/resources/beam_sample_executor_resources.json
@@ -1,17 +1,12 @@
[
{
"type": "Transient",
- "memory_mb": 300,
- "capacity": 3
+ "memory_mb": 512,
+ "capacity": 5
},
{
"type": "Reserved",
- "memory_mb": 300,
+ "memory_mb": 512,
"capacity": 5
- },
- {
- "type": "Compute",
- "memory_mb": 300,
- "capacity": 3
}
]
diff --git a/examples/resources/spark_sample_executor_resources.json b/examples/resources/spark_sample_executor_resources.json
index 187bd45..ced110c 100644
--- a/examples/resources/spark_sample_executor_resources.json
+++ b/examples/resources/spark_sample_executor_resources.json
@@ -2,16 +2,11 @@
{
"type": "Transient",
"memory_mb": 512,
- "capacity": 2
+ "capacity": 5
},
{
"type": "Reserved",
"memory_mb": 512,
- "capacity": 2
- },
- {
- "type": "Compute",
- "memory_mb": 512,
- "capacity": 2
+ "capacity": 5
}
]
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
index b53ff34..e4fef9c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
@@ -40,7 +40,7 @@ public final class RuntimeIdGenerator {
//////////////////////////////////////////////////////////////// Generate IDs
/**
- * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan}.
+ * Generates the ID for physical plan.
*
* @return the generated ID
*/
@@ -49,7 +49,7 @@ public final class RuntimeIdGenerator {
}
/**
- * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.stage.StageEdge}.
+ * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.StageEdge}.
*
* @param irEdgeId .
* @return the generated ID
@@ -59,17 +59,7 @@ public final class RuntimeIdGenerator {
}
/**
- * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.RuntimeEdge}.
- *
- * @param irEdgeId .
- * @return the generated ID
- */
- public static String generateRuntimeEdgeId(final String irEdgeId) {
- return "REdge-" + irEdgeId;
- }
-
- /**
- * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.stage.Stage}.
+ * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.Stage}.
* @param stageId stage ID in numeric form.
* @return the generated ID
*/
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
index e63f1d0..e9bd6eb 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
@@ -21,7 +21,7 @@ package edu.snu.nemo.runtime.common.eventhandler;
import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.eventhandler.RuntimeEvent;
import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
/**
* An event for triggering dynamic optimization.
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
index 6fa6bc9..63cd56e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
import edu.snu.nemo.runtime.common.optimizer.RuntimeOptimizer;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.reef.wake.impl.PubSubEventHandler;
import javax.inject.Inject;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
index ed576bd..fe91fd7 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.common.eventhandler;
import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.eventhandler.CompilerEvent;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
/**
* An event for updating the physical plan in the scheduler.
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
index 8e00280..ab03331 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import java.util.*;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 7d3e7f4..ed2f92c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -24,9 +24,9 @@ import edu.snu.nemo.common.exception.DynamicOptimizationException;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.data.HashRange;
import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEventHandler;
import org.slf4j.Logger;
@@ -60,16 +60,16 @@ public final class DataSkewRuntimePass implements RuntimePass<Map<String, List<P
@Override
public PhysicalPlan apply(final PhysicalPlan originalPlan, final Map<String, List<Pair<Integer, Long>>> metricData) {
// Builder to create new stages.
- final DAGBuilder<PhysicalStage, PhysicalStageEdge> physicalDAGBuilder =
+ final DAGBuilder<Stage, StageEdge> physicalDAGBuilder =
new DAGBuilder<>(originalPlan.getStageDAG());
// get edges to optimize
final List<String> optimizationEdgeIds = metricData.keySet().stream().map(blockId ->
RuntimeIdGenerator.getRuntimeEdgeIdFromBlockId(blockId)).collect(Collectors.toList());
- final DAG<PhysicalStage, PhysicalStageEdge> stageDAG = originalPlan.getStageDAG();
- final List<PhysicalStageEdge> optimizationEdges = stageDAG.getVertices().stream()
- .flatMap(physicalStage -> stageDAG.getIncomingEdgesOf(physicalStage).stream())
- .filter(physicalStageEdge -> optimizationEdgeIds.contains(physicalStageEdge.getId()))
+ final DAG<Stage, StageEdge> stageDAG = originalPlan.getStageDAG();
+ final List<StageEdge> optimizationEdges = stageDAG.getVertices().stream()
+ .flatMap(stage -> stageDAG.getIncomingEdgesOf(stage).stream())
+ .filter(stageEdge -> optimizationEdgeIds.contains(stageEdge.getId()))
.collect(Collectors.toList());
// Get number of evaluators of the next stage (number of blocks).
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
index 57c929c..99401fb 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import java.io.Serializable;
import java.util.Set;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
similarity index 89%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
index e7e37ef..1da8f97 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ExecutableTask.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
@@ -29,8 +29,8 @@ public final class ExecutableTask implements Serializable {
private final String jobId;
private final String taskId;
private final int taskIdx;
- private final List<PhysicalStageEdge> taskIncomingEdges;
- private final List<PhysicalStageEdge> taskOutgoingEdges;
+ private final List<StageEdge> taskIncomingEdges;
+ private final List<StageEdge> taskOutgoingEdges;
private final int attemptIdx;
private final String containerType;
private final byte[] serializedTaskDag;
@@ -53,8 +53,8 @@ public final class ExecutableTask implements Serializable {
final int attemptIdx,
final String containerType,
final byte[] serializedIRDag,
- final List<PhysicalStageEdge> taskIncomingEdges,
- final List<PhysicalStageEdge> taskOutgoingEdges,
+ final List<StageEdge> taskIncomingEdges,
+ final List<StageEdge> taskOutgoingEdges,
final Map<String, Readable> irVertexIdToReadable) {
this.jobId = jobId;
this.taskId = taskId;
@@ -98,14 +98,14 @@ public final class ExecutableTask implements Serializable {
/**
* @return the incoming edges of the task.
*/
- public List<PhysicalStageEdge> getTaskIncomingEdges() {
+ public List<StageEdge> getTaskIncomingEdges() {
return taskIncomingEdges;
}
/**
* @return the outgoing edges of the task.
*/
- public List<PhysicalStageEdge> getTaskOutgoingEdges() {
+ public List<StageEdge> getTaskOutgoingEdges() {
return taskOutgoingEdges;
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
similarity index 87%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index b25a0ba..8809bd3 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -26,7 +26,7 @@ import java.util.Map;
*/
public final class PhysicalPlan implements Serializable {
private final String id;
- private final DAG<PhysicalStage, PhysicalStageEdge> stageDAG;
+ private final DAG<Stage, StageEdge> stageDAG;
private final Map<String, IRVertex> idToIRVertex;
/**
@@ -37,7 +37,7 @@ public final class PhysicalPlan implements Serializable {
* @param idToIRVertex map from task to IR vertex.
*/
public PhysicalPlan(final String id,
- final DAG<PhysicalStage, PhysicalStageEdge> stageDAG,
+ final DAG<Stage, StageEdge> stageDAG,
final Map<String, IRVertex> idToIRVertex) {
this.id = id;
this.stageDAG = stageDAG;
@@ -54,7 +54,7 @@ public final class PhysicalPlan implements Serializable {
/**
* @return the DAG of stages.
*/
- public DAG<PhysicalStage, PhysicalStageEdge> getStageDAG() {
+ public DAG<Stage, StageEdge> getStageDAG() {
return stageDAG;
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
similarity index 64%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index 27991e3..adaf752 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
@@ -22,10 +22,7 @@ import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.stage.*;
import edu.snu.nemo.common.exception.IllegalVertexOperationException;
import edu.snu.nemo.common.exception.PhysicalPlanGenerationException;
import org.apache.reef.tang.annotations.Parameter;
@@ -37,10 +34,9 @@ import java.util.function.Function;
/**
* A function that converts an IR DAG to physical DAG.
*/
-public final class PhysicalPlanGenerator
- implements Function<DAG<IRVertex, IREdge>, DAG<PhysicalStage, PhysicalStageEdge>> {
-
- final String dagDirectory;
+public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdge>, DAG<Stage, StageEdge>> {
+ private final Map<String, IRVertex> idToIRVertex;
+ private final String dagDirectory;
/**
* Private constructor.
@@ -49,6 +45,7 @@ public final class PhysicalPlanGenerator
*/
@Inject
private PhysicalPlanGenerator(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
+ this.idToIRVertex = new HashMap<>();
this.dagDirectory = dagDirectory;
}
@@ -59,7 +56,7 @@ public final class PhysicalPlanGenerator
* @return {@link PhysicalPlan} to execute.
*/
@Override
- public DAG<PhysicalStage, PhysicalStageEdge> apply(final DAG<IRVertex, IREdge> irDAG) {
+ public DAG<Stage, StageEdge> apply(final DAG<IRVertex, IREdge> irDAG) {
// first, stage-partition the IR DAG.
final DAG<Stage, StageEdge> dagOfStages = stagePartitionIrDAG(irDAG);
@@ -68,9 +65,12 @@ public final class PhysicalPlanGenerator
// for debugging purposes.
dagOfStages.storeJSON(dagDirectory, "plan-logical", "logical execution plan");
- // then create tasks and make it into a physical execution plan.
- return stagesIntoPlan(dagOfStages);
+ return dagOfStages;
+ }
+
+ public Map<String, IRVertex> getIdToIRVertex() {
+ return idToIRVertex;
}
/**
@@ -124,24 +124,50 @@ public final class PhysicalPlanGenerator
final Map<IRVertex, Stage> vertexStageMap = new HashMap<>();
for (final List<IRVertex> stageVertices : vertexListForEachStage.values()) {
+ integrityCheck(stageVertices);
+
final Set<IRVertex> currentStageVertices = new HashSet<>();
final Set<StageEdgeBuilder> currentStageIncomingEdges = new HashSet<>();
// Create a new stage builder.
final IRVertex irVertexOfNewStage = stageVertices.stream().findAny()
.orElseThrow(() -> new RuntimeException("Error: List " + stageVertices.getClass() + " is Empty"));
- final StageBuilder stageBuilder =
- new StageBuilder(irVertexOfNewStage.getProperty(ExecutionProperty.Key.StageId),
- irVertexOfNewStage.getProperty(ExecutionProperty.Key.ScheduleGroupIndex));
+ final StageBuilder stageBuilder = new StageBuilder(
+ irVertexOfNewStage.getProperty(ExecutionProperty.Key.StageId),
+ irVertexOfNewStage.getProperty(ExecutionProperty.Key.Parallelism),
+ irVertexOfNewStage.getProperty(ExecutionProperty.Key.ScheduleGroupIndex),
+ irVertexOfNewStage.getProperty(ExecutionProperty.Key.ExecutorPlacement));
+
+ // Prepare useful variables.
+ final int stageParallelism = irVertexOfNewStage.getProperty(ExecutionProperty.Key.Parallelism);
+ final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
+ for (int i = 0; i < stageParallelism; i++) {
+ vertexIdToReadables.add(new HashMap<>());
+ }
// For each vertex in the stage,
for (final IRVertex irVertex : stageVertices) {
+ // Take care of the readables of a source vertex.
+ if (irVertex instanceof SourceVertex) {
+ final SourceVertex sourceVertex = (SourceVertex) irVertex;
+ try {
+ final List<Readable> readables = sourceVertex.getReadables(stageParallelism);
+ for (int i = 0; i < stageParallelism; i++) {
+ vertexIdToReadables.get(i).put(irVertex.getId(), readables.get(i));
+ }
+ } catch (Exception e) {
+ throw new PhysicalPlanGenerationException(e);
+ }
+
+ // Clear internal metadata.
+ sourceVertex.clearInternalStates();
+ }
// Add vertex to the stage.
stageBuilder.addVertex(irVertex);
currentStageVertices.add(irVertex);
- // Connect all the incoming edges for the vertex
+ // Connect all the incoming edges for the vertex.
final List<IREdge> inEdges = irDAG.getIncomingEdgesOf(irVertex);
final Optional<List<IREdge>> inEdgeList = (inEdges == null) ? Optional.empty() : Optional.of(inEdges);
inEdgeList.ifPresent(edges -> edges.forEach(irEdge -> {
@@ -156,7 +182,12 @@ public final class PhysicalPlanGenerator
// both vertices are in the stage.
if (currentStageVertices.contains(srcVertex) && currentStageVertices.contains(dstVertex)) {
- stageBuilder.connectInternalVertices(irEdge);
+ stageBuilder.connectInternalVertices(new RuntimeEdge<IRVertex>(
+ irEdge.getId(),
+ irEdge.getExecutionProperties(),
+ irEdge.getSrc(),
+ irEdge.getDst(),
+ irEdge.getCoder()));
} else { // edge comes from another stage
final Stage srcStage = vertexStageMap.get(srcVertex);
@@ -167,15 +198,19 @@ public final class PhysicalPlanGenerator
final StageEdgeBuilder newEdgeBuilder = new StageEdgeBuilder(irEdge.getId())
.setEdgeProperties(irEdge.getExecutionProperties())
- .setSrcVertex(srcVertex).setDstVertex(dstVertex)
+ .setSrcVertex(srcVertex)
+ .setDstVertex(dstVertex)
.setSrcStage(srcStage)
.setCoder(irEdge.getCoder())
.setSideInputFlag(irEdge.isSideInput());
currentStageIncomingEdges.add(newEdgeBuilder);
}
}));
- }
+ // Track id to irVertex.
+ idToIRVertex.put(irVertex.getId(), irVertex);
+ }
+ stageBuilder.addReadables(vertexIdToReadables);
// If this runtime stage contains at least one vertex, build it!
if (!stageBuilder.isEmpty()) {
final Stage currentStage = stageBuilder.build();
@@ -197,110 +232,32 @@ public final class PhysicalPlanGenerator
return dagOfStagesBuilder.build();
}
-
- private final Map<String, IRVertex> idToIRVertex = new HashMap<>();
-
- /**
- * @return the idToIRVertex map.
- */
- public Map<String, IRVertex> getIdToIRVertex() {
- return idToIRVertex;
- }
-
/**
- * Converts the given DAG of stages to a physical DAG for execution.
- *
- * @param dagOfStages IR DAG partitioned into stages.
- * @return the converted physical DAG to execute,
- * which consists of {@link PhysicalStage} and their relationship represented by {@link PhysicalStageEdge}.
+ * Integrity check for a stage's vertices.
+ * @param stageVertices to check for
*/
- private DAG<PhysicalStage, PhysicalStageEdge> stagesIntoPlan(final DAG<Stage, StageEdge> dagOfStages) {
- final Map<String, PhysicalStage> runtimeStageIdToPhysicalStageMap = new HashMap<>();
- final DAGBuilder<PhysicalStage, PhysicalStageEdge> physicalDAGBuilder = new DAGBuilder<>();
-
- for (final Stage stage : dagOfStages.getVertices()) {
- final List<IRVertex> stageVertices = stage.getStageInternalDAG().getVertices();
-
- final ExecutionPropertyMap firstVertexProperties = stageVertices.iterator().next().getExecutionProperties();
- final int stageParallelism = firstVertexProperties.get(ExecutionProperty.Key.Parallelism);
- final String containerType = firstVertexProperties.get(ExecutionProperty.Key.ExecutorPlacement);
-
- // Only one DAG will be created and reused.
- final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>();
- // Collect split source readables in advance and bind to each irvertex to avoid extra source split.
- final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
- for (int i = 0; i < stageParallelism; i++) {
- vertexIdToReadables.add(new HashMap<>());
+ private void integrityCheck(final List<IRVertex> stageVertices) {
+ final IRVertex firstVertex = stageVertices.get(0);
+ final String placement = firstVertex.getProperty(ExecutionProperty.Key.ExecutorPlacement);
+ final int scheduleGroup = firstVertex.<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex);
+ final int parallelism = firstVertex.<Integer>getProperty(ExecutionProperty.Key.Parallelism);
+
+ stageVertices.forEach(irVertex -> {
+ // Check vertex type.
+ if (!(irVertex instanceof SourceVertex
+ || irVertex instanceof OperatorVertex
+ || irVertex instanceof MetricCollectionBarrierVertex)) {
+ throw new UnsupportedOperationException(irVertex.toString());
}
- // Iterate over the vertices contained in this stage
- stageVertices.forEach(irVertex -> {
- if (!(irVertex instanceof SourceVertex
- || irVertex instanceof OperatorVertex
- || irVertex instanceof MetricCollectionBarrierVertex)) {
- // Sanity check
- throw new IllegalStateException(irVertex.toString());
- }
-
- if (irVertex instanceof SourceVertex) {
- final SourceVertex sourceVertex = (SourceVertex) irVertex;
-
- // Take care of the Readables
- try {
- final List<Readable> readables = sourceVertex.getReadables(stageParallelism);
- for (int i = 0; i < stageParallelism; i++) {
- vertexIdToReadables.get(i).put(irVertex.getId(), readables.get(i));
- }
- } catch (Exception e) {
- throw new PhysicalPlanGenerationException(e);
- }
-
- // Clear internal metadata
- sourceVertex.clearInternalStates();
- }
-
- stageInternalDAGBuilder.addVertex(irVertex);
- idToIRVertex.put(irVertex.getId(), irVertex);
- });
-
- // connect internal edges in the stage. It suffices to iterate over only the stage internal inEdges.
- final DAG<IRVertex, IREdge> stageInternalDAG = stage.getStageInternalDAG();
- stageInternalDAG.getVertices().forEach(irVertex -> {
- final List<IREdge> inEdges = stageInternalDAG.getIncomingEdgesOf(irVertex);
- inEdges.forEach(edge ->
- stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>(
- edge.getId(),
- edge.getExecutionProperties(),
- edge.getSrc(),
- edge.getDst(),
- edge.getCoder(),
- edge.isSideInput())));
- });
-
- // Create the physical stage.
- final PhysicalStage physicalStage =
- new PhysicalStage(stage.getId(), stageInternalDAGBuilder.buildWithoutSourceSinkCheck(),
- stageParallelism, stage.getScheduleGroupIndex(), containerType, vertexIdToReadables);
-
- physicalDAGBuilder.addVertex(physicalStage);
- runtimeStageIdToPhysicalStageMap.put(stage.getId(), physicalStage);
- }
-
- // Connect Physical stages
- dagOfStages.getVertices().forEach(stage ->
- dagOfStages.getIncomingEdgesOf(stage).forEach(stageEdge -> {
- final PhysicalStage srcStage = runtimeStageIdToPhysicalStageMap.get(stageEdge.getSrc().getId());
- final PhysicalStage dstStage = runtimeStageIdToPhysicalStageMap.get(stageEdge.getDst().getId());
-
- physicalDAGBuilder.connectVertices(new PhysicalStageEdge(stageEdge.getId(),
- stageEdge.getExecutionProperties(),
- stageEdge.getSrcVertex(),
- stageEdge.getDstVertex(),
- srcStage, dstStage,
- stageEdge.getCoder(),
- stageEdge.isSideInput()));
- }));
-
- return physicalDAGBuilder.build();
+ // Check execution properties.
+ if ((placement != null
+ && !placement.equals(irVertex.<String>getProperty(ExecutionProperty.Key.ExecutorPlacement)))
+ || scheduleGroup != irVertex.<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex)
+ || parallelism != irVertex.<Integer>getProperty(ExecutionProperty.Key.Parallelism)) {
+ throw new RuntimeException("Vertices of the same stage have different execution properties: "
+ + irVertex.getId());
+ }
+ });
}
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
similarity index 86%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
index a58cfba..138dbb9 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java
@@ -13,14 +13,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.dag.Vertex;
import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.commons.lang3.SerializationUtils;
import java.util.ArrayList;
@@ -28,9 +27,9 @@ import java.util.List;
import java.util.Map;
/**
- * PhysicalStage.
+ * Stage.
*/
-public final class PhysicalStage extends Vertex {
+public final class Stage extends Vertex {
private final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag;
private final int parallelism;
private final int scheduleGroupIndex;
@@ -48,12 +47,12 @@ public final class PhysicalStage extends Vertex {
* @param containerType the type of container to execute the task on.
* @param vertexIdToReadables the list of maps between vertex ID and {@link Readable}.
*/
- public PhysicalStage(final String stageId,
- final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag,
- final int parallelism,
- final int scheduleGroupIndex,
- final String containerType,
- final List<Map<String, Readable>> vertexIdToReadables) {
+ public Stage(final String stageId,
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag,
+ final int parallelism,
+ final int scheduleGroupIndex,
+ final String containerType,
+ final List<Map<String, Readable>> vertexIdToReadables) {
super(stageId);
this.irDag = irDag;
this.parallelism = parallelism;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
similarity index 52%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageBuilder.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
index 3566948..9913fa9 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageBuilder.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageBuilder.java
@@ -13,93 +13,88 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.runtime.common.plan.stage;
+package edu.snu.nemo.runtime.common.plan;
-import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.common.dag.DAGBuilder;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/**
* Stage Builder.
*/
-public final class StageBuilder {
- private final DAGBuilder<IRVertex, IREdge> stageInternalDAGBuilder;
+final class StageBuilder {
+ private final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder;
private final Integer stageId;
+ private final int parallelism;
private final int scheduleGroupIndex;
+ private final String containerType;
+ private List<Map<String, Readable>> vertexIdToReadables;
/**
* Builds a {@link Stage}.
* @param stageId stage ID in numeric form.
* @param scheduleGroupIndex indicating its scheduling order.
*/
- public StageBuilder(final Integer stageId,
- final int scheduleGroupIndex) {
+ StageBuilder(final Integer stageId,
+ final int parallelism,
+ final int scheduleGroupIndex,
+ final String containerType) {
this.stageId = stageId;
+ this.parallelism = parallelism;
this.scheduleGroupIndex = scheduleGroupIndex;
+ this.containerType = containerType;
this.stageInternalDAGBuilder = new DAGBuilder<>();
+ this.vertexIdToReadables = new ArrayList<>(1);
}
/**
- */
- /**
* Adds a {@link IRVertex} to this stage.
* @param vertex to add.
* @return the stageBuilder.
*/
- public StageBuilder addVertex(final IRVertex vertex) {
+ StageBuilder addVertex(final IRVertex vertex) {
stageInternalDAGBuilder.addVertex(vertex);
- return this;
- }
+ return this; }
/**
* Connects two {@link IRVertex} in this stage.
* @param edge the IREdge that connects vertices.
* @return the stageBuilder.
*/
- public StageBuilder connectInternalVertices(final IREdge edge) {
+ StageBuilder connectInternalVertices(final RuntimeEdge<IRVertex> edge) {
stageInternalDAGBuilder.connectVertices(edge);
return this;
}
- /**
- * @return true if this builder doesn't contain any valid {@link IRVertex}.
- */
- public boolean isEmpty() {
- return stageInternalDAGBuilder.isEmpty();
+ StageBuilder addReadables(final List<Map<String, Readable>> vertexIdToReadable) {
+ this.vertexIdToReadables = vertexIdToReadable;
+ return this;
}
/**
- * Integrity check for stages.
- * @param stage stage to check for.
+ * @return true if this builder doesn't contain any valid {@link IRVertex}.
*/
- private void integrityCheck(final Stage stage) {
- final List<IRVertex> vertices = stage.getStageInternalDAG().getVertices();
-
- final String firstPlacement = vertices.iterator().next().getProperty(ExecutionProperty.Key.ExecutorPlacement);
- final int scheduleGroupIdx =
- vertices.iterator().next().<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex);
- vertices.forEach(irVertex -> {
- if ((firstPlacement != null
- && !firstPlacement.equals(irVertex.<String>getProperty(ExecutionProperty.Key.ExecutorPlacement)))
- || scheduleGroupIdx != irVertex.<Integer>getProperty(ExecutionProperty.Key.ScheduleGroupIndex)) {
- throw new RuntimeException("Vertices of the same stage have different execution properties: "
- + irVertex.getId());
- }
- });
+ boolean isEmpty() {
+ return stageInternalDAGBuilder.isEmpty();
}
/**
* Builds and returns the {@link Stage}.
* @return the runtime stage.
*/
- public Stage build() {
- final Stage stage = new Stage(RuntimeIdGenerator.generateStageId(stageId),
- stageInternalDAGBuilder.buildWithoutSourceSinkCheck(), scheduleGroupIndex);
- integrityCheck(stage);
+ Stage build() {
+ final Stage stage = new Stage(
+ RuntimeIdGenerator.generateStageId(stageId),
+ stageInternalDAGBuilder.buildWithoutSourceSinkCheck(),
+ parallelism,
+ scheduleGroupIndex,
+ containerType,
+ vertexIdToReadables);
return stage;
}
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
similarity index 79%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStageEdge.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 13c83e8..3b8cdf0 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -13,22 +13,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.runtime.common.plan.physical;
+package edu.snu.nemo.runtime.common.plan;
import edu.snu.nemo.common.coder.Coder;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import edu.snu.nemo.runtime.common.data.HashRange;
import java.util.ArrayList;
import java.util.List;
/**
- * Contains information stage boundary {@link edu.snu.nemo.runtime.common.plan.stage.StageEdge}.
+ * Edge of a stage that connects an IRVertex of the source stage to an IRVertex of the destination stage.
+ * This means that there can be multiple StageEdges between two Stages.
*/
-public final class PhysicalStageEdge extends RuntimeEdge<PhysicalStage> {
+public final class StageEdge extends RuntimeEdge<Stage> {
/**
* The source {@link IRVertex}.
* This belongs to the srcStage.
@@ -50,21 +50,21 @@ public final class PhysicalStageEdge extends RuntimeEdge<PhysicalStage> {
* Constructor.
* @param runtimeEdgeId id of the runtime edge.
* @param edgeProperties edge execution properties.
- * @param srcVertex source vertex.
- * @param dstVertex destination vertex.
+ * @param srcVertex source IRVertex in the srcStage of this edge.
+ * @param dstVertex destination IRVertex in the dstStage of this edge.
* @param srcStage source stage.
* @param dstStage destination stage.
* @param coder the coder for encoding and decoding.
* @param isSideInput whether or not the edge is a sideInput edge.
*/
- public PhysicalStageEdge(final String runtimeEdgeId,
- final ExecutionPropertyMap edgeProperties,
- final IRVertex srcVertex,
- final IRVertex dstVertex,
- final PhysicalStage srcStage,
- final PhysicalStage dstStage,
- final Coder coder,
- final Boolean isSideInput) {
+ public StageEdge(final String runtimeEdgeId,
+ final ExecutionPropertyMap edgeProperties,
+ final IRVertex srcVertex,
+ final IRVertex dstVertex,
+ final Stage srcStage,
+ final Stage dstStage,
+ final Coder coder,
+ final Boolean isSideInput) {
super(runtimeEdgeId, edgeProperties, srcStage, dstStage, coder, isSideInput);
this.srcVertex = srcVertex;
this.dstVertex = dstVertex;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdgeBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
similarity index 92%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdgeBuilder.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
index cf5d3cc..b7448a1 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdgeBuilder.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdgeBuilder.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.runtime.common.plan.stage;
+package edu.snu.nemo.runtime.common.plan;
import edu.snu.nemo.common.coder.Coder;
@@ -37,7 +37,7 @@ public final class StageEdgeBuilder {
* Represents the edge between vertices in a logical plan.
* @param irEdgeId id of this edge.
*/
- public StageEdgeBuilder(final String irEdgeId) {
+ StageEdgeBuilder(final String irEdgeId) {
this.stageEdgeId = irEdgeId;
}
@@ -112,7 +112,6 @@ public final class StageEdgeBuilder {
}
public StageEdge build() {
- return new StageEdge(stageEdgeId,
- edgeProperties, srcStage, dstStage, coder, isSideInput, srcVertex, dstVertex);
+ return new StageEdge(stageEdgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage, coder, isSideInput);
}
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/Stage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/Stage.java
deleted file mode 100644
index f56bdef..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/Stage.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.stage;
-
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.dag.Vertex;
-
-/**
- * Represents a stage in Runtime's execution of a job.
- * Each stage contains a part of a whole execution plan.
- * Stage partitioning is determined by {edu.snu.nemo.compiler.backend.nemo.NemoBackend}.
- */
-public final class Stage extends Vertex {
- private final DAG<IRVertex, IREdge> stageInternalDAG;
- private final int scheduleGroupIndex;
-
- /**
- * Constructor.
- * @param stageId id of the stage.
- * @param stageInternalDAG the internal DAG of the stage.
- * @param scheduleGroupIndex the schedule group index.
- */
- public Stage(final String stageId,
- final DAG<IRVertex, IREdge> stageInternalDAG,
- final int scheduleGroupIndex) {
- super(stageId);
- this.stageInternalDAG = stageInternalDAG;
- this.scheduleGroupIndex = scheduleGroupIndex;
- }
-
- /**
- * @return the internal DAG of the stage.
- */
- public DAG<IRVertex, IREdge> getStageInternalDAG() {
- return stageInternalDAG;
- }
-
- /**
- * @return the schedule group index.
- */
- public int getScheduleGroupIndex() {
- return scheduleGroupIndex;
- }
-
- @Override
- public String propertiesToJSON() {
- final StringBuilder sb = new StringBuilder();
- sb.append("{\"scheduleGroupIndex\": ").append(scheduleGroupIndex);
- sb.append(", \"stageInternalDAG\": ").append(stageInternalDAG.toString());
- sb.append("}");
- return sb.toString();
- }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdge.java
deleted file mode 100644
index 5d91959..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/stage/StageEdge.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.stage;
-
-
-import edu.snu.nemo.common.coder.Coder;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-
-/**
- * Stage Edge.
- */
-public final class StageEdge extends RuntimeEdge<Stage> {
- private final IRVertex srcVertex;
- private final IRVertex dstVertex;
-
- /**
- * Represents the edge between stages.
- * @param irEdgeId id of this edge.
- * @param edgeProperties to control the data flow on this edge.
- * @param srcStage source runtime stage.
- * @param dstStage destination runtime stage.
- * @param coder coder.
- * @param isSideInput flag for whether or not the edge is a sideInput.
- * @param srcVertex source vertex (in srcStage).
- * @param dstVertex destination vertex (in dstStage).
- */
- public StageEdge(final String irEdgeId,
- final ExecutionPropertyMap edgeProperties,
- final Stage srcStage,
- final Stage dstStage,
- final Coder coder,
- final Boolean isSideInput,
- final IRVertex srcVertex,
- final IRVertex dstVertex) {
- super(RuntimeIdGenerator.generateStageEdgeId(irEdgeId), edgeProperties, srcStage, dstStage, coder, isSideInput);
- this.srcVertex = srcVertex;
- this.dstVertex = dstVertex;
- }
-
- /**
- * @return the source vertex.
- */
- public IRVertex getSrcVertex() {
- return srcVertex;
- }
-
- /**
- * @return the destination vertex.
- */
- public IRVertex getDstVertex() {
- return dstVertex;
- }
-
- @Override
- public String propertiesToJSON() {
- final StringBuilder sb = new StringBuilder();
- sb.append("{\"runtimeEdgeId\": \"").append(getId());
- sb.append("\", \"edgeProperties\": ").append(getExecutionProperties());
- sb.append(", \"srcVertex\": \"").append(srcVertex.getId());
- sb.append("\", \"dstVertex\": \"").append(dstVertex.getId());
- sb.append("\", \"coder\": \"").append(getCoder().toString());
- sb.append("\"}");
- return sb.toString();
- }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
index 81286c1..41a2d0e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.runtime.common.state;
import edu.snu.nemo.common.StateMachine;
/**
- * Represents the states and their transitions of a {@link edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan}.
+ * Represents the states and their transitions of a physical plan.
*/
public final class JobState {
private final StateMachine stateMachine;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
index 6e2a3d5..114275e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.runtime.common.state;
import edu.snu.nemo.common.StateMachine;
/**
- * Represents the states and their transitions of a {@link edu.snu.nemo.runtime.common.plan.physical.PhysicalStage}.
+ * Represents the states and their transitions of a stage.
*/
public final class StageState {
private final StateMachine stateMachine;
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
index 78896df..a5b6830 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
@@ -26,7 +26,7 @@ import edu.snu.nemo.compiler.backend.nemo.NemoBackend;
import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
import edu.snu.nemo.compiler.optimizer.policy.Policy;
import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import edu.snu.nemo.runtime.master.JobStateManager;
import edu.snu.nemo.runtime.master.RuntimeMaster;
import org.apache.commons.lang3.SerializationUtils;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 7a7b523..8872990 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -28,7 +28,7 @@ import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.MessageListener;
import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import edu.snu.nemo.runtime.executor.data.SerializerManager;
import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
import org.apache.commons.lang3.SerializationUtils;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
index 2349ab7..09a43cc 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
@@ -23,8 +23,9 @@ import edu.snu.nemo.common.exception.BlockWriteException;
import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.common.ir.vertex.*;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.*;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.executor.data.DataUtil;
import edu.snu.nemo.runtime.executor.datatransfer.*;
@@ -50,8 +51,8 @@ public final class TaskExecutor {
private final String taskId;
private final int taskIdx;
private final TaskStateManager taskStateManager;
- private final List<PhysicalStageEdge> stageIncomingEdges;
- private final List<PhysicalStageEdge> stageOutgoingEdges;
+ private final List<StageEdge> stageIncomingEdges;
+ private final List<StageEdge> stageOutgoingEdges;
private Map<String, Readable> irVertexIdToReadable;
// Other parameters
@@ -140,8 +141,8 @@ public final class TaskExecutor {
// 'Pointer-based' means that it isn't Map/List-based in getting the data structure or parent/children
// to avoid element-wise extra overhead of calculating hash values(HashMap) or iterating Lists.
irVertexDag.topologicalDo(irVertex -> {
- final Set<PhysicalStageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(irVertex);
- final Set<PhysicalStageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(irVertex);
+ final Set<StageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(irVertex);
+ final Set<StageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(irVertex);
final IRVertexDataHandler dataHandler = getIRVertexDataHandler(irVertex);
// Set data handlers of children irVertices.
@@ -152,9 +153,9 @@ public final class TaskExecutor {
dataHandler.setChildrenDataHandler(childrenDataHandlers);
// Add InputReaders for inter-stage data transfer
- inEdgesFromOtherStages.forEach(physicalStageEdge -> {
+ inEdgesFromOtherStages.forEach(stageEdge -> {
final InputReader inputReader = channelFactory.createReader(
- taskIdx, physicalStageEdge.getSrcVertex(), physicalStageEdge);
+ taskIdx, stageEdge.getSrcVertex(), stageEdge);
// For InputReaders that have side input, collect them separately.
if (inputReader.isSideInputReader()) {
@@ -166,9 +167,9 @@ public final class TaskExecutor {
});
// Add OutputWriters for inter-stage data transfer
- outEdgesToOtherStages.forEach(physicalStageEdge -> {
+ outEdgesToOtherStages.forEach(stageEdge -> {
final OutputWriter outputWriter = channelFactory.createWriter(
- irVertex, taskIdx, physicalStageEdge.getDstVertex(), physicalStageEdge);
+ irVertex, taskIdx, stageEdge.getDstVertex(), stageEdge);
dataHandler.addOutputWriter(outputWriter);
});
@@ -206,7 +207,7 @@ public final class TaskExecutor {
* @param irVertex the IRVertex whose inter-stage incoming edges to be collected.
* @return the collected incoming edges.
*/
- private Set<PhysicalStageEdge> getInEdgesFromOtherStages(final IRVertex irVertex) {
+ private Set<StageEdge> getInEdgesFromOtherStages(final IRVertex irVertex) {
return stageIncomingEdges.stream().filter(
stageInEdge -> stageInEdge.getDstVertex().getId().equals(irVertex.getId()))
.collect(Collectors.toSet());
@@ -218,7 +219,7 @@ public final class TaskExecutor {
* @param irVertex the IRVertex whose inter-stage outgoing edges to be collected.
* @return the collected outgoing edges.
*/
- private Set<PhysicalStageEdge> getOutEdgesToOtherStages(final IRVertex irVertex) {
+ private Set<StageEdge> getOutEdgesToOtherStages(final IRVertex irVertex) {
return stageOutgoingEdges.stream().filter(
stageInEdge -> stageInEdge.getSrcVertex().getId().equals(irVertex.getId()))
.collect(Collectors.toSet());
@@ -431,8 +432,8 @@ public final class TaskExecutor {
final Object sideInput = getSideInput(sideInputIterator);
final RuntimeEdge inEdge = sideInputReader.getRuntimeEdge();
final Transform srcTransform;
- if (inEdge instanceof PhysicalStageEdge) {
- srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
+ if (inEdge instanceof StageEdge) {
+ srcTransform = ((OperatorVertex) ((StageEdge) inEdge).getSrcVertex()).getTransform();
} else {
srcTransform = ((OperatorVertex) inEdge.getSrc()).getTransform();
}
@@ -473,8 +474,8 @@ public final class TaskExecutor {
Object sideInput = input.remove();
final RuntimeEdge inEdge = input.getSideInputRuntimeEdge();
final Transform srcTransform;
- if (inEdge instanceof PhysicalStageEdge) {
- srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
+ if (inEdge instanceof StageEdge) {
+ srcTransform = ((OperatorVertex) ((StageEdge) inEdge).getSrcVertex()).getTransform();
} else {
srcTransform = ((OperatorVertex) inEdge.getSrc()).getTransform();
}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index e2a31d6..7b1bc52 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -21,7 +21,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import java.util.*;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 3ca2402..a01e58c 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -24,7 +24,7 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.data.KeyRange;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.common.exception.BlockFetchException;
import edu.snu.nemo.common.exception.UnsupportedCommPatternException;
import edu.snu.nemo.runtime.common.data.HashRange;
@@ -113,9 +113,9 @@ public final class InputReader extends DataTransfer {
* @return the list of the completable future of the data.
*/
private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
- assert (runtimeEdge instanceof PhysicalStageEdge);
+ assert (runtimeEdge instanceof StageEdge);
final KeyRange hashRangeToRead =
- ((PhysicalStageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
+ ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
if (hashRangeToRead == null) {
throw new BlockFetchException(
new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index 49401aa..b758fa8 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -23,8 +23,10 @@ import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.common.metric.MetricDataBuilder;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.*;
import edu.snu.nemo.runtime.common.state.JobState;
import edu.snu.nemo.runtime.common.state.StageState;
@@ -136,10 +138,10 @@ public final class JobStateManager {
onJobStateChanged(JobState.State.EXECUTING);
// Initialize the states for the job down to task-level.
- physicalPlan.getStageDAG().topologicalDo(physicalStage -> {
- currentJobStageIds.add(physicalStage.getId());
- idToStageStates.put(physicalStage.getId(), new StageState());
- physicalStage.getTaskIds().forEach(taskId -> {
+ physicalPlan.getStageDAG().topologicalDo(stage -> {
+ currentJobStageIds.add(stage.getId());
+ idToStageStates.put(stage.getId(), new StageState());
+ stage.getTaskIds().forEach(taskId -> {
idToTaskStates.put(taskId, new TaskState());
taskIdToCurrentAttempt.put(taskId, 1);
});
@@ -147,23 +149,23 @@ public final class JobStateManager {
}
private void initializePartitionStates(final BlockManagerMaster blockManagerMaster) {
- final DAG<PhysicalStage, PhysicalStageEdge> stageDAG = physicalPlan.getStageDAG();
- stageDAG.topologicalDo(physicalStage -> {
- final List<String> taskIdsForStage = physicalStage.getTaskIds();
- final List<PhysicalStageEdge> stageOutgoingEdges = stageDAG.getOutgoingEdgesOf(physicalStage);
+ final DAG<Stage, StageEdge> stageDAG = physicalPlan.getStageDAG();
+ stageDAG.topologicalDo(stage -> {
+ final List<String> taskIdsForStage = stage.getTaskIds();
+ final List<StageEdge> stageOutgoingEdges = stageDAG.getOutgoingEdgesOf(stage);
// Initialize states for blocks of inter-stage edges
- stageOutgoingEdges.forEach(physicalStageEdge -> {
+ stageOutgoingEdges.forEach(stageEdge -> {
final int srcParallelism = taskIdsForStage.size();
IntStream.range(0, srcParallelism).forEach(srcTaskIdx -> {
- final String blockId = RuntimeIdGenerator.generateBlockId(physicalStageEdge.getId(), srcTaskIdx);
+ final String blockId = RuntimeIdGenerator.generateBlockId(stageEdge.getId(), srcTaskIdx);
blockManagerMaster.initializeState(blockId, taskIdsForStage.get(srcTaskIdx));
});
});
// Initialize states for blocks of stage internal edges
taskIdsForStage.forEach(taskId -> {
- final DAG<IRVertex, RuntimeEdge<IRVertex>> taskInternalDag = physicalStage.getIRDAG();
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> taskInternalDag = stage.getIRDAG();
taskInternalDag.getVertices().forEach(task -> {
final List<RuntimeEdge<IRVertex>> internalOutgoingEdges = taskInternalDag.getOutgoingEdgesOf(task);
internalOutgoingEdges.forEach(taskRuntimeEdge -> {
@@ -240,7 +242,7 @@ public final class JobStateManager {
// if there exists a mapping, this state change is from a failed_recoverable stage,
// and there may be tasks that do not need to be re-executed.
if (!stageIdToRemainingTaskSet.containsKey(stageId)) {
- for (final PhysicalStage stage : physicalPlan.getStageDAG().getVertices()) {
+ for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
if (stage.getId().equals(stageId)) {
Set<String> remainingTaskIds = new HashSet<>();
remainingTaskIds.addAll(
@@ -510,9 +512,9 @@ public final class JobStateManager {
public synchronized String toString() {
final StringBuilder sb = new StringBuilder("{");
sb.append("\"jobId\": \"").append(jobId).append("\", ");
- sb.append("\"physicalStages\": [");
+ sb.append("\"stages\": [");
boolean isFirstStage = true;
- for (final PhysicalStage stage : physicalPlan.getStageDAG().getVertices()) {
+ for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
if (!isFirstStage) {
sb.append(", ");
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index d2b709f..4751573 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -24,7 +24,7 @@ import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageContext;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.MessageListener;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.resource.ContainerManager;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
index 3035660..2ddb818 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
@@ -21,7 +21,7 @@ package edu.snu.nemo.runtime.master.eventhandler;
import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.eventhandler.CompilerEventHandler;
import edu.snu.nemo.runtime.common.eventhandler.UpdatePhysicalPlanEvent;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import edu.snu.nemo.runtime.master.scheduler.Scheduler;
import javax.inject.Inject;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index 8536746..f73ecdb 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.reef.driver.context.ActiveContext;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index f32a441..c51d1ec 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -22,12 +22,11 @@ import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
+import edu.snu.nemo.runtime.common.plan.*;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
import edu.snu.nemo.common.exception.*;
import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.runtime.common.plan.physical.*;
import edu.snu.nemo.runtime.common.state.StageState;
import edu.snu.nemo.runtime.master.BlockManagerMaster;
import edu.snu.nemo.runtime.master.JobStateManager;
@@ -112,7 +111,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
LOG.info("Job to schedule: {}", jobToSchedule.getId());
this.initialScheduleGroup = jobToSchedule.getStageDAG().getVertices().stream()
- .mapToInt(physicalStage -> physicalStage.getScheduleGroupIndex())
+ .mapToInt(stage -> stage.getScheduleGroupIndex())
.min().getAsInt();
scheduleRootStages();
@@ -226,9 +225,9 @@ public final class BatchSingleJobScheduler implements Scheduler {
* Schedule stages in initial schedule group, in reverse-topological order.
*/
private void scheduleRootStages() {
- final List<PhysicalStage> rootStages =
- physicalPlan.getStageDAG().getTopologicalSort().stream().filter(physicalStage ->
- physicalStage.getScheduleGroupIndex() == initialScheduleGroup)
+ final List<Stage> rootStages =
+ physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage ->
+ stage.getScheduleGroupIndex() == initialScheduleGroup)
.collect(Collectors.toList());
Collections.reverse(rootStages);
rootStages.forEach(this::scheduleStage);
@@ -239,8 +238,8 @@ public final class BatchSingleJobScheduler implements Scheduler {
* @param completedStageId the ID of the stage that just completed and triggered this scheduling.
*/
private void scheduleNextStage(final String completedStageId) {
- final PhysicalStage completeOrFailedStage = getStageById(completedStageId);
- final Optional<List<PhysicalStage>> nextStagesToSchedule =
+ final Stage completeOrFailedStage = getStageById(completedStageId);
+ final Optional<List<Stage>> nextStagesToSchedule =
selectNextStagesToSchedule(completeOrFailedStage.getScheduleGroupIndex());
if (nextStagesToSchedule.isPresent()) {
@@ -271,9 +270,9 @@ public final class BatchSingleJobScheduler implements Scheduler {
* @return an optional of the (possibly empty) list of next schedulable stages, in the order they should be
* enqueued to {@link PendingTaskCollection}.
*/
- private Optional<List<PhysicalStage>> selectNextStagesToSchedule(final int currentScheduleGroupIndex) {
+ private Optional<List<Stage>> selectNextStagesToSchedule(final int currentScheduleGroupIndex) {
if (currentScheduleGroupIndex > initialScheduleGroup) {
- final Optional<List<PhysicalStage>> ancestorStagesFromAScheduleGroup =
+ final Optional<List<Stage>> ancestorStagesFromAScheduleGroup =
selectNextStagesToSchedule(currentScheduleGroupIndex - 1);
if (ancestorStagesFromAScheduleGroup.isPresent()) {
return ancestorStagesFromAScheduleGroup;
@@ -281,15 +280,15 @@ public final class BatchSingleJobScheduler implements Scheduler {
}
// All previous schedule groups are complete, we need to check for the current schedule group.
- final List<PhysicalStage> currentScheduleGroup =
- physicalPlan.getStageDAG().getTopologicalSort().stream().filter(physicalStage ->
- physicalStage.getScheduleGroupIndex() == currentScheduleGroupIndex)
+ final List<Stage> currentScheduleGroup =
+ physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage ->
+ stage.getScheduleGroupIndex() == currentScheduleGroupIndex)
.collect(Collectors.toList());
- List<PhysicalStage> stagesToSchedule = new LinkedList<>();
+ List<Stage> stagesToSchedule = new LinkedList<>();
boolean allStagesComplete = true;
// We need to reschedule failed_recoverable stages.
- for (final PhysicalStage stageToCheck : currentScheduleGroup) {
+ for (final Stage stageToCheck : currentScheduleGroup) {
final StageState.State stageState =
(StageState.State) jobStateManager.getStageState(stageToCheck.getId()).getStateMachine().getCurrentState();
switch (stageState) {
@@ -313,9 +312,9 @@ public final class BatchSingleJobScheduler implements Scheduler {
// By the time the control flow has reached here,
// we are ready to move onto the next ScheduleGroup
stagesToSchedule =
- physicalPlan.getStageDAG().getTopologicalSort().stream().filter(physicalStage -> {
- if (physicalStage.getScheduleGroupIndex() == currentScheduleGroupIndex + 1) {
- final String stageId = physicalStage.getId();
+ physicalPlan.getStageDAG().getTopologicalSort().stream().filter(stage -> {
+ if (stage.getScheduleGroupIndex() == currentScheduleGroupIndex + 1) {
+ final String stageId = stage.getId();
return jobStateManager.getStageState(stageId).getStateMachine().getCurrentState()
!= StageState.State.EXECUTING
&& jobStateManager.getStageState(stageId).getStateMachine().getCurrentState()
@@ -341,10 +340,10 @@ public final class BatchSingleJobScheduler implements Scheduler {
* It adds the list of tasks for the stage where the scheduler thread continuously polls from.
* @param stageToSchedule the stage to schedule.
*/
- private void scheduleStage(final PhysicalStage stageToSchedule) {
- final List<PhysicalStageEdge> stageIncomingEdges =
+ private void scheduleStage(final Stage stageToSchedule) {
+ final List<StageEdge> stageIncomingEdges =
physicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
- final List<PhysicalStageEdge> stageOutgoingEdges =
+ final List<StageEdge> stageOutgoingEdges =
physicalPlan.getStageDAG().getOutgoingEdgesOf(stageToSchedule.getId());
final Enum stageState = jobStateManager.getStageState(stageToSchedule.getId()).getStateMachine().getCurrentState();
@@ -419,18 +418,18 @@ public final class BatchSingleJobScheduler implements Scheduler {
* @return the IR dag
*/
private DAG<IRVertex, RuntimeEdge<IRVertex>> getVertexDagById(final String taskId) {
- for (final PhysicalStage physicalStage : physicalPlan.getStageDAG().getVertices()) {
- if (physicalStage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
- return physicalStage.getIRDAG();
+ for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
+ if (stage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
+ return stage.getIRDAG();
}
}
throw new RuntimeException(new Throwable("This taskId does not exist in the plan"));
}
- private PhysicalStage getStageById(final String stageId) {
- for (final PhysicalStage physicalStage : physicalPlan.getStageDAG().getVertices()) {
- if (physicalStage.getId().equals(stageId)) {
- return physicalStage;
+ private Stage getStageById(final String stageId) {
+ for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
+ if (stage.getId().equals(stageId)) {
+ return stage;
}
}
throw new RuntimeException(new Throwable("This taskId does not exist in the plan"));
@@ -538,7 +537,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
case INPUT_READ_FAILURE:
jobStateManager.onTaskStateChanged(taskId, newState);
LOG.info("All tasks of {} will be made failed_recoverable.", stageId);
- for (final PhysicalStage stage : physicalPlan.getStageDAG().getTopologicalSort()) {
+ for (final Stage stage : physicalPlan.getStageDAG().getTopologicalSort()) {
if (stage.getId().equals(stageId)) {
LOG.info("Removing Tasks for {} before they are scheduled to an executor", stage.getId());
pendingTaskCollection.removeTasksAndDescendants(stage.getId());
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
index ab5081c..66fa986 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import javax.inject.Inject;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
index f23a1f0..46b3c6b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import javax.inject.Inject;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
index e72f486..5d19f52 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import javax.inject.Inject;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
index 361307f..1d8c8a2 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
@@ -15,8 +15,8 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import net.jcip.annotations.ThreadSafe;
import org.apache.reef.annotations.audience.DriverSide;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index a232a79..afdc671 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index d34eb56..6e31269 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.JobStateManager;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index f411c1d..aa53780 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import edu.snu.nemo.runtime.common.state.JobState;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.JobStateManager;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 1e71882..7ea05fb 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.tang.annotations.DefaultImplementation;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index 0375e01..fae4565 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -17,10 +17,10 @@ package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import net.jcip.annotations.ThreadSafe;
import org.apache.reef.annotations.audience.DriverSide;
@@ -147,7 +147,7 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
}
physicalPlan.getStageDAG().getChildren(stageId).forEach(
- physicalStage -> removeStageAndChildren(physicalStage.getId()));
+ stage -> removeStageAndChildren(stage.getId()));
}
/**
@@ -161,7 +161,7 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
*/
private synchronized void updateSchedulableStages(
final String candidateStageId, final String candidateStageContainerType) {
- final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = physicalPlan.getStageDAG();
+ final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
if (isSchedulable(candidateStageId, candidateStageContainerType)) {
// Check for ancestor stages that became schedulable due to candidateStage's absence from the queue.
@@ -187,8 +187,8 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
* @return true if schedulable, false otherwise.
*/
private synchronized boolean isSchedulable(final String candidateStageId, final String candidateStageContainerType) {
- final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = physicalPlan.getStageDAG();
- for (final PhysicalStage descendantStage : jobDAG.getDescendants(candidateStageId)) {
+ final DAG<Stage, StageEdge> jobDAG = physicalPlan.getStageDAG();
+ for (final Stage descendantStage : jobDAG.getDescendants(candidateStageId)) {
if (schedulableStages.contains(descendantStage.getId())) {
if (candidateStageContainerType.equals(descendantStage.getContainerType())) {
return false;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index f7056bf..bee0a36 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import org.slf4j.Logger;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
index af8371b..ee2cfa0 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
@@ -82,7 +82,7 @@ public final class BlockManagerMasterTest {
*/
@Test
public void testLostAfterCommit() throws Exception {
- final String edgeId = RuntimeIdGenerator.generateRuntimeEdgeId("Edge-0");
+ final String edgeId = RuntimeIdGenerator.generateStageEdgeId("Edge-0");
final int srcTaskIndex = 0;
final String taskId = RuntimeIdGenerator.generateTaskId(srcTaskIndex, "Stage-test");
final String executorId = RuntimeIdGenerator.generateExecutorId();
@@ -115,7 +115,7 @@ public final class BlockManagerMasterTest {
*/
@Test
public void testBeforeAfterCommit() throws Exception {
- final String edgeId = RuntimeIdGenerator.generateRuntimeEdgeId("Edge-1");
+ final String edgeId = RuntimeIdGenerator.generateStageEdgeId("Edge-1");
final int srcTaskIndex = 0;
final String taskId = RuntimeIdGenerator.generateTaskId(srcTaskIndex, "Stage-Test");
final String executorId = RuntimeIdGenerator.generateExecutorId();
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
index 2f5473b..5d515a7 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
@@ -22,7 +22,10 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
-import edu.snu.nemo.runtime.common.plan.physical.*;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.state.JobState;
import edu.snu.nemo.runtime.common.state.StageState;
import edu.snu.nemo.runtime.common.state.TaskState;
@@ -85,17 +88,17 @@ public final class JobStateManagerTest {
assertEquals(jobStateManager.getJobId(), "TestPlan");
- final List<PhysicalStage> stageList = physicalPlan.getStageDAG().getTopologicalSort();
+ final List<Stage> stageList = physicalPlan.getStageDAG().getTopologicalSort();
for (int stageIdx = 0; stageIdx < stageList.size(); stageIdx++) {
- final PhysicalStage physicalStage = stageList.get(stageIdx);
- jobStateManager.onStageStateChanged(physicalStage.getId(), StageState.State.EXECUTING);
- final List<String> taskIds = physicalStage.getTaskIds();
+ final Stage stage = stageList.get(stageIdx);
+ jobStateManager.onStageStateChanged(stage.getId(), StageState.State.EXECUTING);
+ final List<String> taskIds = stage.getTaskIds();
taskIds.forEach(taskId -> {
jobStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
jobStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
if (RuntimeIdGenerator.getIndexFromTaskId(taskId) == taskIds.size() - 1) {
- assertTrue(jobStateManager.checkStageCompletion(physicalStage.getId()));
+ assertTrue(jobStateManager.checkStageCompletion(stage.getId()));
}
});
final Map<String, TaskState> taskStateMap = jobStateManager.getIdToTaskStates();
@@ -117,7 +120,7 @@ public final class JobStateManagerTest {
public void testWaitUntilFinish() {
// Create a JobStateManager of an empty dag.
final DAG<IRVertex, IREdge> irDAG = irDAGBuilder.build();
- final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
+ final DAG<Stage, StageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
final JobStateManager jobStateManager = new JobStateManager(
new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()),
blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index 78466b2..308f3b5 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -20,7 +20,9 @@ import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.physical.*;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.state.StageState;
import edu.snu.nemo.runtime.master.JobStateManager;
import edu.snu.nemo.runtime.master.MetricMessageHandler;
@@ -152,19 +154,19 @@ public final class BatchSingleJobSchedulerTest {
// b) the stages of the next ScheduleGroup are scheduled after the stages of each ScheduleGroup are made "complete".
for (int i = 0; i < getNumScheduleGroups(plan.getStageDAG()); i++) {
final int scheduleGroupIdx = i;
- final List<PhysicalStage> stages = filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx);
+ final List<Stage> stages = filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx);
LOG.debug("Checking that all stages of ScheduleGroup {} enter the executing state", scheduleGroupIdx);
- stages.forEach(physicalStage -> {
- while (jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState()
+ stages.forEach(stage -> {
+ while (jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState()
!= StageState.State.EXECUTING) {
}
});
- stages.forEach(physicalStage -> {
+ stages.forEach(stage -> {
SchedulerTestUtil.completeStage(
- jobStateManager, scheduler, executorRegistry, physicalStage, SCHEDULE_ATTEMPT_INDEX);
+ jobStateManager, scheduler, executorRegistry, stage, SCHEDULE_ATTEMPT_INDEX);
});
}
@@ -174,13 +176,13 @@ public final class BatchSingleJobSchedulerTest {
assertTrue(jobStateManager.checkJobTermination());
}
- private List<PhysicalStage> filterStagesWithAScheduleGroupIndex(
- final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG, final int scheduleGroupIndex) {
- final Set<PhysicalStage> stageSet = new HashSet<>(physicalDAG.filterVertices(
- physicalStage -> physicalStage.getScheduleGroupIndex() == scheduleGroupIndex));
+ private List<Stage> filterStagesWithAScheduleGroupIndex(
+ final DAG<Stage, StageEdge> physicalDAG, final int scheduleGroupIndex) {
+ final Set<Stage> stageSet = new HashSet<>(physicalDAG.filterVertices(
+ stage -> stage.getScheduleGroupIndex() == scheduleGroupIndex));
// Return the filtered vertices as a sorted list
- final List<PhysicalStage> sortedStages = new ArrayList<>(stageSet.size());
+ final List<Stage> sortedStages = new ArrayList<>(stageSet.size());
physicalDAG.topologicalDo(stage -> {
if (stageSet.contains(stage)) {
sortedStages.add(stage);
@@ -189,7 +191,7 @@ public final class BatchSingleJobSchedulerTest {
return sortedStages;
}
- private int getNumScheduleGroups(final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG) {
+ private int getNumScheduleGroups(final DAG<Stage, StageEdge> physicalDAG) {
final Set<Integer> scheduleGroupSet = new HashSet<>();
physicalDAG.getVertices().forEach(stage -> scheduleGroupSet.add(stage.getScheduleGroupIndex()));
return scheduleGroupSet.size();
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index 3150475..1b7768f 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
index 5b48315..f4ac23e 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
@@ -19,7 +19,8 @@ import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.physical.*;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.JobStateManager;
import edu.snu.nemo.runtime.master.MetricMessageHandler;
@@ -132,9 +133,9 @@ public final class FaultToleranceTest {
new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
scheduler.scheduleJob(plan, jobStateManager);
- final List<PhysicalStage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
+ final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
- for (final PhysicalStage stage : dagOf4Stages) {
+ for (final Stage stage : dagOf4Stages) {
if (stage.getScheduleGroupIndex() == 0) {
// There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
@@ -203,9 +204,9 @@ public final class FaultToleranceTest {
new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
scheduler.scheduleJob(plan, jobStateManager);
- final List<PhysicalStage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
+ final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
- for (final PhysicalStage stage : dagOf4Stages) {
+ for (final Stage stage : dagOf4Stages) {
if (stage.getScheduleGroupIndex() == 0) {
// There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
@@ -269,9 +270,9 @@ public final class FaultToleranceTest {
new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
scheduler.scheduleJob(plan, jobStateManager);
- final List<PhysicalStage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
+ final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
- for (final PhysicalStage stage : dagOf4Stages) {
+ for (final Stage stage : dagOf4Stages) {
if (stage.getScheduleGroupIndex() == 0) {
// There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
@@ -324,13 +325,13 @@ public final class FaultToleranceTest {
scheduler.scheduleJob(plan, jobStateManager);
final List<ExecutorRepresenter> executors = new ArrayList<>();
- final List<PhysicalStage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
+ final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort();
int executorIdIndex = 1;
float removalChance = 0.7f; // Out of 1.0
final Random random = new Random(0); // Deterministic seed.
- for (final PhysicalStage stage : dagOf4Stages) {
+ for (final Stage stage : dagOf4Stages) {
while (jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState() != COMPLETE) {
// By chance, remove or add executor
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
index 0071b9f..ac77c46 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
index 5edd885..d40859b 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.junit.Test;
import org.junit.runner.RunWith;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
index a9d1d4b..644fb38 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
+import edu.snu.nemo.runtime.common.plan.Stage;
import edu.snu.nemo.runtime.common.state.StageState;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.JobStateManager;
@@ -32,21 +32,21 @@ final class SchedulerTestUtil {
* @param jobStateManager for the submitted job.
* @param scheduler for the submitted job.
* @param executorRegistry provides executor representers
- * @param physicalStage for which the states should be marked as complete.
+ * @param stage for which the states should be marked as complete.
*/
static void completeStage(final JobStateManager jobStateManager,
final Scheduler scheduler,
final ExecutorRegistry executorRegistry,
- final PhysicalStage physicalStage,
+ final Stage stage,
final int attemptIdx) {
// Loop until the stage completes.
while (true) {
- final Enum stageState = jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState();
+ final Enum stageState = jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState();
if (StageState.State.COMPLETE == stageState) {
// Stage has completed, so we break out of the loop.
break;
} else if (StageState.State.EXECUTING == stageState) {
- physicalStage.getTaskIds().forEach(taskId -> {
+ stage.getTaskIds().forEach(taskId -> {
final Enum tgState = jobStateManager.getTaskState(taskId).getStateMachine().getCurrentState();
if (TaskState.State.EXECUTING == tgState) {
sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId,
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
index 0ae5998..921dfe8 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
@@ -16,7 +16,9 @@
package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.*;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
import org.junit.Before;
import org.junit.Test;
@@ -59,7 +61,7 @@ public final class SingleTaskQueueTest {
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, true);
pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
- final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
+ final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
// Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex());
@@ -115,7 +117,7 @@ public final class SingleTaskQueueTest {
final PhysicalPlan physicalPlan =
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, false);
pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
- final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
+ final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
// Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 0);
@@ -168,7 +170,7 @@ public final class SingleTaskQueueTest {
final PhysicalPlan physicalPlan = TestPlanGenerator.generatePhysicalPlan(
TestPlanGenerator.PlanType.ThreeSequentialVerticesWithDifferentContainerTypes, true);
pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
- final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
+ final List<Stage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
// Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex());
@@ -206,10 +208,10 @@ public final class SingleTaskQueueTest {
}
/**
- * Schedule the tasks in a physical stage.
+ * Schedule the tasks in a stage.
* @param stage the stage to schedule.
*/
- private void scheduleStage(final PhysicalStage stage) {
+ private void scheduleStage(final Stage stage) {
stage.getTaskIds().forEach(taskId ->
pendingTaskPriorityQueue.add(new ExecutableTask(
"TestPlan",
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 65f9d98..4263e7f 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.physical.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.junit.Test;
diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index f9d20bd..9d4142d 100644
--- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -31,10 +31,10 @@ import edu.snu.nemo.compiler.optimizer.policy.BasicPullPolicy;
import edu.snu.nemo.compiler.optimizer.policy.BasicPushPolicy;
import edu.snu.nemo.compiler.optimizer.policy.Policy;
import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
@@ -100,7 +100,7 @@ public final class TestPlanGenerator {
private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge> irDAG,
final Policy policy) throws Exception {
final DAG<IRVertex, IREdge> optimized = CompiletimeOptimizer.optimize(irDAG, policy, EMPTY_DAG_DIRECTORY);
- final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR);
+ final DAG<Stage, StageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR);
return new PhysicalPlan("TestPlan", physicalDAG, PLAN_GENERATOR.getIdToIRVertex());
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
index f257926..fe69942 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/client/ClientEndpointTest.java
@@ -26,10 +26,10 @@ import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.state.JobState;
import edu.snu.nemo.runtime.master.MetricMessageHandler;
import edu.snu.nemo.runtime.master.BlockManagerMaster;
@@ -74,7 +74,7 @@ public class ClientEndpointTest {
final Injector injector = Tang.Factory.getTang().newInjector();
injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
final PhysicalPlanGenerator physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class);
- final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
+ final DAG<Stage, StageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
final LocalMessageDispatcher messageDispatcher = new LocalMessageDispatcher();
final LocalMessageEnvironment messageEnvironment =
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
index 68f9b12..ebbc7b6 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
@@ -27,8 +27,8 @@ import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
import edu.snu.nemo.compiler.optimizer.policy.PadoPolicy;
import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.junit.Before;
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
index 611d821..f948589 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
@@ -32,11 +32,9 @@ import edu.snu.nemo.compiler.frontend.beam.transform.DoTransform;
import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.stage.Stage;
-import edu.snu.nemo.runtime.common.plan.stage.StageEdge;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
@@ -80,7 +78,7 @@ public final class DAGConverterTest {
final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
new TestPolicy(), "");
final DAG<Stage, StageEdge> DAGOfStages = physicalPlanGenerator.stagePartitionIrDAG(irDAG);
- final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
+ final DAG<Stage, StageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
// Test DAG of stages
final List<Stage> sortedDAGOfStages = DAGOfStages.getTopologicalSort();
@@ -94,9 +92,9 @@ public final class DAGConverterTest {
assertEquals(DAGOfStages.getOutgoingEdgesOf(stage2).size(), 0);
// Test Physical DAG
- final List<PhysicalStage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
- final PhysicalStage physicalStage1 = sortedPhysicalDAG.get(0);
- final PhysicalStage physicalStage2 = sortedPhysicalDAG.get(1);
+ final List<Stage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
+ final Stage physicalStage1 = sortedPhysicalDAG.get(0);
+ final Stage physicalStage2 = sortedPhysicalDAG.get(1);
assertEquals(physicalDAG.getVertices().size(), 2);
assertEquals(physicalDAG.getIncomingEdgesOf(physicalStage1).size(), 0);
assertEquals(physicalDAG.getIncomingEdgesOf(physicalStage2).size(), 1);
@@ -238,10 +236,10 @@ public final class DAGConverterTest {
// Test Physical DAG
-// final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = logicalDAG.convert(new PhysicalDAGGenerator());
-// final List<PhysicalStage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
-// final PhysicalStage physicalStage1 = sortedPhysicalDAG.get(0);
-// final PhysicalStage physicalStage2 = sortedPhysicalDAG.get(1);
+// final DAG<Stage, StageEdge> physicalDAG = logicalDAG.convert(new PhysicalDAGGenerator());
+// final List<Stage> sortedPhysicalDAG = physicalDAG.getTopologicalSort();
+// final Stage physicalStage1 = sortedPhysicalDAG.get(0);
+// final Stage physicalStage2 = sortedPhysicalDAG.get(1);
// assertEquals(physicalDAG.getVertices().size(), 2);
// assertEquals(physicalDAG.getIncomingEdgesOf(physicalStage1).size(), 0);
// assertEquals(physicalDAG.getIncomingEdgesOf(physicalStage2).size(), 1);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
index a6df1d4..a946b4b 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
@@ -27,8 +27,9 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.*;
import edu.snu.nemo.runtime.executor.MetricMessageSender;
import edu.snu.nemo.runtime.executor.TaskExecutor;
import edu.snu.nemo.runtime.executor.TaskStateManager;
@@ -59,7 +60,7 @@ import static org.mockito.Mockito.*;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({InputReader.class, OutputWriter.class, DataTransferFactory.class,
- TaskStateManager.class, PhysicalStageEdge.class})
+ TaskStateManager.class, StageEdge.class})
public final class TaskExecutorTest {
private static final int DATA_SIZE = 100;
private static final String CONTAINER_TYPE = "CONTAINER_TYPE";
@@ -112,7 +113,7 @@ public final class TaskExecutorTest {
final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().addVertex(sourceIRVertex).buildWithoutSourceSinkCheck();
- final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
+ final StageEdge stageOutEdge = mock(StageEdge.class);
when(stageOutEdge.getSrcVertex()).thenReturn(sourceIRVertex);
final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
final ExecutableTask executableTask =
@@ -165,9 +166,9 @@ public final class TaskExecutorTest {
runtimeIREdgeId, edgeProperties, operatorIRVertex1, operatorIRVertex2, coder))
.buildWithoutSourceSinkCheck();
final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
- final PhysicalStageEdge stageInEdge = mock(PhysicalStageEdge.class);
+ final StageEdge stageInEdge = mock(StageEdge.class);
when(stageInEdge.getDstVertex()).thenReturn(operatorIRVertex1);
- final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
+ final StageEdge stageOutEdge = mock(StageEdge.class);
when(stageOutEdge.getSrcVertex()).thenReturn(operatorIRVertex2);
final ExecutableTask executableTask =
new ExecutableTask(
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
index cfdeb2f..98b8cc0 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockStoreTest.java
@@ -124,7 +124,7 @@ public final class BlockStoreTest {
IntStream.range(0, NUM_READ_VERTICES).forEach(number -> readTaskIdList.add("Read_IR_vertex"));
// Generates the ids and the data of the blocks to be used.
- final String shuffleEdge = RuntimeIdGenerator.generateRuntimeEdgeId("shuffle_edge");
+ final String shuffleEdge = RuntimeIdGenerator.generateStageEdgeId("shuffle_edge");
IntStream.range(0, NUM_WRITE_VERTICES).forEach(writeTaskIdx -> {
// Create a block for each writer task.
final String blockId = RuntimeIdGenerator.generateBlockId(shuffleEdge, writeTaskIdx);
@@ -146,7 +146,7 @@ public final class BlockStoreTest {
// Following part is for the concurrent read test.
final String writeTaskId = "conc_write_IR_vertex";
final List<String> concReadTaskIdList = new ArrayList<>(NUM_CONC_READ_TASKS);
- final String concEdge = RuntimeIdGenerator.generateRuntimeEdgeId("conc_read_edge");
+ final String concEdge = RuntimeIdGenerator.generateStageEdgeId("conc_read_edge");
// Generates the ids and the data to be used.
concBlockId = RuntimeIdGenerator.generateBlockId(concEdge, NUM_WRITE_VERTICES + NUM_READ_VERTICES + 1);
@@ -168,7 +168,7 @@ public final class BlockStoreTest {
// Generates the ids of the tasks to be used.
IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(number -> writeHashTaskIdList.add("hash_write_IR_vertex"));
IntStream.range(0, NUM_READ_HASH_TASKS).forEach(number -> readHashTaskIdList.add("hash_read_IR_vertex"));
- final String hashEdge = RuntimeIdGenerator.generateRuntimeEdgeId("hash_edge");
+ final String hashEdge = RuntimeIdGenerator.generateStageEdgeId("hash_edge");
// Generates the ids and the data of the blocks to be used.
IntStream.range(0, NUM_WRITE_HASH_TASKS).forEach(writeTaskIdx -> {
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 61ef9b0..697122e 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -37,8 +37,8 @@ import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.executor.Executor;
import edu.snu.nemo.runtime.executor.MetricManagerWorker;
import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
@@ -320,9 +320,9 @@ public final class DataTransferTest {
final IRVertex srcMockVertex = mock(IRVertex.class);
final IRVertex dstMockVertex = mock(IRVertex.class);
- final PhysicalStage srcStage = setupStages("srcStage-" + testIndex);
- final PhysicalStage dstStage = setupStages("dstStage-" + testIndex);
- dummyEdge = new PhysicalStageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
+ final Stage srcStage = setupStages("srcStage-" + testIndex);
+ final Stage dstStage = setupStages("dstStage-" + testIndex);
+ dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
srcStage, dstStage, CODER, false);
// Initialize states in Master
srcStage.getTaskIds().forEach(srcTaskId -> {
@@ -407,13 +407,13 @@ public final class DataTransferTest {
final IRVertex srcMockVertex = mock(IRVertex.class);
final IRVertex dstMockVertex = mock(IRVertex.class);
- final PhysicalStage srcStage = setupStages("srcStage-" + testIndex);
- final PhysicalStage dstStage = setupStages("dstStage-" + testIndex);
- dummyEdge = new PhysicalStageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
+ final Stage srcStage = setupStages("srcStage-" + testIndex);
+ final Stage dstStage = setupStages("dstStage-" + testIndex);
+ dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
srcStage, dstStage, CODER, false);
final IRVertex dstMockVertex2 = mock(IRVertex.class);
- final PhysicalStage dstStage2 = setupStages("dstStage-" + testIndex2);
- dummyEdge2 = new PhysicalStageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2,
+ final Stage dstStage2 = setupStages("dstStage-" + testIndex2);
+ dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2,
srcStage, dstStage2, CODER, false);
// Initialize states in Master
srcStage.getTaskIds().forEach(srcTaskId -> {
@@ -541,9 +541,9 @@ public final class DataTransferTest {
return Pair.of(srcVertex, dstVertex);
}
- private PhysicalStage setupStages(final String stageId) {
+ private Stage setupStages(final String stageId) {
final DAG<IRVertex, RuntimeEdge<IRVertex>> emptyDag = new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>().build();
- return new PhysicalStage(stageId, emptyDag, PARALLELISM_TEN, 0, "Not_used", Collections.emptyList());
+ return new Stage(stageId, emptyDag, PARALLELISM_TEN, 0, "Not_used", Collections.emptyList());
}
}
--
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.