You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/08/14 09:51:45 UTC
[incubator-nemo] branch master updated: [NEMO-64] Fix map stage
hang under DataSkewPolicy (#99)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new d59373f [NEMO-64] Fix map stage hang under DataSkewPolicy (#99)
d59373f is described below
commit d59373f59a2b82a170fe581d944364a63e015365
Author: Jeongyoon Eo <je...@spl.snu.ac.kr>
AuthorDate: Tue Aug 14 18:51:42 2018 +0900
[NEMO-64] Fix map stage hang under DataSkewPolicy (#99)
JIRA: [NEMO-64: Fix map stage hang under DataSkewPolicy](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-64),
[NEMO-181: Fix DataSkewPolicy bug for multiple shuffles](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-181)
**Major changes:**
- Data for dynamic optimization is now aggregated by `DynOptDataHandler` in the Scheduler instead of in a variable of `MetricCollectionBarrierVertex`. Updating `MetricCollectionBarrierVertex` as data arrived in RuntimeMaster clashed with serializing the IR DAG for scheduled Tasks, which caused a `ConcurrentModificationException` and silently killed the Scheduler.
- Identifies the target edge to optimize via MetricCollectionProperty in case multiple shuffles are involved.
**Minor changes to note:**
- Removed now unused entries in `dataSizeMetricMsg`
- Added `DataSkewMetricProperty` and `MetricFactory` to expose the task hash ranges of shuffle edges as an execution property, so that a RuntimePass can optimize the given IR DAG.
**Tests for the changes:**
- N/A (no new features; covered by `PerKeyMedianITCase`)
**Other comments:**
- Data for dynamic optimization will be aggregated in a designated vertex, not in the RuntimeMaster, in an upcoming PR for [NEMO-99](https://issues.apache.org/jira/browse/NEMO-99)
resolves [NEMO-64](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-64), [NEMO-181](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-181)
---
.../main/java/edu/snu/nemo/client/JobLauncher.java | 1 +
.../edu/snu/nemo/common/DataSkewMetricFactory.java | 36 ++++++++
.../main/java/edu/snu/nemo/common}/HashRange.java | 21 +++--
.../main/java/edu/snu/nemo/common}/KeyRange.java | 2 +-
.../java/edu/snu/nemo/common/MetricFactory.java | 26 ++++++
.../executionproperty/DataSkewMetricProperty.java | 43 +++++++++
.../edu/snu/nemo/common/ir/vertex/IRVertex.java | 9 ++
.../nemo/compiler/backend/nemo/NemoBackend.java | 2 +-
.../compiletime/annotating/DefaultMetricPass.java | 58 ++++++++++++
.../annotating/SkewResourceSkewedDataPass.java | 10 +-
.../composite/DefaultCompositePass.java | 1 +
.../compiler/optimizer/policy/DataSkewPolicy.java | 8 --
.../optimizer/policy/PolicyBuilderTest.java | 6 +-
conf/src/main/java/edu/snu/nemo/conf/JobConf.java | 1 -
.../nemo/runtime/common/RuntimeIdGenerator.java | 3 +-
.../eventhandler/DynamicOptimizationEvent.java | 31 ++++---
.../DynamicOptimizationEventHandler.java | 9 +-
.../runtime/common/optimizer/RunTimeOptimizer.java | 40 +++++---
.../pass/runtime/DataSkewRuntimePass.java | 101 ++++++++++-----------
.../common/optimizer/pass/runtime/RuntimePass.java | 7 +-
.../snu/nemo/runtime/common/plan/PhysicalPlan.java | 11 +++
.../runtime/common/plan/PhysicalPlanGenerator.java | 13 ++-
.../snu/nemo/runtime/common/plan/StageEdge.java | 31 -------
.../nemo/runtime/common/plan/StagePartitioner.java | 4 +-
.../edu/snu/nemo/runtime/common/plan/Task.java | 4 +-
runtime/common/src/main/proto/ControlMessage.proto | 2 -
.../pass/runtime/DataSkewRuntimePassTest.java | 4 +-
.../runtime/executor/data/BlockManagerWorker.java | 4 +-
.../nemo/runtime/executor/data/block/Block.java | 2 +-
.../runtime/executor/data/block/FileBlock.java | 2 +-
.../data/block/NonSerializedMemoryBlock.java | 2 +-
.../executor/data/block/SerializedMemoryBlock.java | 2 +-
.../runtime/executor/datatransfer/InputReader.java | 14 +--
.../nemo/runtime/executor/data/BlockStoreTest.java | 4 +-
.../snu/nemo/runtime/executor/data/BlockTest.java | 2 +-
.../executor/datatransfer/DataTransferTest.java | 21 +++++
.../runtime/master/DataSkewDynOptDataHandler.java | 61 +++++++++++++
.../snu/nemo/runtime/master/DynOptDataHandler.java | 33 +++++++
.../edu/snu/nemo/runtime/master/RuntimeMaster.java | 49 +---------
.../master/resource/ExecutorRepresenter.java | 2 -
.../runtime/master/scheduler/BatchScheduler.java | 79 +++++++++-------
.../scheduler/NodeShareSchedulingConstraint.java | 2 +-
.../scheduler/SchedulingConstraintRegistry.java | 2 +-
.../SkewnessAwareSchedulingConstraint.java | 19 ++--
.../runtime/master/scheduler/TaskDispatcher.java | 1 -
.../SkewnessAwareSchedulingConstraintTest.java | 59 +++++++-----
.../runtime/common/plan/TestPlanGenerator.java | 2 +-
47 files changed, 558 insertions(+), 288 deletions(-)
diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index f0ae953..5950854 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -315,6 +315,7 @@ public final class JobLauncher {
cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumListeningThreads.class);
cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumWorkingThreads.class);
cl.registerShortNameOfClass(JobConf.PartitionTransportClientNumThreads.class);
+ cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class);
cl.processCommandLine(args);
return confBuilder.build();
}
diff --git a/common/src/main/java/edu/snu/nemo/common/DataSkewMetricFactory.java b/common/src/main/java/edu/snu/nemo/common/DataSkewMetricFactory.java
new file mode 100644
index 0000000..2847d8d
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/DataSkewMetricFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common;
+
+import java.util.Map;
+
+/**
+ * A {@link MetricFactory} which is used for data skew handling.
+ */
+public final class DataSkewMetricFactory implements MetricFactory<Map<Integer, KeyRange>> {
+ private Map<Integer, KeyRange> metric;
+
+ /**
+ * Default constructor.
+ */
+ public DataSkewMetricFactory(final Map<Integer, KeyRange> metric) {
+ this.metric = metric;
+ }
+
+ public Map<Integer, KeyRange> getMetric() {
+ return metric;
+ }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java b/common/src/main/java/edu/snu/nemo/common/HashRange.java
similarity index 90%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java
rename to common/src/main/java/edu/snu/nemo/common/HashRange.java
index 50e4334..cdc7d49 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java
+++ b/common/src/main/java/edu/snu/nemo/common/HashRange.java
@@ -13,7 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.runtime.common.data;
+package edu.snu.nemo.common;
+
+import java.util.Arrays;
/**
* Descriptor for hash range.
@@ -109,22 +111,23 @@ public final class HashRange implements KeyRange<Integer> {
return false;
}
final HashRange hashRange = (HashRange) o;
- if (rangeBeginInclusive != hashRange.rangeBeginInclusive) {
+ if (rangeBeginInclusive != hashRange.rangeBeginInclusive
+ || rangeEndExclusive != hashRange.rangeEndExclusive
+ || isSkewed != hashRange.isSkewed) {
return false;
}
- return rangeEndExclusive == hashRange.rangeEndExclusive;
+ return true;
}
@Override
public int hashCode() {
- int result = rangeBeginInclusive;
- result = 31 * result + rangeEndExclusive;
- return result;
+ return Arrays.hashCode(new Object[] {
+ rangeBeginInclusive,
+ rangeEndExclusive,
+ isSkewed,
+ });
}
- public void setAsSkewed() {
- isSkewed = true;
- }
public boolean isSkewed() {
return isSkewed;
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/KeyRange.java b/common/src/main/java/edu/snu/nemo/common/KeyRange.java
similarity index 97%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/KeyRange.java
rename to common/src/main/java/edu/snu/nemo/common/KeyRange.java
index 0c46fc5..6fcac36 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/KeyRange.java
+++ b/common/src/main/java/edu/snu/nemo/common/KeyRange.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.snu.nemo.runtime.common.data;
+package edu.snu.nemo.common;
import java.io.Serializable;
diff --git a/common/src/main/java/edu/snu/nemo/common/MetricFactory.java b/common/src/main/java/edu/snu/nemo/common/MetricFactory.java
new file mode 100644
index 0000000..a98b81b
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/MetricFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common;
+
+import java.io.Serializable;
+
+/**
+ * A serializable metric factory.
+ *
+ * @param <T> metric type.
+ */
+public interface MetricFactory<T> extends Serializable {
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataSkewMetricProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataSkewMetricProperty.java
new file mode 100644
index 0000000..a5f37a9
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataSkewMetricProperty.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.edge.executionproperty;
+
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+
+/**
+ * DataSkewMetric ExecutionProperty.
+ */
+public final class DataSkewMetricProperty extends EdgeExecutionProperty<DataSkewMetricFactory> {
+ /**
+ * Constructor.
+ *
+ * @param value value of the execution property.
+ */
+ private DataSkewMetricProperty(final DataSkewMetricFactory value) {
+ super(value);
+ }
+
+ /**
+ * Static method exposing the constructor.
+ *
+ * @param value value of the new execution property.
+ * @return the newly created execution property.
+ */
+ public static DataSkewMetricProperty of(final DataSkewMetricFactory value) {
+ return new DataSkewMetricProperty(value);
+ }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
index 67fc42d..09ebfbf 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
@@ -29,6 +29,7 @@ import java.util.Optional;
*/
public abstract class IRVertex extends Vertex {
private final ExecutionPropertyMap<VertexExecutionProperty> executionProperties;
+ private boolean stagePartitioned;
/**
* Constructor of IRVertex.
@@ -36,6 +37,7 @@ public abstract class IRVertex extends Vertex {
public IRVertex() {
super(IdManager.newVertexId());
this.executionProperties = ExecutionPropertyMap.of(this);
+ this.stagePartitioned = false;
}
/**
@@ -89,6 +91,13 @@ public abstract class IRVertex extends Vertex {
return executionProperties;
}
+ public final void setStagePartitioned() {
+ stagePartitioned = true;
+ }
+ public final boolean getStagePartitioned() {
+ return stagePartitioned;
+ }
+
/**
* @return IRVertex properties in String form.
*/
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 3e933a2..da7adb6 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -61,6 +61,6 @@ public final class NemoBackend implements Backend<PhysicalPlan> {
public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
final PhysicalPlanGenerator physicalPlanGenerator) {
final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(irDAG);
- return new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), stageDAG);
+ return new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), irDAG, stageDAG);
}
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
new file mode 100644
index 0000000..c96ac0b
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Pass for initiating IREdge Metric ExecutionProperty with default key range.
+ */
+public final class DefaultMetricPass extends AnnotatingPass {
+ /**
+ * Default constructor.
+ */
+ public DefaultMetricPass() {
+ super(DataSkewMetricProperty.class);
+ }
+
+ @Override
+ public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ dag.topologicalDo(dst ->
+ dag.getIncomingEdgesOf(dst).forEach(edge -> {
+ if (CommunicationPatternProperty.Value.Shuffle
+ .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
+ final int parallelism = dst.getPropertyValue(ParallelismProperty.class).get();
+ final Map<Integer, KeyRange> metric = new HashMap<>();
+ for (int i = 0; i < parallelism; i++) {
+ metric.put(i, HashRange.of(i, i + 1, false));
+ }
+ edge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
+ }
+ }));
+ return dag;
+ }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index e2ac70f..c5bc1d9 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -53,10 +53,16 @@ public final class SkewResourceSkewedDataPass extends AnnotatingPass {
.filter(v -> v instanceof MetricCollectionBarrierVertex)
.forEach(v -> v.setProperty(DynamicOptimizationProperty
.of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
+
dag.getVertices().stream()
.filter(v -> hasMetricCollectionBarrierVertexAsParent(dag, v)
- && !v.getPropertyValue(ResourceSkewedDataProperty.class).isPresent())
- .forEach(v -> v.setProperty(ResourceSkewedDataProperty.of(true)));
+ && !v.getExecutionProperties().containsKey(ResourceSkewedDataProperty.class))
+ .forEach(childV -> {
+ childV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
+ dag.getDescendants(childV.getId()).forEach(descendentV -> {
+ descendentV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
+ });
+ });
return dag;
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java
index 4dc16d4..6bb119b 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java
@@ -31,6 +31,7 @@ public final class DefaultCompositePass extends CompositePass {
public DefaultCompositePass() {
super(Arrays.asList(
new DefaultParallelismPass(),
+ new DefaultMetricPass(),
new DefaultEdgeEncoderPass(),
new DefaultEdgeDecoderPass(),
new DefaultDataStorePass(),
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
index 9d7eb29..acb8d50 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
@@ -44,14 +44,6 @@ public final class DataSkewPolicy implements Policy {
this.policy = BUILDER.build();
}
- public DataSkewPolicy(final int skewness) {
- this.policy = new PolicyBuilder()
- .registerRuntimePass(new DataSkewRuntimePass().setNumSkewedKeys(skewness), new SkewCompositePass())
- .registerCompileTimePass(new LoopOptimizationCompositePass())
- .registerCompileTimePass(new DefaultCompositePass())
- .build();
- }
-
@Override
public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
throws Exception {
diff --git a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
index f75cb8c..79191ba 100644
--- a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -26,19 +26,19 @@ import static org.junit.Assert.assertTrue;
public final class PolicyBuilderTest {
@Test
public void testDisaggregationPolicy() {
- assertEquals(17, DisaggregationPolicy.BUILDER.getCompileTimePasses().size());
+ assertEquals(18, DisaggregationPolicy.BUILDER.getCompileTimePasses().size());
assertEquals(0, DisaggregationPolicy.BUILDER.getRuntimePasses().size());
}
@Test
public void testTransientResourcePolicy() {
- assertEquals(19, TransientResourcePolicy.BUILDER.getCompileTimePasses().size());
+ assertEquals(20, TransientResourcePolicy.BUILDER.getCompileTimePasses().size());
assertEquals(0, TransientResourcePolicy.BUILDER.getRuntimePasses().size());
}
@Test
public void testDataSkewPolicy() {
- assertEquals(21, DataSkewPolicy.BUILDER.getCompileTimePasses().size());
+ assertEquals(22, DataSkewPolicy.BUILDER.getCompileTimePasses().size());
assertEquals(1, DataSkewPolicy.BUILDER.getRuntimePasses().size());
}
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 871bf44..d26adaf 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -126,7 +126,6 @@ public final class JobConf extends ConfigurationModuleBuilder {
}
/**
-<<<<<<< HEAD
* Max number of attempts for task scheduling.
*/
@NamedParameter(doc = "Max number of task attempts", short_name = "max_task_attempt", default_value = "1")
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
index 2e2e616..049ccbe 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
@@ -43,9 +43,10 @@ public final class RuntimeIdGenerator {
* Generates the ID for physical plan.
*
* @return the generated ID
+ * TODO #100: Refactor string-based RuntimeIdGenerator for IR-based DynOpt
*/
public static String generatePhysicalPlanId() {
- return "Plan-" + physicalPlanIdGenerator.getAndIncrement();
+ return "Plan-" + physicalPlanIdGenerator.get();
}
/**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
index 50df799..9a8b26a 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
@@ -19,7 +19,7 @@
package edu.snu.nemo.runtime.common.eventhandler;
import edu.snu.nemo.common.eventhandler.RuntimeEvent;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
+import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
/**
@@ -27,25 +27,27 @@ import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
*/
public final class DynamicOptimizationEvent implements RuntimeEvent {
private final PhysicalPlan physicalPlan;
- private final MetricCollectionBarrierVertex metricCollectionBarrierVertex;
+ private final Object dynOptData;
private final String taskId;
private final String executorId;
+ private final IREdge targetEdge;
/**
* Default constructor.
* @param physicalPlan physical plan to be optimized.
- * @param metricCollectionBarrierVertex metric collection barrier vertex to retrieve metric data from.
* @param taskId id of the task which triggered the dynamic optimization.
* @param executorId the id of executor which executes {@code taskId}
*/
public DynamicOptimizationEvent(final PhysicalPlan physicalPlan,
- final MetricCollectionBarrierVertex metricCollectionBarrierVertex,
+ final Object dynOptData,
final String taskId,
- final String executorId) {
+ final String executorId,
+ final IREdge targetEdge) {
this.physicalPlan = physicalPlan;
- this.metricCollectionBarrierVertex = metricCollectionBarrierVertex;
this.taskId = taskId;
+ this.dynOptData = dynOptData;
this.executorId = executorId;
+ this.targetEdge = targetEdge;
}
/**
@@ -56,14 +58,7 @@ public final class DynamicOptimizationEvent implements RuntimeEvent {
}
/**
- * @return the metric collection barrier vertex for the dynamic optimization.
- */
- public MetricCollectionBarrierVertex getMetricCollectionBarrierVertex() {
- return this.metricCollectionBarrierVertex;
- }
-
- /**
- * @return id of the task which triggered the dynamic optimization
+ * @return id of the task which triggered the dynamic optimization.
*/
public String getTaskId() {
return taskId;
@@ -75,4 +70,12 @@ public final class DynamicOptimizationEvent implements RuntimeEvent {
public String getExecutorId() {
return executorId;
}
+
+ public Object getDynOptData() {
+ return this.dynOptData;
+ }
+
+ public IREdge getTargetEdge() {
+ return this.targetEdge;
+ }
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
index 56ce7c1..a642248 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
@@ -20,7 +20,7 @@ package edu.snu.nemo.runtime.common.eventhandler;
import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
+import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.runtime.common.optimizer.RunTimeOptimizer;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.reef.wake.impl.PubSubEventHandler;
@@ -50,11 +50,10 @@ public final class DynamicOptimizationEventHandler implements RuntimeEventHandle
@Override
public void onNext(final DynamicOptimizationEvent dynamicOptimizationEvent) {
final PhysicalPlan physicalPlan = dynamicOptimizationEvent.getPhysicalPlan();
- final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
- dynamicOptimizationEvent.getMetricCollectionBarrierVertex();
+ final Object dynOptData = dynamicOptimizationEvent.getDynOptData();
+ final IREdge targetEdge = dynamicOptimizationEvent.getTargetEdge();
- final PhysicalPlan newPlan = RunTimeOptimizer.dynamicOptimization(physicalPlan,
- metricCollectionBarrierVertex);
+ final PhysicalPlan newPlan = RunTimeOptimizer.dynamicOptimization(physicalPlan, dynOptData, targetEdge);
pubSubEventHandler.onNext(new UpdatePhysicalPlanEvent(newPlan, dynamicOptimizationEvent.getTaskId(),
dynamicOptimizationEvent.getExecutorId()));
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
index e251c98..30ad958 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
@@ -16,10 +16,17 @@
package edu.snu.nemo.runtime.common.optimizer;
import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
import java.util.*;
@@ -36,24 +43,27 @@ public final class RunTimeOptimizer {
/**
* Dynamic optimization method to process the dag with an appropriate pass, decided by the stats.
* @param originalPlan original physical execution plan.
- * @param metricCollectionBarrierVertex the vertex that collects metrics and chooses which optimization to perform.
* @return the newly updated optimized physical plan.
*/
public static synchronized PhysicalPlan dynamicOptimization(
final PhysicalPlan originalPlan,
- final MetricCollectionBarrierVertex metricCollectionBarrierVertex) {
- final DynamicOptimizationProperty.Value dynamicOptimizationType =
- metricCollectionBarrierVertex.getPropertyValue(DynamicOptimizationProperty.class).get();
+ final Object dynOptData,
+ final IREdge targetEdge) {
+ try {
+ final PhysicalPlanGenerator physicalPlanGenerator =
+ Tang.Factory.getTang().newInjector().getInstance(PhysicalPlanGenerator.class);
- switch (dynamicOptimizationType) {
- case DataSkewRuntimePass:
- // Metric data for DataSkewRuntimePass is a pair of blockIds and map of hashrange, partition size.
- final Pair<List<String>, Map<Integer, Long>> metricData =
- Pair.of(metricCollectionBarrierVertex.getBlockIds(),
- (Map<Integer, Long>) metricCollectionBarrierVertex.getMetricData());
- return new DataSkewRuntimePass().apply(originalPlan, metricData);
- default:
- throw new UnsupportedOperationException("Unknown runtime pass: " + dynamicOptimizationType);
+ // Data for dynamic optimization used in DataSkewRuntimePass
+ // is a map of <hash value, partition size>.
+ final DAG<IRVertex, IREdge> newIrDAG =
+ new DataSkewRuntimePass()
+ .apply(originalPlan.getIrDAG(), Pair.of(targetEdge, (Map<Integer, Long>) dynOptData));
+ final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(newIrDAG);
+ final PhysicalPlan physicalPlan =
+ new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), newIrDAG, stageDAG);
+ return physicalPlan;
+ } catch (final InjectionException e) {
+ throw new RuntimeException(e);
}
}
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 3218317..67afbad 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -16,18 +16,18 @@
package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.DataSkewMetricFactory;
import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.dag.DAGBuilder;
import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.exception.DynamicOptimizationException;
-
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.plan.StageEdge;
-import edu.snu.nemo.runtime.common.data.HashRange;
+
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.KeyRange;
+import edu.snu.nemo.common.HashRange;
import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,19 +41,24 @@ import java.util.stream.Collectors;
* this RuntimePass identifies a number of keys with big partition sizes(skewed key)
* and evenly redistributes data via overwriting incoming edges of destination tasks.
*/
-public final class DataSkewRuntimePass extends RuntimePass<Pair<List<String>, Map<Integer, Long>>> {
+public final class DataSkewRuntimePass extends RuntimePass<Pair<IREdge, Map<Integer, Long>>> {
private static final Logger LOG = LoggerFactory.getLogger(DataSkewRuntimePass.class.getName());
private final Set<Class<? extends RuntimeEventHandler>> eventHandlers;
// Skewed keys denote the top n keys in terms of partition size.
public static final int DEFAULT_NUM_SKEWED_KEYS = 3;
- private int numSkewedKeys = DEFAULT_NUM_SKEWED_KEYS;
+ private int numSkewedKeys;
/**
* Constructor.
*/
public DataSkewRuntimePass() {
- this.eventHandlers = Collections.singleton(
- DynamicOptimizationEventHandler.class);
+ this.eventHandlers = Collections.singleton(DynamicOptimizationEventHandler.class);
+ this.numSkewedKeys = DEFAULT_NUM_SKEWED_KEYS;
+ }
+
+ public DataSkewRuntimePass(final int numOfSkewedKeys) {
+ this();
+ this.numSkewedKeys = numOfSkewedKeys;
}
public DataSkewRuntimePass setNumSkewedKeys(final int numOfSkewedKeys) {
@@ -67,44 +72,32 @@ public final class DataSkewRuntimePass extends RuntimePass<Pair<List<String>, Ma
}
@Override
- public PhysicalPlan apply(final PhysicalPlan originalPlan,
- final Pair<List<String>, Map<Integer, Long>> metricData) {
- // Builder to create new stages.
- final DAGBuilder<Stage, StageEdge> physicalDAGBuilder =
- new DAGBuilder<>(originalPlan.getStageDAG());
- final List<String> blockIds = metricData.left();
-
+ public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> irDAG,
+ final Pair<IREdge, Map<Integer, Long>> metricData) {
// get edges to optimize
- final List<String> optimizationEdgeIds = blockIds.stream().map(blockId ->
- RuntimeIdGenerator.getRuntimeEdgeIdFromBlockId(blockId)).collect(Collectors.toList());
- final DAG<Stage, StageEdge> stageDAG = originalPlan.getStageDAG();
- final List<StageEdge> optimizationEdges = stageDAG.getVertices().stream()
- .flatMap(stage -> stageDAG.getIncomingEdgesOf(stage).stream())
- .filter(stageEdge -> optimizationEdgeIds.contains(stageEdge.getId()))
+ final List<IREdge> optimizationEdges = irDAG.getVertices().stream()
+ .flatMap(v -> irDAG.getIncomingEdgesOf(v).stream())
+ .filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
+ .equals(e.getPropertyValue(MetricCollectionProperty.class)))
.collect(Collectors.toList());
+ final IREdge targetEdge = metricData.left();
// Get number of evaluators of the next stage (number of blocks).
- final Integer numOfDstTasks = optimizationEdges.stream().findFirst().orElseThrow(() ->
- new RuntimeException("optimization edges are empty")).getDst().getTaskIds().size();
+ final Integer dstParallelism = targetEdge.getDst().getPropertyValue(ParallelismProperty.class).get();
// Calculate keyRanges.
- final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), numOfDstTasks);
-
+ final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), dstParallelism);
+ final Map<Integer, KeyRange> taskIdxToKeyRange = new HashMap<>();
+ for (int i = 0; i < dstParallelism; i++) {
+ taskIdxToKeyRange.put(i, keyRanges.get(i));
+ }
// Overwrite the previously assigned key range of the target edge in the IR DAG with the new range.
- optimizationEdges.forEach(optimizationEdge -> {
- // Update the information.
- final Map<Integer, KeyRange> taskIdxToHashRange = new HashMap<>();
- for (int taskIdx = 0; taskIdx < numOfDstTasks; taskIdx++) {
- taskIdxToHashRange.put(taskIdx, keyRanges.get(taskIdx));
- }
- optimizationEdge.setTaskIdxToKeyRange(taskIdxToHashRange);
- });
-
- return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build());
+ targetEdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(taskIdxToKeyRange)));
+ return irDAG;
}
public List<Integer> identifySkewedKeys(final Map<Integer, Long> keyValToPartitionSizeMap) {
- // Identify skewed keyes.
+ // Identify skewed keys.
List<Map.Entry<Integer, Long>> sortedMetricData = keyValToPartitionSizeMap.entrySet().stream()
.sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
.collect(Collectors.toList());
@@ -134,31 +127,31 @@ public final class DataSkewRuntimePass extends RuntimePass<Pair<List<String>, Ma
* to a key range of partitions with approximate size of (total size of partitions / the number of tasks).
*
* @param keyToPartitionSizeMap a map of key to partition size.
- * @param numOfDstTasks the number of tasks that receives this data as input.
+ * @param dstParallelism the number of tasks that receive this data as input.
* @return the list of key ranges calculated.
*/
@VisibleForTesting
public List<KeyRange> calculateKeyRanges(final Map<Integer, Long> keyToPartitionSizeMap,
- final Integer numOfDstTasks) {
- // Get the biggest key.
- final int maxKey = keyToPartitionSizeMap.keySet().stream()
+ final Integer dstParallelism) {
+ // Get the last key.
+ final int lastKey = keyToPartitionSizeMap.keySet().stream()
.max(Integer::compareTo)
- .orElseThrow(() -> new DynamicOptimizationException("Cannot find max key among blocks."));
+ .get();
// Identify skewed keys, which is top numSkewedKeys number of keys.
List<Integer> skewedKeys = identifySkewedKeys(keyToPartitionSizeMap);
// Calculate the ideal size for each destination task.
final Long totalSize = keyToPartitionSizeMap.values().stream().mapToLong(n -> n).sum(); // get total size
- final Long idealSizePerTask = totalSize / numOfDstTasks; // and derive the ideal size per task
+ final Long idealSizePerTask = totalSize / dstParallelism; // and derive the ideal size per task
- final List<KeyRange> keyRanges = new ArrayList<>(numOfDstTasks);
+ final List<KeyRange> keyRanges = new ArrayList<>(dstParallelism);
int startingKey = 0;
int finishingKey = 1;
Long currentAccumulatedSize = keyToPartitionSizeMap.getOrDefault(startingKey, 0L);
Long prevAccumulatedSize = 0L;
- for (int i = 1; i <= numOfDstTasks; i++) {
- if (i != numOfDstTasks) {
+ for (int i = 1; i <= dstParallelism; i++) {
+ if (i != dstParallelism) {
// Ideal accumulated partition size for this task.
final Long idealAccumulatedSize = idealSizePerTask * i;
// By adding partition sizes, find the accumulated size nearest to the given ideal size.
@@ -185,15 +178,15 @@ public final class DataSkewRuntimePass extends RuntimePass<Pair<List<String>, Ma
prevAccumulatedSize = currentAccumulatedSize;
startingKey = finishingKey;
} else { // last one: we put the range of the rest.
- boolean isSkewedKey = containsSkewedKey(skewedKeys, startingKey, finishingKey);
+ boolean isSkewedKey = containsSkewedKey(skewedKeys, startingKey, lastKey + 1);
keyRanges.add(i - 1,
- HashRange.of(startingKey, maxKey + 1, isSkewedKey));
+ HashRange.of(startingKey, lastKey + 1, isSkewedKey));
- while (finishingKey <= maxKey) {
+ while (finishingKey <= lastKey) {
currentAccumulatedSize += keyToPartitionSizeMap.getOrDefault(finishingKey, 0L);
finishingKey++;
}
- LOG.debug("KeyRange {}~{}, Size {}", startingKey, maxKey + 1,
+ LOG.debug("KeyRange {}~{}, Size {}", startingKey, lastKey + 1,
currentAccumulatedSize - prevAccumulatedSize);
}
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
index 249a239..5a8471f 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
@@ -15,9 +15,11 @@
*/
package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
+import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.pass.Pass;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import java.util.Set;
import java.util.function.BiFunction;
@@ -28,7 +30,8 @@ import java.util.function.BiFunction;
* after dynamic optimization.
* @param <T> type of the metric data used for dynamic optimization.
*/
-public abstract class RuntimePass<T> extends Pass implements BiFunction<PhysicalPlan, T, PhysicalPlan> {
+public abstract class RuntimePass<T> extends Pass
+ implements BiFunction<DAG<IRVertex, IREdge>, T, DAG<IRVertex, IREdge>> {
/**
* @return the set of event handlers used with the runtime pass.
*/
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index 30f7111..4286bca 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -16,6 +16,7 @@
package edu.snu.nemo.runtime.common.plan;
import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import java.io.Serializable;
@@ -27,6 +28,7 @@ import java.util.Map;
*/
public final class PhysicalPlan implements Serializable {
private final String id;
+ private final DAG<IRVertex, IREdge> irDAG;
private final DAG<Stage, StageEdge> stageDAG;
private final Map<String, IRVertex> idToIRVertex;
@@ -37,8 +39,10 @@ public final class PhysicalPlan implements Serializable {
* @param stageDAG the DAG of stages.
*/
public PhysicalPlan(final String id,
+ final DAG<IRVertex, IREdge> irDAG,
final DAG<Stage, StageEdge> stageDAG) {
this.id = id;
+ this.irDAG = irDAG;
this.stageDAG = stageDAG;
idToIRVertex = new HashMap<>();
@@ -70,6 +74,13 @@ public final class PhysicalPlan implements Serializable {
return idToIRVertex;
}
+ /**
+ * @return IR DAG.
+ */
+ public DAG<IRVertex, IREdge> getIrDAG() {
+ return irDAG;
+ }
+
@Override
public String toString() {
return stageDAG.toString();
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index ca4124b..16f4bc4 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -34,6 +34,8 @@ import edu.snu.nemo.common.exception.PhysicalPlanGenerationException;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.reef.tang.annotations.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.util.*;
@@ -45,6 +47,7 @@ import java.util.function.Function;
public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdge>, DAG<Stage, StageEdge>> {
private final String dagDirectory;
private final StagePartitioner stagePartitioner;
+ private static final Logger LOG = LoggerFactory.getLogger(PhysicalPlanGenerator.class.getName());
/**
* Private constructor.
@@ -147,7 +150,7 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>();
- // Prepare vertexIdtoReadables
+ // Prepare vertexIdToReadables
final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
for (int i = 0; i < stageParallelism; i++) {
vertexIdToReadables.add(new HashMap<>());
@@ -156,7 +159,7 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
// For each IRVertex,
for (final IRVertex irVertex : stageVertices) {
// Take care of the readables of a source vertex.
- if (irVertex instanceof SourceVertex) {
+ if (irVertex instanceof SourceVertex && !irVertex.getStagePartitioned()) {
final SourceVertex sourceVertex = (SourceVertex) irVertex;
try {
final List<Readable> readables = sourceVertex.getReadables(stageParallelism);
@@ -200,6 +203,12 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
dagOfStagesBuilder.addVertex(stage);
stageIdToStageMap.put(stageId, stage);
}
+
+ // To prevent re-fetching readables in source vertex
+ // during re-generation of physical plan for dynamic optimization.
+ for (IRVertex irVertex : stageVertices) {
+ irVertex.setStagePartitioned();
+ }
}
// Add StageEdges
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 98773ea..5218b8f 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -21,11 +21,6 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.data.HashRange;
-
-import java.util.HashMap;
-import java.util.Map;
/**
* Edge of a stage that connects an IRVertex of the source stage to an IRVertex of the destination stage.
@@ -45,11 +40,6 @@ public final class StageEdge extends RuntimeEdge<Stage> {
private final IRVertex dstVertex;
/**
- * The list between the task idx and key range to read.
- */
- private Map<Integer, KeyRange> taskIdxToKeyRange;
-
- /**
* Value for {@link CommunicationPatternProperty}.
*/
private final CommunicationPatternProperty.Value dataCommunicationPatternValue;
@@ -81,11 +71,6 @@ public final class StageEdge extends RuntimeEdge<Stage> {
super(runtimeEdgeId, edgeProperties, srcStage, dstStage, isSideInput);
this.srcVertex = srcVertex;
this.dstVertex = dstVertex;
- // Initialize the key range of each dst task.
- this.taskIdxToKeyRange = new HashMap<>();
- for (int taskIdx = 0; taskIdx < dstStage.getTaskIds().size(); taskIdx++) {
- taskIdxToKeyRange.put(taskIdx, HashRange.of(taskIdx, taskIdx + 1, false));
- }
this.dataCommunicationPatternValue = edgeProperties.get(CommunicationPatternProperty.class)
.orElseThrow(() -> new RuntimeException(String.format(
"CommunicationPatternProperty not set for %s", runtimeEdgeId)));
@@ -125,22 +110,6 @@ public final class StageEdge extends RuntimeEdge<Stage> {
}
/**
- * @return the list between the task idx and key range to read.
- */
- public Map<Integer, KeyRange> getTaskIdxToKeyRange() {
- return taskIdxToKeyRange;
- }
-
- /**
- * Sets the task idx to key range list.
- *
- * @param taskIdxToKeyRange the list to set.
- */
- public void setTaskIdxToKeyRange(final Map<Integer, KeyRange> taskIdxToKeyRange) {
- this.taskIdxToKeyRange = taskIdxToKeyRange;
- }
-
- /**
* @return {@link CommunicationPatternProperty} value.
*/
public CommunicationPatternProperty.Value getDataCommunicationPattern() {
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
index abe0a8f..88ef432 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
@@ -83,7 +83,7 @@ public final class StagePartitioner implements Function<DAG<IRVertex, IREdge>, M
continue;
}
// Assign stageId
- if (testMergability(edge, irDAG)) {
+ if (testMergeability(edge, irDAG)) {
vertexToStageIdMap.put(connectedIRVertex, stageId);
} else {
vertexToStageIdMap.put(connectedIRVertex, nextStageIndex.getValue());
@@ -99,7 +99,7 @@ public final class StagePartitioner implements Function<DAG<IRVertex, IREdge>, M
* @param dag IR DAG which contains {@code edge}
* @return {@code true} if and only if the source and the destination vertex of the edge can be merged into one stage.
*/
- private boolean testMergability(final IREdge edge, final DAG<IRVertex, IREdge> dag) {
+ private boolean testMergeability(final IREdge edge, final DAG<IRVertex, IREdge> dag) {
// If the destination vertex has multiple inEdges, return false
if (dag.getIncomingEdgesOf(edge.getDst()).size() > 1) {
return false;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 0cfe863..3e95830 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -20,9 +20,7 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
/**
* A Task is a self-contained executable that can be executed on a machine.
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index 59a3212..911990e 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -117,8 +117,6 @@ message BlockStateChangedMsg {
message DataSizeMetricMsg {
// TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex.
repeated PartitionSizeEntry partitionSize = 1;
- required string blockId = 2;
- required string srcIRVertexId = 3;
}
message PartitionSizeEntry {
diff --git a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
index 319b8ba..04fca2b 100644
--- a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
+++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
@@ -15,8 +15,8 @@
*/
package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
import org.junit.Before;
import org.junit.Test;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index ba0a71c..4cf0b12 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -26,7 +26,7 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DataPersistenceProperty;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.comm.ControlMessage.ByteTransferContextDescriptor;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
@@ -296,8 +296,6 @@ public final class BlockManagerWorker {
.setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
.setType(ControlMessage.MessageType.DataSizeMetric)
.setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
- .setBlockId(blockId)
- .setSrcIRVertexId(srcIRVertexId)
.addAllPartitionSize(partitionSizeEntries)
)
.build());
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
index e0de210..960f2c5 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.executor.data.block;
import edu.snu.nemo.common.exception.BlockFetchException;
import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index 6eef824..be42f6b 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.runtime.executor.data.block;
import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.exception.BlockFetchException;
import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
import edu.snu.nemo.runtime.executor.data.*;
import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
import edu.snu.nemo.runtime.executor.data.partition.Partition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
index 5bf1e01..7722f2d 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.executor.data.block;
import edu.snu.nemo.common.exception.BlockFetchException;
import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
import edu.snu.nemo.runtime.executor.data.DataUtil;
import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index 847558f..03a7ab1 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.executor.data.block;
import edu.snu.nemo.common.exception.BlockFetchException;
import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
import edu.snu.nemo.runtime.executor.data.DataUtil;
import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 4471ae4..9870259 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -16,6 +16,8 @@
package edu.snu.nemo.runtime.executor.datatransfer;
import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.ir.edge.executionproperty.*;
import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
@@ -23,16 +25,14 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyV
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.common.exception.BlockFetchException;
import edu.snu.nemo.common.exception.UnsupportedCommPatternException;
-import edu.snu.nemo.runtime.common.data.HashRange;
+import edu.snu.nemo.common.HashRange;
import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
import edu.snu.nemo.runtime.executor.data.DataUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@@ -45,7 +45,6 @@ import java.util.stream.StreamSupport;
* Represents the input data transfer to a task.
*/
public final class InputReader extends DataTransfer {
- private static final Logger LOG = LoggerFactory.getLogger(InputReader.class.getName());
private final int dstTaskIndex;
private final BlockManagerWorker blockManagerWorker;
@@ -118,8 +117,9 @@ public final class InputReader extends DataTransfer {
assert (runtimeEdge instanceof StageEdge);
final Optional<DataStoreProperty.Value> dataStoreProperty
= runtimeEdge.getPropertyValue(DataStoreProperty.class);
- final KeyRange hashRangeToRead =
- ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
+ final DataSkewMetricFactory metricFactory =
+ (DataSkewMetricFactory) runtimeEdge.getExecutionProperties().get(DataSkewMetricProperty.class).get();
+ final KeyRange hashRangeToRead = metricFactory.getMetric().get(dstTaskIndex);
if (hashRangeToRead == null) {
throw new BlockFetchException(
new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index 8f1b7e5..61adee2 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -20,8 +20,8 @@ import edu.snu.nemo.common.coder.*;
import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
index c05745d..46c6ce9 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
@@ -15,9 +15,9 @@
*/
package edu.snu.nemo.runtime.executor.data;
+import edu.snu.nemo.common.HashRange;
import edu.snu.nemo.common.coder.IntDecoderFactory;
import edu.snu.nemo.common.coder.IntEncoderFactory;
-import edu.snu.nemo.runtime.common.data.HashRange;
import edu.snu.nemo.runtime.executor.data.block.Block;
import edu.snu.nemo.runtime.executor.data.block.FileBlock;
import edu.snu.nemo.runtime.executor.data.block.NonSerializedMemoryBlock;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index c755543..229f30d 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -15,6 +15,9 @@
*/
package edu.snu.nemo.runtime.executor.datatransfer;
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
import edu.snu.nemo.common.coder.*;
import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.ir.edge.IREdge;
@@ -300,6 +303,15 @@ public final class DataTransferTest {
dummyIREdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
+ if (dummyIREdge.getPropertyValue(CommunicationPatternProperty.class).get()
+ .equals(CommunicationPatternProperty.Value.Shuffle)) {
+ final int parallelism = dstVertex.getPropertyValue(ParallelismProperty.class).get();
+ final Map<Integer, KeyRange> metric = new HashMap<>();
+ for (int i = 0; i < parallelism; i++) {
+ metric.put(i, HashRange.of(i, i + 1, false));
+ }
+ dummyIREdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
+ }
final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
final RuntimeEdge dummyEdge;
@@ -383,6 +395,15 @@ public final class DataTransferTest {
= dummyIREdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
duplicateDataProperty.get().setRepresentativeEdgeId(edgeId);
duplicateDataProperty.get().setGroupSize(2);
+ if (dummyIREdge.getPropertyValue(CommunicationPatternProperty.class).get()
+ .equals(CommunicationPatternProperty.Value.Shuffle)) {
+ final int parallelism = dstVertex.getPropertyValue(ParallelismProperty.class).get();
+ final Map<Integer, KeyRange> metric = new HashMap<>();
+ for (int i = 0; i < parallelism; i++) {
+ metric.put(i, HashRange.of(i, i + 1, false));
+ }
+ dummyIREdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
+ }
dummyIREdge.setProperty(DataStoreProperty.of(store));
dummyIREdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
final RuntimeEdge dummyEdge, dummyEdge2;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java
new file mode 100644
index 0000000..47671ed
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Handler for aggregating data used in data skew dynamic optimization.
+ * Sums the partition sizes reported by tasks, keyed by hash index.
+ * NOTE(review): backed by a plain HashMap with no synchronization; confirm updates and reads share one thread.
+ */
+public class DataSkewDynOptDataHandler implements DynOptDataHandler {
+  private final Map<Integer, Long> aggregatedDynOptData;
+
+  public DataSkewDynOptDataHandler() {
+    this.aggregatedDynOptData = new HashMap<>();
+  }
+
+  /**
+   * Updates data for dynamic optimization sent from Tasks.
+   * @param dynOptData data for data skew optimization, cast to a list of ControlMessage.PartitionSizeEntry.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public final void updateDynOptData(final Object dynOptData) {
+    final List<ControlMessage.PartitionSizeEntry> partitionSizeInfo
+        = (List<ControlMessage.PartitionSizeEntry>) dynOptData;
+    partitionSizeInfo.forEach(partitionSizeEntry -> {
+      final int hashIndex = partitionSizeEntry.getKey();
+      final long partitionSize = partitionSizeEntry.getSize();
+      // merge() stores the size for an unseen hash index and adds it to the running total otherwise.
+      aggregatedDynOptData.merge(hashIndex, partitionSize, Long::sum);
+    });
+  }
+
+  /**
+   * Returns aggregated data for dynamic optimization.
+   * @return the live map (not a copy) from hash index to the summed partition size.
+   */
+  @Override
+  public final Object getDynOptData() {
+    return aggregatedDynOptData;
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DynOptDataHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DynOptDataHandler.java
new file mode 100644
index 0000000..66fa310
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DynOptDataHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+/**
+ * Handler for aggregating data used in dynamic optimization. Payload types are defined by each implementation.
+ */
+public interface DynOptDataHandler {
+ /**
+ * Folds one piece of data for dynamic optimization, sent from Tasks, into the aggregate.
+ * @param dynOptData data used for dynamic optimization; the expected concrete type is implementation-specific.
+ */
+ void updateDynOptData(Object dynOptData);
+
+ /**
+ * Returns the data aggregated so far for dynamic optimization.
+ * @return aggregated data used for dynamic optimization; the concrete type is implementation-specific.
+ */
+ Object getDynOptData();
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 41ad243..d1fa2df 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -19,13 +19,13 @@ import edu.snu.nemo.common.Pair;
import edu.snu.nemo.conf.JobConf;
import edu.snu.nemo.common.exception.*;
import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageContext;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.MessageListener;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.master.scheduler.BatchScheduler;
import edu.snu.nemo.runtime.master.servlet.*;
import edu.snu.nemo.runtime.master.resource.ContainerManager;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -82,8 +82,6 @@ public final class RuntimeMaster {
private final MetricMessageHandler metricMessageHandler;
private final MessageEnvironment masterMessageEnvironment;
private final MetricStore metricStore;
- private final Map<Integer, Long> aggregatedMetricData;
- private final ExecutorService metricAggregationService;
private final ClientRPC clientRPC;
private final MetricManagerMaster metricManagerMaster;
// For converting json data. This is a thread safe.
@@ -123,8 +121,6 @@ public final class RuntimeMaster {
this.irVertices = new HashSet<>();
this.resourceRequestCount = new AtomicInteger(0);
this.objectMapper = new ObjectMapper();
- this.aggregatedMetricData = new ConcurrentHashMap<>();
- this.metricAggregationService = Executors.newFixedThreadPool(10);
this.metricStore = MetricStore.getStore();
this.metricServer = startRestMetricServer();
}
@@ -343,10 +339,8 @@ public final class RuntimeMaster {
LOG.error(failedExecutorId + " failed, Stack Trace: ", exception);
throw new RuntimeException(exception);
case DataSizeMetric:
- final ControlMessage.DataSizeMetricMsg dataSizeMetricMsg = message.getDataSizeMetricMsg();
// TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex.
- accumulateBarrierMetric(dataSizeMetricMsg.getPartitionSizeList(),
- dataSizeMetricMsg.getSrcIRVertexId(), dataSizeMetricMsg.getBlockId());
+ ((BatchScheduler) scheduler).updateDynOptData(message.getDataSizeMetricMsg().getPartitionSizeList());
break;
case MetricMessageReceived:
final List<ControlMessage.Metric> metricList = message.getMetricMsg().getMetricList();
@@ -371,45 +365,6 @@ public final class RuntimeMaster {
}
}
- /**
- * Accumulates the metric data for a barrier vertex.
- * TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex.
- * TODO #98: Implement MetricVertex that collect metric used for dynamic optimization.
- *
- * @param partitionSizeInfo the size of partitions in a block to accumulate.
- * @param srcVertexId the ID of the source vertex.
- * @param blockId the ID of the block.
- */
- private void accumulateBarrierMetric(final List<ControlMessage.PartitionSizeEntry> partitionSizeInfo,
- final String srcVertexId,
- final String blockId) {
- final IRVertex vertexToSendMetricDataTo = irVertices.stream()
- .filter(irVertex -> irVertex.getId().equals(srcVertexId)).findFirst()
- .orElseThrow(() -> new RuntimeException(srcVertexId + " doesn't exist in the submitted Physical Plan"));
-
- if (vertexToSendMetricDataTo instanceof MetricCollectionBarrierVertex) {
- final MetricCollectionBarrierVertex<Integer, Long> metricCollectionBarrierVertex =
- (MetricCollectionBarrierVertex) vertexToSendMetricDataTo;
-
- metricCollectionBarrierVertex.addBlockId(blockId);
- metricAggregationService.submit(() -> {
- // For each hash range index, we aggregate the metric data.
- partitionSizeInfo.forEach(partitionSizeEntry -> {
- final int key = partitionSizeEntry.getKey();
- final long size = partitionSizeEntry.getSize();
- if (aggregatedMetricData.containsKey(key)) {
- aggregatedMetricData.compute(key, (existKey, existValue) -> existValue + size);
- } else {
- aggregatedMetricData.put(key, size);
- }
- });
- metricCollectionBarrierVertex.setMetricData(aggregatedMetricData);
- });
- } else {
- throw new RuntimeException("Something wrong happened at SkewCompositePass.");
- }
- }
-
private static TaskState.State convertTaskState(final ControlMessage.TaskStateFromExecutor state) {
switch (state) {
case READY:
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index d0e21d8..d5c2e89 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -44,7 +44,6 @@ import java.util.stream.Stream;
*/
@NotThreadSafe
public final class ExecutorRepresenter {
-
private final String executorId;
private final ResourceSpecification resourceSpecification;
private final Map<String, Task> runningComplyingTasks;
@@ -107,7 +106,6 @@ public final class ExecutorRepresenter {
? runningComplyingTasks : runningNonComplyingTasks).put(task.getTaskId(), task);
runningTaskToAttempt.put(task, task.getAttemptIdx());
failedTasks.remove(task);
-
serializationExecutorService.submit(() -> {
final byte[] serialized = SerializationUtils.serialize(task);
sendControlMessage(
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index 24c0e43..77881c0 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -17,18 +17,20 @@ package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.collect.Sets;
import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.dag.DAG;
import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
import edu.snu.nemo.runtime.common.plan.*;
import edu.snu.nemo.runtime.common.state.BlockState;
import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.master.DataSkewDynOptDataHandler;
+import edu.snu.nemo.runtime.master.DynOptDataHandler;
import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
import edu.snu.nemo.common.exception.*;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
import edu.snu.nemo.runtime.common.state.StageState;
import edu.snu.nemo.runtime.master.BlockManagerMaster;
import edu.snu.nemo.runtime.master.PlanStateManager;
@@ -75,6 +77,7 @@ public final class BatchScheduler implements Scheduler {
private PhysicalPlan physicalPlan;
private PlanStateManager planStateManager;
private List<List<Stage>> sortedScheduleGroups;
+ private List<DynOptDataHandler> dynOptDataHandlers;
@Inject
private BatchScheduler(final TaskDispatcher taskDispatcher,
@@ -93,6 +96,8 @@ public final class BatchScheduler implements Scheduler {
.subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler);
}
this.executorRegistry = executorRegistry;
+ this.dynOptDataHandlers = new ArrayList<>();
+ dynOptDataHandlers.add(new DataSkewDynOptDataHandler());
}
/**
@@ -130,14 +135,12 @@ public final class BatchScheduler implements Scheduler {
* Handles task state transition notifications sent from executors.
* Note that we can receive notifications for previous task attempts, due to the nature of asynchronous events.
* We ignore such late-arriving notifications, and only handle notifications for the current task attempt.
- *
* @param executorId the id of the executor where the message was sent from.
* @param taskId whose state has changed
* @param taskAttemptIndex of the task whose state has changed
* @param newState the state to change to
* @param vertexPutOnHold the ID of vertex that is put on hold. It is null otherwise.
*/
- @Override
public void onTaskStateReportFromExecutor(final String executorId,
final String taskId,
final int taskAttemptIndex,
@@ -158,7 +161,7 @@ public final class BatchScheduler implements Scheduler {
onTaskExecutionFailedRecoverable(executorId, taskId, failureCause);
break;
case ON_HOLD:
- onTaskExecutionOnHold(executorId, taskId, vertexPutOnHold);
+ onTaskExecutionOnHold(executorId, taskId);
break;
case FAILED:
throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The plan failed on Task #")
@@ -364,15 +367,34 @@ public final class BatchScheduler implements Scheduler {
});
}
+ /**
+  * Finds the IR edge to optimize for the stage that contains the given task.
+  * Only an outgoing edge of that stage whose MetricCollectionProperty is DataSkewRuntimePass qualifies.
+  * @param taskId the ID of a task that belongs to the stage of interest.
+  * @return the matching IR edge, or null if no outgoing edge of the stage is marked for data skew optimization.
+  */
+ public IREdge getEdgeToOptimize(final String taskId) {
+   // Get a stage including the given task
+   final Stage stagePutOnHold = physicalPlan.getStageDAG().getVertices().stream()
+       .filter(stage -> stage.getTaskIds().contains(taskId)).findFirst()
+       .orElseThrow(() -> new RuntimeException("No stage contains the task " + taskId));
+
+   // Examine outgoing edges of that stage, looking for one with MetricCollectionProperty
+   for (final StageEdge stageEdge : physicalPlan.getStageDAG().getOutgoingEdgesOf(stagePutOnHold)) {
+     final IREdge candidate = physicalPlan.getIrDAG()
+         .getEdgeBetween(stageEdge.getSrcIRVertex().getId(), stageEdge.getDstIRVertex().getId());
+     // The property is optional: edges that do not carry it are skipped instead of failing on an empty Optional.
+     if (candidate.getPropertyValue(MetricCollectionProperty.class)
+         .filter(MetricCollectionProperty.Value.DataSkewRuntimePass::equals).isPresent()) {
+       return candidate;
+     }
+   }
+
+   // Never fall back to an unmarked edge; the caller treats null as "no edge specified for optimization".
+   return null;
+ }
+
/**
* Action for after task execution is put on hold.
* @param executorId the ID of the executor.
* @param taskId the ID of the task.
- * @param vertexPutOnHold the ID of vertex that is put on hold.
*/
private void onTaskExecutionOnHold(final String executorId,
- final String taskId,
- final String vertexPutOnHold) {
+ final String taskId) {
LOG.info("{} put on hold in {}", new Object[]{taskId, executorId});
executorRegistry.updateExecutor(executorId, (executor, state) -> {
executor.onTaskExecutionComplete(taskId);
@@ -383,21 +405,18 @@ public final class BatchScheduler implements Scheduler {
final boolean stageComplete =
planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
+ final IREdge targetEdge = getEdgeToOptimize(taskId);
+ if (targetEdge == null) {
+ throw new RuntimeException("No edges specified for data skew optimization");
+ }
+
if (stageComplete) {
- // get optimization vertex from the task.
- final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
- getVertexDagById(taskId).getVertices().stream() // get vertex list
- .filter(irVertex -> irVertex.getId().equals(vertexPutOnHold)) // find it
- .filter(irVertex -> irVertex instanceof MetricCollectionBarrierVertex)
- .distinct()
- .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // convert types
- .findFirst().orElseThrow(() -> new RuntimeException(TaskState.State.ON_HOLD.name() // get it
- + " called with failed task ids by some other task than "
- + MetricCollectionBarrierVertex.class.getSimpleName()));
- // and we will use this vertex to perform metric collection and dynamic optimization.
-
- pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
- new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, taskId, executorId));
+ final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
+ .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
+ .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
+ pubSubEventHandlerWrapper.getPubSubEventHandler()
+ .onNext(new DynamicOptimizationEvent(physicalPlan, dynOptDataHandler.getDynOptData(),
+ taskId, executorId, targetEdge));
}
}
@@ -478,16 +497,10 @@ public final class BatchScheduler implements Scheduler {
.collect(Collectors.toSet());
}
- /**
- * @param taskId id of the task
- * @return the IR dag
- */
- private DAG<IRVertex, RuntimeEdge<IRVertex>> getVertexDagById(final String taskId) {
- for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
- if (stage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
- return stage.getIRDAG();
- }
- }
- throw new RuntimeException("This taskId does not exist in the plan");
+ /**
+  * Forwards data for dynamic optimization to the registered data skew handler.
+  * @param dynOptData data reported for dynamic optimization.
+  */
+ public void updateDynOptData(final Object dynOptData) {
+   for (final DynOptDataHandler handler : dynOptDataHandlers) {
+     if (handler instanceof DataSkewDynOptDataHandler) {
+       // Only the first matching handler receives the data.
+       handler.updateDynOptData(dynOptData);
+       return;
+     }
+   }
+   throw new RuntimeException("DataSkewDynOptDataHandler is not registered!");
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
index ff3986e..a2c0935 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
@@ -51,7 +51,7 @@ public final class NodeShareSchedulingConstraint implements SchedulingConstraint
@Override
public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
final Map<String, Integer> propertyValue = task.getPropertyValue(ResourceSiteProperty.class)
- .orElseThrow(() -> new RuntimeException("ResourceSiteProperty expected"));
+ .orElseThrow(() -> new RuntimeException("ResourceSiteProperty expected"));
if (propertyValue.isEmpty()) {
return true;
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
index 4a774bf..50dd8d5 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
@@ -45,8 +45,8 @@ public final class SchedulingConstraintRegistry {
registerSchedulingConstraint(containerTypeAwareSchedulingConstraint);
registerSchedulingConstraint(freeSlotSchedulingConstraint);
registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint);
- registerSchedulingConstraint(nodeShareSchedulingConstraint);
registerSchedulingConstraint(skewnessAwareSchedulingConstraint);
+ registerSchedulingConstraint(nodeShareSchedulingConstraint);
}
/**
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
index 236453f..6e5ba4c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
@@ -16,11 +16,13 @@
package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceSkewedDataProperty;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -28,6 +30,7 @@ import org.apache.reef.annotations.audience.DriverSide;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
+import java.util.Map;
/**
* This policy aims to distribute partitions with skewed keys to different executors.
@@ -36,7 +39,6 @@ import javax.inject.Inject;
@DriverSide
@AssociatedProperty(ResourceSkewedDataProperty.class)
public final class SkewnessAwareSchedulingConstraint implements SchedulingConstraint {
-
@VisibleForTesting
@Inject
public SkewnessAwareSchedulingConstraint() {
@@ -45,9 +47,14 @@ public final class SkewnessAwareSchedulingConstraint implements SchedulingConstr
public boolean hasSkewedData(final Task task) {
final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId());
for (StageEdge inEdge : task.getTaskIncomingEdges()) {
- final KeyRange hashRange = inEdge.getTaskIdxToKeyRange().get(taskIdx);
- if (((HashRange) hashRange).isSkewed()) {
- return true;
+ if (CommunicationPatternProperty.Value.Shuffle
+ .equals(inEdge.getDataCommunicationPattern())) {
+ final Map<Integer, KeyRange> taskIdxToKeyRange =
+ inEdge.getPropertyValue(DataSkewMetricProperty.class).get().getMetric();
+ final KeyRange hashRange = taskIdxToKeyRange.get(taskIdx);
+ if (((HashRange) hashRange).isSkewed()) {
+ return true;
+ }
}
}
return false;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
index 6c0222c..e6feaa5 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
@@ -131,7 +131,6 @@ final class TaskDispatcher {
planStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
LOG.info("{} scheduled to {}", task.getTaskId(), selectedExecutor.getExecutorId());
-
// send the task
selectedExecutor.onTaskScheduled(task);
} else {
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
index 87c933f..e3439d6 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
@@ -15,9 +15,16 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
+import edu.snu.nemo.runtime.common.plan.Stage;
import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -36,27 +43,36 @@ import static org.mockito.Mockito.when;
* Test cases for {@link SkewnessAwareSchedulingConstraint}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, Task.class, HashRange.class, StageEdge.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class, Stage.class, HashRange.class,
+IRVertex.class, IREdge.class})
public final class SkewnessAwareSchedulingConstraintTest {
- private static StageEdge mockStageEdge() {
+ /**
+ * Builds a shuffle StageEdge whose DataSkewMetricProperty maps each task index to a mocked HashRange.
+ * @param numSkewedHashRange number of leading task indices whose hash range reports itself as skewed.
+ * @param numTotalHashRange total number of task indices (hash ranges) to register.
+ * @return a StageEdge named "Edge-0" between mocked vertices and stages.
+ */
+ private static StageEdge mockStageEdge(final int numSkewedHashRange,
+ final int numTotalHashRange) {
final Map<Integer, KeyRange> taskIdxToKeyRange = new HashMap<>();
- final HashRange skewedHashRange1 = mock(HashRange.class);
- when(skewedHashRange1.isSkewed()).thenReturn(true);
- final HashRange skewedHashRange2 = mock(HashRange.class);
- when(skewedHashRange2.isSkewed()).thenReturn(true);
- final HashRange hashRange = mock(HashRange.class);
- when(hashRange.isSkewed()).thenReturn(false);
+ // Task indices below numSkewedHashRange get a skewed mock; the remaining ones get a non-skewed mock.
+ for (int taskIdx = 0; taskIdx < numTotalHashRange; taskIdx++) {
+ final HashRange hashRange = mock(HashRange.class);
+ if (taskIdx < numSkewedHashRange) {
+ when(hashRange.isSkewed()).thenReturn(true);
+ } else {
+ when(hashRange.isSkewed()).thenReturn(false);
+ }
+ taskIdxToKeyRange.put(taskIdx, hashRange);
+ }
- taskIdxToKeyRange.put(0, skewedHashRange1);
- taskIdxToKeyRange.put(1, skewedHashRange2);
- taskIdxToKeyRange.put(2, hashRange);
+ final IRVertex srcMockVertex = mock(IRVertex.class);
+ final IRVertex dstMockVertex = mock(IRVertex.class);
+ final Stage srcMockStage = mock(Stage.class);
+ final Stage dstMockStage = mock(Stage.class);
- final StageEdge inEdge = mock(StageEdge.class);
- when(inEdge.getTaskIdxToKeyRange()).thenReturn(taskIdxToKeyRange);
+ // A real IREdge/StageEdge pair is constructed so the key ranges travel as an execution property.
+ final IREdge dummyIREdge = new IREdge(CommunicationPatternProperty.Value.Shuffle, srcMockVertex, dstMockVertex);
+ dummyIREdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+ dummyIREdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(taskIdxToKeyRange)));
+ final StageEdge dummyEdge = new StageEdge("Edge-0", dummyIREdge.getExecutionProperties(),
+ srcMockVertex, dstMockVertex, srcMockStage, dstMockStage, false);
- return inEdge;
+ return dummyEdge;
}
private static Task mockTask(final int taskIdx, final List<StageEdge> inEdges) {
@@ -81,11 +97,12 @@ public final class SkewnessAwareSchedulingConstraintTest {
@Test
public void testScheduleSkewedTasks() {
final SchedulingConstraint schedulingConstraint = new SkewnessAwareSchedulingConstraint();
- final StageEdge inEdge = mockStageEdge();
- final Task task0 = mockTask(0, Arrays.asList(inEdge));
- final Task task1 = mockTask(1, Arrays.asList(inEdge));
- final Task task2 = mockTask(2, Arrays.asList(inEdge));
- final ExecutorRepresenter e0 = mockExecutorRepresenter(task0);
+ // Create a StageEdge where two out of three are skewed hash ranges.
+ final StageEdge inEdge = mockStageEdge(2, 3);
+ final Task task0 = mockTask(0, Arrays.asList(inEdge)); // skewed task
+ final Task task1 = mockTask(1, Arrays.asList(inEdge)); // skewed task
+ final Task task2 = mockTask(2, Arrays.asList(inEdge)); // non-skewed task
+ final ExecutorRepresenter e0 = mockExecutorRepresenter(task0); // schedule skewed task to e0
assertEquals(true, schedulingConstraint.testSchedulability(e0, task2));
assertEquals(false, schedulingConstraint.testSchedulability(e0, task1));
diff --git a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
index 3b3646e..3c23dcf 100644
--- a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
+++ b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
@@ -95,7 +95,7 @@ public final class TestPlanGenerator {
final Policy policy) throws Exception {
final DAG<IRVertex, IREdge> optimized = policy.runCompileTimeOptimization(irDAG, EMPTY_DAG_DIRECTORY);
final DAG<Stage, StageEdge> physicalDAG = PLAN_GENERATOR.apply(optimized);
- return new PhysicalPlan("TestPlan", physicalDAG);
+ return new PhysicalPlan("TestPlan", irDAG, physicalDAG);
}
/**