You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/07/10 22:56:11 UTC
[incubator-nemo] branch master updated: [NEMO-59] Skewed data-aware
executor allocation (#72)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 7439962 [NEMO-59] Skewed data-aware executor allocation (#72)
7439962 is described below
commit 7439962c6699a09c630ae3c63a2a35fdf01e913d
Author: Jeongyoon Eo <je...@gmail.com>
AuthorDate: Wed Jul 11 07:56:09 2018 +0900
[NEMO-59] Skewed data-aware executor allocation (#72)
JIRA: [NEMO-59: Skewed data-aware executor allocation](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-59)
**Major changes:**
- SkewnessAwareSchedulingProperty to mark vertices subject to skew handling
- SkewnessAwareSchedulingConstraint that distribute partitions with skewed keys to different executors
- HashRange with isSkewed value that marks the given HashRange as skewed
**Minor changes to note:**
- ExecutorRepresenter manages Task instead of Task ids, so that scheduling constraints can use running task-related information from executor
**Tests for the changes:**
- SkewnessAwareSchedulingConstraintTest for testing the scheduling constraint
- Modified DataSkewCompositePassTest for testing SkewnessAwareSchedulingProperty
**Other comments:**
N/A
resolves [NEMO-59](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-59)
---
.../SkewnessAwareSchedulingProperty.java | 47 ++++++++
.../compiletime/annotating/DataSkewVertexPass.java | 29 ++++-
.../compiler/optimizer/policy/DataSkewPolicy.java | 8 ++
.../snu/nemo/runtime/common/data/HashRange.java | 18 ++-
.../runtime/common/optimizer/RuntimeOptimizer.java | 3 +-
.../pass/runtime/DataSkewRuntimePass.java | 134 ++++++++++++++-------
.../snu/nemo/runtime/common/plan/StageEdge.java | 14 +--
.../pass/runtime/DataSkewRuntimePassTest.java | 48 +++++---
.../nemo/runtime/executor/data/BlockStoreTest.java | 9 +-
.../edu/snu/nemo/runtime/master/RuntimeMaster.java | 54 ++++-----
.../master/resource/ExecutorRepresenter.java | 50 ++++----
.../runtime/master/scheduler/ExecutorRegistry.java | 7 +-
.../MinOccupancyFirstSchedulingPolicy.java | 3 -
.../scheduler/SchedulingConstraintRegistry.java | 2 +
.../SkewnessAwareSchedulingConstraint.java | 66 ++++++++++
.../SourceLocationAwareSchedulingConstraint.java | 3 -
.../FreeSlotSchedulingConstraintTest.java | 10 +-
.../MinOccupancyFirstSchedulingPolicyTest.java | 10 +-
.../SkewnessAwareSchedulingConstraintTest.java | 93 ++++++++++++++
...ourceLocationAwareSchedulingConstraintTest.java | 3 +-
.../runtime/plangenerator/TestPlanGenerator.java | 1 -
.../composite/DataSkewCompositePassTest.java | 7 ++
22 files changed, 473 insertions(+), 146 deletions(-)
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SkewnessAwareSchedulingProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SkewnessAwareSchedulingProperty.java
new file mode 100644
index 0000000..9ec3364
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SkewnessAwareSchedulingProperty.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.executionproperty;
+
+import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
+
+/**
+ * This property decides whether or not to handle skew when scheduling this vertex.
+ */
+public final class SkewnessAwareSchedulingProperty extends VertexExecutionProperty<Boolean> {
+ private static final SkewnessAwareSchedulingProperty HANDLE_SKEW
+ = new SkewnessAwareSchedulingProperty(true);
+ private static final SkewnessAwareSchedulingProperty DONT_HANDLE_SKEW
+ = new SkewnessAwareSchedulingProperty(false);
+
+ /**
+ * Default constructor.
+ *
+ * @param value value of the ExecutionProperty
+ */
+ private SkewnessAwareSchedulingProperty(final boolean value) {
+ super(value);
+ }
+
+ /**
+ * Static method getting execution property.
+ *
+ * @param value value of the new execution property
+ * @return the execution property
+ */
+ public static SkewnessAwareSchedulingProperty of(final boolean value) {
+ return value ? HANDLE_SKEW : DONT_HANDLE_SKEW;
+ }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewVertexPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewVertexPass.java
index c9e7aa0..2f543cb 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewVertexPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DataSkewVertexPass.java
@@ -20,6 +20,9 @@ import edu.snu.nemo.common.ir.edge.IREdge;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.SkewnessAwareSchedulingProperty;
+
+import java.util.List;
/**
* Pass to annotate the DAG for a job to perform data skew.
@@ -33,14 +36,28 @@ public final class DataSkewVertexPass extends AnnotatingPass {
super(DynamicOptimizationProperty.class);
}
+ private boolean hasMetricCollectionBarrierVertexAsParent(final DAG<IRVertex, IREdge> dag,
+ final IRVertex v) {
+ List<IRVertex> parents = dag.getParents(v.getId());
+ for (IRVertex parent : parents) {
+ if (parent instanceof MetricCollectionBarrierVertex) {
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
- dag.topologicalDo(v -> {
- // we only care about metric collection barrier vertices.
- if (v instanceof MetricCollectionBarrierVertex) {
- v.setProperty(DynamicOptimizationProperty.of(DynamicOptimizationProperty.Value.DataSkewRuntimePass));
- }
- });
+ dag.getVertices().stream()
+ .filter(v -> v instanceof MetricCollectionBarrierVertex)
+ .forEach(v -> v.setProperty(DynamicOptimizationProperty
+ .of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
+ dag.getVertices().stream()
+ .filter(v -> hasMetricCollectionBarrierVertexAsParent(dag, v)
+ && !v.getExecutionProperties().containsKey(SkewnessAwareSchedulingProperty.class))
+ .forEach(v -> v.getExecutionProperties().put(SkewnessAwareSchedulingProperty.of(true)));
+
return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
index c2f1f4a..4226bc6 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
@@ -41,6 +41,14 @@ public final class DataSkewPolicy implements Policy {
.build();
}
+ public DataSkewPolicy(final int skewness) {
+ this.policy = new PolicyBuilder(true)
+ .registerRuntimePass(new DataSkewRuntimePass().setNumSkewedKeys(skewness), new DataSkewCompositePass())
+ .registerCompileTimePass(new LoopOptimizationCompositePass())
+ .registerCompileTimePass(new PrimitiveCompositePass())
+ .build();
+ }
+
@Override
public List<CompileTimePass> getCompileTimePasses() {
return this.policy.getCompileTimePasses();
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java
index ae1ca3e..50e4334 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java
@@ -19,22 +19,23 @@ package edu.snu.nemo.runtime.common.data;
* Descriptor for hash range.
*/
public final class HashRange implements KeyRange<Integer> {
- private static final HashRange ALL = new HashRange(0, Integer.MAX_VALUE);
-
+ private static final HashRange ALL = new HashRange(0, Integer.MAX_VALUE, false);
private final int rangeBeginInclusive;
private final int rangeEndExclusive;
+ private boolean isSkewed;
/**
* Private constructor.
* @param rangeBeginInclusive point at which the hash range starts (inclusive).
* @param rangeEndExclusive point at which the hash range ends (exclusive).
*/
- private HashRange(final int rangeBeginInclusive, final int rangeEndExclusive) {
+ private HashRange(final int rangeBeginInclusive, final int rangeEndExclusive, final boolean isSkewed) {
if (rangeBeginInclusive < 0 || rangeEndExclusive < 0) {
throw new RuntimeException("Each boundary value of the range have to be non-negative.");
}
this.rangeBeginInclusive = rangeBeginInclusive;
this.rangeEndExclusive = rangeEndExclusive;
+ this.isSkewed = isSkewed;
}
/**
@@ -49,8 +50,8 @@ public final class HashRange implements KeyRange<Integer> {
* @param rangeEndExclusive the end of the range (exclusive)
* @return A hash range descriptor representing [{@code rangeBeginInclusive}, {@code rangeEndExclusive})
*/
- public static HashRange of(final int rangeStartInclusive, final int rangeEndExclusive) {
- return new HashRange(rangeStartInclusive, rangeEndExclusive);
+ public static HashRange of(final int rangeStartInclusive, final int rangeEndExclusive, final boolean isSkewed) {
+ return new HashRange(rangeStartInclusive, rangeEndExclusive, isSkewed);
}
/**
@@ -120,4 +121,11 @@ public final class HashRange implements KeyRange<Integer> {
result = 31 * result + rangeEndExclusive;
return result;
}
+
+ public void setAsSkewed() {
+ isSkewed = true;
+ }
+ public boolean isSkewed() {
+ return isSkewed;
+ }
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
index 5d33304..888e6f5 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RuntimeOptimizer.java
@@ -47,8 +47,7 @@ public final class RuntimeOptimizer {
switch (dynamicOptimizationType) {
case DataSkewRuntimePass:
- // Metric data for DataSkewRuntimePass is
- // a pair of blockIds and map of hashrange, partition size.
+ // Metric data for DataSkewRuntimePass is a pair of blockIds and map of hashrange, partition size.
final Pair<List<String>, Map<Integer, Long>> metricData =
Pair.of(metricCollectionBarrierVertex.getBlockIds(),
(Map<Integer, Long>) metricCollectionBarrierVertex.getMetricData());
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index d8999b4..cf8f5b6 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -34,15 +34,19 @@ import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
/**
* Dynamic optimization pass for handling data skew.
- * It receives pairs of the key index and the size of a partition for each output block.
+ * Using a map of key to partition size as a metric used for dynamic optimization,
+ * this RuntimePass identifies a number of keys with big partition sizes(skewed key)
+ * and evenly redistributes data via overwriting incoming edges of destination tasks.
*/
public final class DataSkewRuntimePass implements RuntimePass<Pair<List<String>, Map<Integer, Long>>> {
private static final Logger LOG = LoggerFactory.getLogger(DataSkewRuntimePass.class.getName());
private final Set<Class<? extends RuntimeEventHandler>> eventHandlers;
+ // Skewed keys denote for top n keys in terms of partition size.
+ private static final int DEFAULT_NUM_SKEWED_KEYS = 3;
+ private int numSkewedKeys = DEFAULT_NUM_SKEWED_KEYS;
/**
* Constructor.
@@ -52,6 +56,11 @@ public final class DataSkewRuntimePass implements RuntimePass<Pair<List<String>,
DynamicOptimizationEventHandler.class);
}
+ public DataSkewRuntimePass setNumSkewedKeys(final int numOfSkewedKeys) {
+ numSkewedKeys = numOfSkewedKeys;
+ return this;
+ }
+
@Override
public Set<Class<? extends RuntimeEventHandler>> getEventHandlerClasses() {
return this.eventHandlers;
@@ -75,74 +84,117 @@ public final class DataSkewRuntimePass implements RuntimePass<Pair<List<String>,
.collect(Collectors.toList());
// Get number of evaluators of the next stage (number of blocks).
- final Integer taskListSize = optimizationEdges.stream().findFirst().orElseThrow(() ->
+ final Integer numOfDstTasks = optimizationEdges.stream().findFirst().orElseThrow(() ->
new RuntimeException("optimization edges are empty")).getDst().getTaskIds().size();
// Calculate keyRanges.
- final List<KeyRange> keyRanges = calculateHashRanges(metricData.right(), taskListSize);
+ final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), numOfDstTasks);
- // Overwrite the previously assigned hash value range in the physical DAG with the new range.
+ // Overwrite the previously assigned key range in the physical DAG with the new range.
optimizationEdges.forEach(optimizationEdge -> {
// Update the information.
- final List<KeyRange> taskIdxToHashRange = new ArrayList<>();
- IntStream.range(0, taskListSize).forEach(i -> taskIdxToHashRange.add(keyRanges.get(i)));
+ final Map<Integer, KeyRange> taskIdxToHashRange = new HashMap<>();
+ for (int taskIdx = 0; taskIdx < numOfDstTasks; taskIdx++) {
+ taskIdxToHashRange.put(taskIdx, keyRanges.get(taskIdx));
+ }
optimizationEdge.setTaskIdxToKeyRange(taskIdxToHashRange);
});
return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build());
}
+ public List<Integer> identifySkewedKeys(final Map<Integer, Long> keyValToPartitionSizeMap) {
+ // Identify skewed keyes.
+ List<Map.Entry<Integer, Long>> sortedMetricData = keyValToPartitionSizeMap.entrySet().stream()
+ .sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
+ .collect(Collectors.toList());
+ List<Integer> skewedKeys = new ArrayList<>();
+ for (int i = 0; i < numSkewedKeys; i++) {
+ skewedKeys.add(sortedMetricData.get(i).getKey());
+ LOG.info("Skewed key: Key {} Size {}", sortedMetricData.get(i).getKey(), sortedMetricData.get(i).getValue());
+ }
+
+ return skewedKeys;
+ }
+
+ private boolean containsSkewedKey(final List<Integer> skewedKeys,
+ final int startingKey, final int finishingKey) {
+ for (int k = startingKey; k < finishingKey; k++) {
+ if (skewedKeys.contains(k)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
- * Method for calculating key ranges to evenly distribute the skewed metric data.
+ * Evenly distribute the skewed data to the destination tasks.
+ * Partition denotes for a keyed portion of a Task output, whose key is a key.
+ * Using a map of key to partition size, this method groups the given partitions
+ * to a key range of partitions with approximate size of (total size of partitions / the number of tasks).
*
- * @param aggregatedMetricData the metric data.
- * @param taskListSize the size of the task list.
+ * @param keyToPartitionSizeMap a map of key to partition size.
+ * @param numOfDstTasks the number of tasks that receives this data as input.
* @return the list of key ranges calculated.
*/
@VisibleForTesting
- public List<KeyRange> calculateHashRanges(final Map<Integer, Long> aggregatedMetricData,
- final Integer taskListSize) {
- // NOTE: aggregatedMetricDataMap is made up of a map of (hash value, blockSize).
- // Get the max hash value.
- final int maxHashValue = aggregatedMetricData.keySet().stream()
+ public List<KeyRange> calculateKeyRanges(final Map<Integer, Long> keyToPartitionSizeMap,
+ final Integer numOfDstTasks) {
+ // Get the biggest key.
+ final int maxKey = keyToPartitionSizeMap.keySet().stream()
.max(Integer::compareTo)
- .orElseThrow(() -> new DynamicOptimizationException("Cannot find max hash value among blocks."));
-
- // Do the optimization using the information derived above.
- final Long totalSize = aggregatedMetricData.values().stream().mapToLong(n -> n).sum(); // get total size
- final Long idealSizePerTask = totalSize / taskListSize; // and derive the ideal size per task
- LOG.info("idealSizePerTask {} = {}(totalSize) / {}(taskListSize)",
- idealSizePerTask, totalSize, taskListSize);
-
- // find HashRanges to apply (for each blocks of each block).
- final List<KeyRange> keyRanges = new ArrayList<>(taskListSize);
- int startingHashValue = 0;
- int finishingHashValue = 1; // initial values
- Long currentAccumulatedSize = aggregatedMetricData.getOrDefault(startingHashValue, 0L);
- for (int i = 1; i <= taskListSize; i++) {
- if (i != taskListSize) {
- final Long idealAccumulatedSize = idealSizePerTask * i; // where we should end
- // find the point while adding up one by one.
+ .orElseThrow(() -> new DynamicOptimizationException("Cannot find max key among blocks."));
+
+ // Identify skewed keys, which is top numSkewedKeys number of keys.
+ List<Integer> skewedKeys = identifySkewedKeys(keyToPartitionSizeMap);
+
+ // Calculate the ideal size for each destination task.
+ final Long totalSize = keyToPartitionSizeMap.values().stream().mapToLong(n -> n).sum(); // get total size
+ final Long idealSizePerTask = totalSize / numOfDstTasks; // and derive the ideal size per task
+
+ final List<KeyRange> keyRanges = new ArrayList<>(numOfDstTasks);
+ int startingKey = 0;
+ int finishingKey = 1;
+ Long currentAccumulatedSize = keyToPartitionSizeMap.getOrDefault(startingKey, 0L);
+ Long prevAccumulatedSize = 0L;
+ for (int i = 1; i <= numOfDstTasks; i++) {
+ if (i != numOfDstTasks) {
+ // Ideal accumulated partition size for this task.
+ final Long idealAccumulatedSize = idealSizePerTask * i;
+ // By adding partition sizes, find the accumulated size nearest to the given ideal size.
while (currentAccumulatedSize < idealAccumulatedSize) {
- currentAccumulatedSize += aggregatedMetricData.getOrDefault(finishingHashValue, 0L);
- finishingHashValue++;
+ currentAccumulatedSize += keyToPartitionSizeMap.getOrDefault(finishingKey, 0L);
+ finishingKey++;
}
final Long oneStepBack =
- currentAccumulatedSize - aggregatedMetricData.getOrDefault(finishingHashValue - 1, 0L);
+ currentAccumulatedSize - keyToPartitionSizeMap.getOrDefault(finishingKey - 1, 0L);
final Long diffFromIdeal = currentAccumulatedSize - idealAccumulatedSize;
final Long diffFromIdealOneStepBack = idealAccumulatedSize - oneStepBack;
// Go one step back if we came too far.
if (diffFromIdeal > diffFromIdealOneStepBack) {
- finishingHashValue--;
- currentAccumulatedSize -= aggregatedMetricData.getOrDefault(finishingHashValue, 0L);
+ finishingKey--;
+ currentAccumulatedSize -= keyToPartitionSizeMap.getOrDefault(finishingKey, 0L);
}
- // assign appropriately
- keyRanges.add(i - 1, HashRange.of(startingHashValue, finishingHashValue));
- startingHashValue = finishingHashValue;
+ boolean isSkewedKey = containsSkewedKey(skewedKeys, startingKey, finishingKey);
+ keyRanges.add(i - 1, HashRange.of(startingKey, finishingKey, isSkewedKey));
+ LOG.debug("KeyRange {}~{}, Size {}", startingKey, finishingKey - 1,
+ currentAccumulatedSize - prevAccumulatedSize);
+
+ prevAccumulatedSize = currentAccumulatedSize;
+ startingKey = finishingKey;
} else { // last one: we put the range of the rest.
- keyRanges.add(i - 1, HashRange.of(startingHashValue, maxHashValue + 1));
+ boolean isSkewedKey = containsSkewedKey(skewedKeys, startingKey, finishingKey);
+ keyRanges.add(i - 1,
+ HashRange.of(startingKey, maxKey + 1, isSkewedKey));
+
+ while (finishingKey <= maxKey) {
+ currentAccumulatedSize += keyToPartitionSizeMap.getOrDefault(finishingKey, 0L);
+ finishingKey++;
+ }
+ LOG.debug("KeyRange {}~{}, Size {}", startingKey, maxKey + 1,
+ currentAccumulatedSize - prevAccumulatedSize);
}
}
return keyRanges;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 504501b..ad62e68 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -24,8 +24,8 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.runtime.common.data.KeyRange;
import edu.snu.nemo.runtime.common.data.HashRange;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
/**
* Edge of a stage that connects an IRVertex of the source stage to an IRVertex of the destination stage.
@@ -47,7 +47,7 @@ public final class StageEdge extends RuntimeEdge<Stage> {
/**
* The list between the task idx and key range to read.
*/
- private List<KeyRange> taskIdxToKeyRange;
+ private Map<Integer, KeyRange> taskIdxToKeyRange;
/**
* Value for {@link DataCommunicationPatternProperty}.
@@ -82,9 +82,9 @@ public final class StageEdge extends RuntimeEdge<Stage> {
this.srcVertex = srcVertex;
this.dstVertex = dstVertex;
// Initialize the key range of each dst task.
- this.taskIdxToKeyRange = new ArrayList<>();
+ this.taskIdxToKeyRange = new HashMap<>();
for (int taskIdx = 0; taskIdx < dstStage.getTaskIds().size(); taskIdx++) {
- taskIdxToKeyRange.add(HashRange.of(taskIdx, taskIdx + 1));
+ taskIdxToKeyRange.put(taskIdx, HashRange.of(taskIdx, taskIdx + 1, false));
}
this.dataCommunicationPatternValue = edgeProperties.get(DataCommunicationPatternProperty.class)
.orElseThrow(() -> new RuntimeException(String.format(
@@ -122,7 +122,7 @@ public final class StageEdge extends RuntimeEdge<Stage> {
/**
* @return the list between the task idx and key range to read.
*/
- public List<KeyRange> getTaskIdxToKeyRange() {
+ public Map<Integer, KeyRange> getTaskIdxToKeyRange() {
return taskIdxToKeyRange;
}
@@ -131,7 +131,7 @@ public final class StageEdge extends RuntimeEdge<Stage> {
*
* @param taskIdxToKeyRange the list to set.
*/
- public void setTaskIdxToKeyRange(final List<KeyRange> taskIdxToKeyRange) {
+ public void setTaskIdxToKeyRange(final Map<Integer, KeyRange> taskIdxToKeyRange) {
this.taskIdxToKeyRange = taskIdxToKeyRange;
}
diff --git a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
index cd2fc16..319b8ba 100644
--- a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
+++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
@@ -15,6 +15,7 @@
*/
package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
+import edu.snu.nemo.runtime.common.data.HashRange;
import edu.snu.nemo.runtime.common.data.KeyRange;
import org.junit.Before;
import org.junit.Test;
@@ -31,32 +32,41 @@ public class DataSkewRuntimePassTest {
@Before
public void setUp() {
- // Sum is 30 for each hashRanges: 0-3, 3-5, 5-7, 7-9, 9-10.
- buildPartitionSizeList(Arrays.asList(1L, 2L, 4L, 2L, 1L, 8L, 2L, 4L, 2L, 10L));
- buildPartitionSizeList(Arrays.asList(3L, 5L, 5L, 7L, 10L, 3L, 5L, 4L, 8L, 5L));
- buildPartitionSizeList(Arrays.asList(2L, 3L, 5L, 5L, 5L, 6L, 6L, 8L, 4L, 15L));
+ // Skewed partition size lists
+ buildPartitionSizeList(Arrays.asList(5L, 5L, 10L, 50L, 100L));
+ buildPartitionSizeList(Arrays.asList(5L, 10L, 5L, 0L, 0L));
+ buildPartitionSizeList(Arrays.asList(10L, 5L, 5L, 0L, 0L));
}
/**
- * Test if the test case suggested above works correctly.
+ * Test DataSkewRuntimePass whether it redistributes skewed partitions
+ * to partitions with approximate size of (total size / the number of tasks).
*/
@Test
public void testDataSkewDynamicOptimizationPass() {
- final Integer taskListSize = 5;
+ final Integer taskNum = 5;
final List<KeyRange> keyRanges =
- new DataSkewRuntimePass().calculateHashRanges(testMetricData, taskListSize);
+ new DataSkewRuntimePass().setNumSkewedKeys(2).calculateKeyRanges(testMetricData, taskNum);
+ // Test whether it correctly redistributed hash ranges.
assertEquals(0, keyRanges.get(0).rangeBeginInclusive());
- assertEquals(3, keyRanges.get(0).rangeEndExclusive());
- assertEquals(3, keyRanges.get(1).rangeBeginInclusive());
- assertEquals(5, keyRanges.get(1).rangeEndExclusive());
- assertEquals(5, keyRanges.get(2).rangeBeginInclusive());
- assertEquals(7, keyRanges.get(2).rangeEndExclusive());
- assertEquals(7, keyRanges.get(3).rangeBeginInclusive());
- assertEquals(9, keyRanges.get(3).rangeEndExclusive());
- assertEquals(9, keyRanges.get(4).rangeBeginInclusive());
- assertEquals(10, keyRanges.get(4).rangeEndExclusive());
+ assertEquals(2, keyRanges.get(0).rangeEndExclusive());
+ assertEquals(2, keyRanges.get(1).rangeBeginInclusive());
+ assertEquals(3, keyRanges.get(1).rangeEndExclusive());
+ assertEquals(3, keyRanges.get(2).rangeBeginInclusive());
+ assertEquals(4, keyRanges.get(2).rangeEndExclusive());
+ assertEquals(4, keyRanges.get(3).rangeBeginInclusive());
+ assertEquals(5, keyRanges.get(3).rangeEndExclusive());
+ assertEquals(5, keyRanges.get(4).rangeBeginInclusive());
+ assertEquals(5, keyRanges.get(4).rangeEndExclusive());
+
+ // Test whether it caught the provided skewness.
+ assertEquals(false, ((HashRange)keyRanges.get(0)).isSkewed());
+ assertEquals(false, ((HashRange)keyRanges.get(1)).isSkewed());
+ assertEquals(true, ((HashRange)keyRanges.get(2)).isSkewed());
+ assertEquals(true, ((HashRange)keyRanges.get(3)).isSkewed());
+ assertEquals(false, ((HashRange)keyRanges.get(4)).isSkewed());
}
/**
@@ -68,7 +78,11 @@ public class DataSkewRuntimePassTest {
private void buildPartitionSizeList(final List<Long> partitionSizes) {
int key = 0;
for (final long partitionSize : partitionSizes) {
- testMetricData.put(key, partitionSize);
+ if (testMetricData.containsKey(key)) {
+ testMetricData.compute(key, (existingKey, existingValue) -> existingValue + partitionSize);
+ } else {
+ testMetricData.put(key, partitionSize);
+ }
key++;
}
}
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index b9f0c63..2af771e 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -189,9 +189,11 @@ public final class BlockStoreTest {
// Generates the range of hash value to read for each read task.
final int smallDataRangeEnd = 1 + NUM_READ_HASH_TASKS - NUM_WRITE_HASH_TASKS;
- readKeyRangeList.add(HashRange.of(0, smallDataRangeEnd));
+ readKeyRangeList.add(HashRange.of(0, smallDataRangeEnd, false));
IntStream.range(0, NUM_READ_HASH_TASKS - 1).forEach(readTaskIdx -> {
- readKeyRangeList.add(HashRange.of(smallDataRangeEnd + readTaskIdx, smallDataRangeEnd + readTaskIdx + 1));
+ readKeyRangeList.add(HashRange.of(smallDataRangeEnd + readTaskIdx,
+ smallDataRangeEnd + readTaskIdx + 1,
+ false));
});
// Generates the expected result of hash range retrieval for each read task.
@@ -347,7 +349,8 @@ public final class BlockStoreTest {
public Boolean call() {
try {
for (int writeTaskIdx = 0; writeTaskIdx < NUM_WRITE_VERTICES; writeTaskIdx++) {
- readResultCheck(blockIdList.get(writeTaskIdx), HashRange.of(readTaskIdx, readTaskIdx + 1),
+ readResultCheck(blockIdList.get(writeTaskIdx),
+ HashRange.of(readTaskIdx, readTaskIdx + 1, false),
readerSideStore, partitionsPerBlock.get(writeTaskIdx).get(readTaskIdx).getData());
}
return true;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 98928a3..43e46cc 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -76,7 +76,6 @@ public final class RuntimeMaster {
private static final int REST_SERVER_PORT = 10101;
private final ExecutorService runtimeMasterThread;
-
private final Scheduler scheduler;
private final ContainerManager containerManager;
private final BlockManagerMaster blockManagerMaster;
@@ -84,22 +83,18 @@ public final class RuntimeMaster {
private final MessageEnvironment masterMessageEnvironment;
private final MetricStore metricStore;
private final Map<Integer, Long> aggregatedMetricData;
+ private final ExecutorService metricAggregationService;
private final ClientRPC clientRPC;
private final MetricManagerMaster metricManagerMaster;
-
// For converting json data. This is a thread safe.
private final ObjectMapper objectMapper;
-
private final String dagDirectory;
private final Set<IRVertex> irVertices;
-
private final AtomicInteger resourceRequestCount;
-
private CountDownLatch metricCountDownLatch;
// REST API server for web metric visualization ui.
private final Server metricServer;
-
@Inject
public RuntimeMaster(final Scheduler scheduler,
final ContainerManager containerManager,
@@ -127,7 +122,8 @@ public final class RuntimeMaster {
this.irVertices = new HashSet<>();
this.resourceRequestCount = new AtomicInteger(0);
this.objectMapper = new ObjectMapper();
- this.aggregatedMetricData = new HashMap<>();
+ this.aggregatedMetricData = new ConcurrentHashMap<>();
+ this.metricAggregationService = Executors.newFixedThreadPool(10);
this.metricStore = MetricStore.getStore();
this.metricServer = startRestMetricServer();
}
@@ -155,7 +151,8 @@ public final class RuntimeMaster {
/**
* Submits the {@link PhysicalPlan} to Runtime.
- * @param plan to execute.
+ *
+ * @param plan to execute
* @param maxScheduleAttempt the max number of times this plan/sub-part of the plan should be attempted.
*/
public Pair<JobStateManager, ScheduledExecutorService> execute(final PhysicalPlan plan,
@@ -172,7 +169,6 @@ public final class RuntimeMaster {
throw new RuntimeException(e);
}
};
-
try {
return runtimeMasterThread.submit(jobExecutionCallable).get();
} catch (Exception e) {
@@ -191,9 +187,7 @@ public final class RuntimeMaster {
} catch (final InterruptedException e) {
LOG.warn("Waiting executor terminating process interrupted.");
}
-
runtimeMasterThread.execute(() -> {
-
scheduler.terminate();
try {
masterMessageEnvironment.close();
@@ -248,22 +242,22 @@ public final class RuntimeMaster {
/**
* Called when a container is allocated for this runtime.
* A wrapper function for {@link ContainerManager}.
- * @param executorId to use for the executor to be launched on this container.
- * @param allocatedEvaluator to be used as the container.
+ *
+ * @param executorId to use for the executor to be launched on this container.
+ * @param allocatedEvaluator to be used as the container.
* @param executorConfiguration to use for the executor to be launched on this container.
*/
public void onContainerAllocated(final String executorId,
final AllocatedEvaluator allocatedEvaluator,
final Configuration executorConfiguration) {
runtimeMasterThread.execute(() -> {
-
containerManager.onContainerAllocated(executorId, allocatedEvaluator, executorConfiguration);
-
});
}
/**
* Called when an executor is launched on a container for this runtime.
+ *
* @param activeContext of the launched executor.
* @return true if all requested executors have been launched, false otherwise.
*/
@@ -289,6 +283,7 @@ public final class RuntimeMaster {
/**
* Called when an executor fails due to container failure on this runtime.
+ *
* @param failedEvaluator that failed.
*/
public void onExecutorFailed(final FailedEvaluator failedEvaluator) {
@@ -314,9 +309,7 @@ public final class RuntimeMaster {
@Override
public void onMessage(final ControlMessage.Message message) {
runtimeMasterThread.execute(() -> {
-
handleControlMessage(message);
-
});
}
@@ -379,7 +372,6 @@ public final class RuntimeMaster {
}
}
-
/**
* Accumulates the metric data for a barrier vertex.
* TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex.
@@ -396,22 +388,24 @@ public final class RuntimeMaster {
.filter(irVertex -> irVertex.getId().equals(srcVertexId)).findFirst()
.orElseThrow(() -> new RuntimeException(srcVertexId + " doesn't exist in the submitted Physical Plan"));
- // For each hash range index, aggregate the metric data as they arrive.
- partitionSizeInfo.forEach(partitionSizeEntry -> {
- final int key = partitionSizeEntry.getKey();
- final long size = partitionSizeEntry.getSize();
- if (aggregatedMetricData.containsKey(key)) {
- aggregatedMetricData.compute(key, (existKey, existValue) -> existValue + size);
- } else {
- aggregatedMetricData.put(key, size);
- }
- });
-
if (vertexToSendMetricDataTo instanceof MetricCollectionBarrierVertex) {
final MetricCollectionBarrierVertex<Integer, Long> metricCollectionBarrierVertex =
(MetricCollectionBarrierVertex) vertexToSendMetricDataTo;
+
metricCollectionBarrierVertex.addBlockId(blockId);
- metricCollectionBarrierVertex.setMetricData(aggregatedMetricData);
+ metricAggregationService.submit(() -> {
+ // For each hash range index, we aggregate the metric data.
+ partitionSizeInfo.forEach(partitionSizeEntry -> {
+ final int key = partitionSizeEntry.getKey();
+ final long size = partitionSizeEntry.getSize();
+ if (aggregatedMetricData.containsKey(key)) {
+ aggregatedMetricData.compute(key, (existKey, existValue) -> existValue + size);
+ } else {
+ aggregatedMetricData.put(key, size);
+ }
+ });
+ metricCollectionBarrierVertex.setMetricData(aggregatedMetricData);
+ });
} else {
throw new RuntimeException("Something wrong happened at DataSkewCompositePass.");
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index cca0382..e431074 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -25,11 +25,9 @@ import org.apache.commons.lang3.SerializationUtils;
import org.apache.reef.driver.context.ActiveContext;
import javax.annotation.concurrent.NotThreadSafe;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
/**
* (WARNING) This class is not thread-safe, and thus should only be accessed through ExecutorRegistry.
@@ -47,10 +45,10 @@ public final class ExecutorRepresenter {
private final String executorId;
private final ResourceSpecification resourceSpecification;
- private final Set<String> runningTasks;
- private final Map<String, Integer> runningTaskToAttempt;
- private final Set<String> completeTasks;
- private final Set<String> failedTasks;
+ private final Set<Task> runningTasks;
+ private final Map<Task, Integer> runningTaskToAttempt;
+ private final Set<Task> completeTasks;
+ private final Set<Task> failedTasks;
private final MessageSender<ControlMessage.Message> messageSender;
private final ActiveContext activeContext;
private final ExecutorService serializationExecutorService;
@@ -88,7 +86,9 @@ public final class ExecutorRepresenter {
*/
public Set<String> onExecutorFailed() {
failedTasks.addAll(runningTasks);
- final Set<String> snapshot = new HashSet<>(runningTasks);
+ final Set<String> snapshot = runningTasks.stream()
+ .map(Task::getTaskId)
+ .collect(Collectors.toSet());
runningTasks.clear();
return snapshot;
}
@@ -98,9 +98,9 @@ public final class ExecutorRepresenter {
* @param task
*/
public void onTaskScheduled(final Task task) {
- runningTasks.add(task.getTaskId());
- runningTaskToAttempt.put(task.getTaskId(), task.getAttemptIdx());
- failedTasks.remove(task.getTaskId());
+ runningTasks.add(task);
+ runningTaskToAttempt.put(task, task.getAttemptIdx());
+ failedTasks.remove(task);
serializationExecutorService.submit(new Runnable() {
@Override
@@ -133,9 +133,13 @@ public final class ExecutorRepresenter {
*
*/
public void onTaskExecutionComplete(final String taskId) {
- runningTasks.remove(taskId);
- runningTaskToAttempt.remove(taskId);
- completeTasks.add(taskId);
+ Task completedTask = runningTasks.stream()
+ .filter(task -> task.getTaskId().equals(taskId)).findFirst()
+ .orElseThrow(() -> new RuntimeException("Completed task not found in its ExecutorRepresenter"));
+
+ runningTasks.remove(completedTask);
+ runningTaskToAttempt.remove(completedTask);
+ completeTasks.add(completedTask);
}
/**
@@ -143,9 +147,13 @@ public final class ExecutorRepresenter {
* @param taskId id of the Task
*/
public void onTaskExecutionFailed(final String taskId) {
- runningTasks.remove(taskId);
- runningTaskToAttempt.remove(taskId);
- failedTasks.add(taskId);
+ Task failedTask = runningTasks.stream()
+ .filter(task -> task.getTaskId().equals(taskId)).findFirst()
+ .orElseThrow(() -> new RuntimeException("Failed task not found in its ExecutorRepresenter"));
+
+ runningTasks.remove(failedTask);
+ runningTaskToAttempt.remove(failedTask);
+ failedTasks.add(failedTask);
}
/**
@@ -158,11 +166,11 @@ public final class ExecutorRepresenter {
/**
* @return set of ids of Tasks that are running in this executor
*/
- public Set<String> getRunningTasks() {
+ public Set<Task> getRunningTasks() {
return runningTasks;
}
- public Map<String, Integer> getRunningTaskToAttempt() {
+ public Map<Task, Integer> getRunningTaskToAttempt() {
return runningTaskToAttempt;
}
@@ -176,7 +184,7 @@ public final class ExecutorRepresenter {
/**
* @return set of ids of Tasks that have been completed in this executor
*/
- public Set<String> getCompleteTasks() {
+ public Set<Task> getCompleteTasks() {
return completeTasks;
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
index 8f052f0..98af0a1 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
@@ -17,6 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
@@ -89,8 +90,10 @@ public final class ExecutorRegistry {
@VisibleForTesting
synchronized Optional<ExecutorRepresenter> findExecutorForTask(final String taskId) {
for (final ExecutorRepresenter executor : getRunningExecutors()) {
- if (executor.getRunningTasks().contains(taskId) || executor.getCompleteTasks().contains(taskId)) {
- return Optional.of(executor);
+ for (final Task runningTask : executor.getRunningTasks()) {
+ if (runningTask.getTaskId().equals(taskId)) {
+ return Optional.of(executor);
+ }
}
}
return Optional.empty();
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
index e53f659..7023961 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
@@ -23,8 +23,6 @@ import org.apache.reef.annotations.audience.DriverSide;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import java.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This policy chooses a set of Executors, on which have minimum running Tasks.
@@ -32,7 +30,6 @@ import org.slf4j.LoggerFactory;
@ThreadSafe
@DriverSide
public final class MinOccupancyFirstSchedulingPolicy implements SchedulingPolicy {
- private static final Logger LOG = LoggerFactory.getLogger(MinOccupancyFirstSchedulingPolicy.class.getName());
@VisibleForTesting
@Inject
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
index 639e775..4a774bf 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
@@ -40,11 +40,13 @@ public final class SchedulingConstraintRegistry {
final ContainerTypeAwareSchedulingConstraint containerTypeAwareSchedulingConstraint,
final FreeSlotSchedulingConstraint freeSlotSchedulingConstraint,
final SourceLocationAwareSchedulingConstraint sourceLocationAwareSchedulingConstraint,
+ final SkewnessAwareSchedulingConstraint skewnessAwareSchedulingConstraint,
final NodeShareSchedulingConstraint nodeShareSchedulingConstraint) {
registerSchedulingConstraint(containerTypeAwareSchedulingConstraint);
registerSchedulingConstraint(freeSlotSchedulingConstraint);
registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint);
registerSchedulingConstraint(nodeShareSchedulingConstraint);
+ registerSchedulingConstraint(skewnessAwareSchedulingConstraint);
}
/**
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
new file mode 100644
index 0000000..6ac14cf
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.SkewnessAwareSchedulingProperty;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.data.HashRange;
+import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
+import edu.snu.nemo.runtime.common.plan.Task;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+
+/**
+ * This policy aims to distribute partitions with skewed keys to different executors.
+ */
+@ThreadSafe
+@DriverSide
+@AssociatedProperty(SkewnessAwareSchedulingProperty.class)
+public final class SkewnessAwareSchedulingConstraint implements SchedulingConstraint {
+
+ @VisibleForTesting
+ @Inject
+ public SkewnessAwareSchedulingConstraint() {
+ }
+
+ public boolean hasSkewedData(final Task task) {
+ final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId());
+ for (StageEdge inEdge : task.getTaskIncomingEdges()) {
+ final KeyRange hashRange = inEdge.getTaskIdxToKeyRange().get(taskIdx);
+ if (((HashRange) hashRange).isSkewed()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
+ // Check if this executor had already received heavy tasks
+ for (Task runningTask : executor.getRunningTasks()) {
+ if (hasSkewedData(runningTask) && hasSkewedData(task)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
index f18c900..cb74f05 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
@@ -22,8 +22,6 @@ import edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedu
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
@@ -38,7 +36,6 @@ import java.util.*;
@DriverSide
@AssociatedProperty(SourceLocationAwareSchedulingProperty.class)
public final class SourceLocationAwareSchedulingConstraint implements SchedulingConstraint {
- private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingConstraint.class);
@VisibleForTesting
@Inject
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
index f2dd878..7449732 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
@@ -37,11 +37,17 @@ import static org.mockito.Mockito.*;
@PrepareForTest({ExecutorRepresenter.class, Task.class})
public final class FreeSlotSchedulingConstraintTest {
+ private static Task mockTask(final String taskId) {
+ final Task task = mock(Task.class);
+ when(task.getTaskId()).thenReturn(taskId);
+ return task;
+ }
+
private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks,
final int capacity) {
final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
- final Set<String> runningTasks = new HashSet<>();
- IntStream.range(0, numRunningTasks).forEach(i -> runningTasks.add(String.valueOf(i)));
+ final Set<Task> runningTasks = new HashSet<>();
+ IntStream.range(0, numRunningTasks).forEach(i -> runningTasks.add(mockTask(String.valueOf(i))));
when(executorRepresenter.getRunningTasks()).thenReturn(runningTasks);
when(executorRepresenter.getExecutorCapacity()).thenReturn(capacity);
return executorRepresenter;
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java
index bf6ebc8..8831e6c 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java
@@ -35,10 +35,16 @@ import static org.mockito.Mockito.*;
@PrepareForTest({ExecutorRepresenter.class, Task.class})
public final class MinOccupancyFirstSchedulingPolicyTest {
+ private static Task mockTask(final String taskId) {
+ final Task task = mock(Task.class);
+ when(task.getTaskId()).thenReturn(taskId);
+ return task;
+ }
+
private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks) {
final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
- final Set<String> runningTasks = new HashSet<>();
- IntStream.range(0, numRunningTasks).forEach(i -> runningTasks.add(String.valueOf(i)));
+ final Set<Task> runningTasks = new HashSet<>();
+ IntStream.range(0, numRunningTasks).forEach(i -> runningTasks.add(mockTask(String.valueOf(i))));
when(executorRepresenter.getRunningTasks()).thenReturn(runningTasks);
return executorRepresenter;
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
new file mode 100644
index 0000000..87c933f
--- /dev/null
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.data.HashRange;
+import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
+import edu.snu.nemo.runtime.common.plan.Task;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link SkewnessAwareSchedulingConstraint}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ExecutorRepresenter.class, Task.class, HashRange.class, StageEdge.class})
+public final class SkewnessAwareSchedulingConstraintTest {
+
+ private static StageEdge mockStageEdge() {
+ final Map<Integer, KeyRange> taskIdxToKeyRange = new HashMap<>();
+
+ final HashRange skewedHashRange1 = mock(HashRange.class);
+ when(skewedHashRange1.isSkewed()).thenReturn(true);
+ final HashRange skewedHashRange2 = mock(HashRange.class);
+ when(skewedHashRange2.isSkewed()).thenReturn(true);
+ final HashRange hashRange = mock(HashRange.class);
+ when(hashRange.isSkewed()).thenReturn(false);
+
+ taskIdxToKeyRange.put(0, skewedHashRange1);
+ taskIdxToKeyRange.put(1, skewedHashRange2);
+ taskIdxToKeyRange.put(2, hashRange);
+
+ final StageEdge inEdge = mock(StageEdge.class);
+ when(inEdge.getTaskIdxToKeyRange()).thenReturn(taskIdxToKeyRange);
+
+ return inEdge;
+ }
+
+ private static Task mockTask(final int taskIdx, final List<StageEdge> inEdges) {
+ final Task task = mock(Task.class);
+ when(task.getTaskId()).thenReturn(RuntimeIdGenerator.generateTaskId(taskIdx, "Stage-0"));
+ when(task.getTaskIncomingEdges()).thenReturn(inEdges);
+ return task;
+ }
+
+ private static ExecutorRepresenter mockExecutorRepresenter(final Task task) {
+ final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
+ final Set<Task> runningTasks = new HashSet<>();
+ runningTasks.add(task);
+ when(executorRepresenter.getRunningTasks()).thenReturn(runningTasks);
+ return executorRepresenter;
+ }
+
+ /**
+ * {@link SkewnessAwareSchedulingConstraint} should schedule Tasks assigned with skewed partitions
+ * to different executors.
+ */
+ @Test
+ public void testScheduleSkewedTasks() {
+ final SchedulingConstraint schedulingConstraint = new SkewnessAwareSchedulingConstraint();
+ final StageEdge inEdge = mockStageEdge();
+ final Task task0 = mockTask(0, Arrays.asList(inEdge));
+ final Task task1 = mockTask(1, Arrays.asList(inEdge));
+ final Task task2 = mockTask(2, Arrays.asList(inEdge));
+ final ExecutorRepresenter e0 = mockExecutorRepresenter(task0);
+
+ assertEquals(true, schedulingConstraint.testSchedulability(e0, task2));
+ assertEquals(false, schedulingConstraint.testSchedulability(e0, task1));
+ }
+}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
index 772c587..99abf88 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
@@ -68,7 +68,8 @@ public final class SourceLocationAwareSchedulingConstraintTest {
}
/**
- * {@link SourceLocationAwareSchedulingConstraint} should properly schedule TGs with multiple source locations.
+ * {@link SourceLocationAwareSchedulingConstraint} should properly schedule {@link Task}s
+ * with multiple source locations.
*/
@Test
public void testSourceLocationAwareSchedulingWithMultiSource() {
diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index 0af8c8d..25b26d3 100644
--- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -190,4 +190,3 @@ public final class TestPlanGenerator {
return dagBuilder.buildWithoutSourceSinkCheck();
}
}
-
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DataSkewCompositePassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DataSkewCompositePassTest.java
index 7f81154..8345b05 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DataSkewCompositePassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/composite/DataSkewCompositePassTest.java
@@ -24,6 +24,7 @@ import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.SkewnessAwareSchedulingProperty;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.CompositePass;
import edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.DataSkewCompositePass;
@@ -103,5 +104,11 @@ public class DataSkewCompositePassTest {
.equals(e.getPropertyValue(MetricCollectionProperty.class)))
.forEach(e -> assertEquals(PartitionerProperty.Value.DataSkewHashPartitioner,
e.getPropertyValue(PartitionerProperty.class).get())));
+
+ processedDAG.filterVertices(v -> v instanceof MetricCollectionBarrierVertex)
+ .forEach(metricV -> {
+ List<IRVertex> reducerV = processedDAG.getChildren(metricV.getId());
+ reducerV.forEach(rV -> assertTrue(rV.getPropertyValue(SkewnessAwareSchedulingProperty.class).get()));
+ });
}
}