You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink and is not included in this plain-text version.)
Posted to commits@nemo.apache.org by wy...@apache.org on 2019/06/14 13:19:26 UTC
[incubator-nemo] branch master updated: [NEMO-395] Address SonarCloud issues for the scheduler package (#220)
This is an automated email from the ASF dual-hosted git repository.
wylee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new b8a11ce [NEMO-395] Address SonarCloud issues for the scheduler package (#220)
b8a11ce is described below
commit b8a11ce777eb07d38f6c1f17e7e9598fa04871db
Author: John Yang <jo...@apache.org>
AuthorDate: Fri Jun 14 22:19:21 2019 +0900
[NEMO-395] Address SonarCloud issues for the scheduler package (#220)
JIRA: [NEMO-395: Address SonarCloud issues for the scheduler package](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-395)
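**Major changes:**
- Fixes all "Critical" SonarCloud issues in the scheduler package
- Fixes most of the "Major" SonarCloud issues in the scheduler package, including:
  - Removes `@VisibleForTesting` from the `AntiAffinitySchedulingConstraint` constructor and makes it package-private
  - Replaces the unchecked `Optional.get()` calls in `BatchScheduler#getMessageId` with `orElseThrow`
  - Extracts the speculative-execution (task cloning) logic of `BatchScheduler#onSpeculativeExecutionCheck` into `doSpeculativeExecution` and `modifyStageNumCloneUsingMedianTime`
  - Restores the thread's interrupt status when `LocalitySchedulingConstraint` catches an `InterruptedException`
  - Makes `NodeShareSchedulingConstraint#getNodeName` return an `Optional` instead of throwing, and includes the task ID in the resulting `IllegalStateException`
  - Waits on the condition in a `while` loop in `TaskDispatcher#await` to handle spurious wakeups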
---
.../AntiAffinitySchedulingConstraint.java | 4 +-
.../runtime/master/scheduler/BatchScheduler.java | 97 ++++++++++++++--------
.../scheduler/LocalitySchedulingConstraint.java | 5 +-
.../scheduler/NodeShareSchedulingConstraint.java | 23 +++--
.../runtime/master/scheduler/TaskDispatcher.java | 2 +-
5 files changed, 85 insertions(+), 46 deletions(-)
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/AntiAffinitySchedulingConstraint.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/AntiAffinitySchedulingConstraint.java
index 93db291..6f29072 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/AntiAffinitySchedulingConstraint.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/AntiAffinitySchedulingConstraint.java
@@ -18,7 +18,6 @@
*/
package org.apache.nemo.runtime.master.scheduler;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.nemo.common.ir.executionproperty.AssociatedProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ResourceAntiAffinityProperty;
import org.apache.nemo.runtime.common.RuntimeIdManager;
@@ -38,9 +37,8 @@ import java.util.Optional;
@DriverSide
@AssociatedProperty(ResourceAntiAffinityProperty.class)
public final class AntiAffinitySchedulingConstraint implements SchedulingConstraint {
- @VisibleForTesting
@Inject
- public AntiAffinitySchedulingConstraint() {
+ AntiAffinitySchedulingConstraint() {
}
@Override
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
index a37fa52..4d55adeb 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -166,11 +166,15 @@ public final class BatchScheduler implements Scheduler {
}
private int getMessageId(final Set<StageEdge> stageEdges) {
- final Set<Integer> messageIds = stageEdges.stream()
- .map(edge -> edge.getExecutionProperties().get(MessageIdEdgeProperty.class).get())
- .findFirst().get();
// Here we simply use findFirst() for now...
// TODO #345: Simplify insert
+ final Set<Integer> messageIds = stageEdges.stream()
+ .map(edge -> edge.getExecutionProperties()
+ .get(MessageIdEdgeProperty.class)
+ .<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException(edge.getId())))
+ .findFirst().<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException());
+ // Type casting is needed. See: https://stackoverflow.com/a/40865318
+
return messageIds.iterator().next();
}
@@ -281,7 +285,7 @@ public final class BatchScheduler implements Scheduler {
@Override
public void onSpeculativeExecutionCheck() {
- MutableBoolean isNumOfCloneChanged = new MutableBoolean(false);
+ MutableBoolean isNewCloneCreated = new MutableBoolean(false);
selectEarliestSchedulableGroup().ifPresent(scheduleGroup -> {
scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
@@ -290,40 +294,13 @@ public final class BatchScheduler implements Scheduler {
// Only if the ClonedSchedulingProperty is set...
stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
if (!cloneConf.isUpFrontCloning()) { // Upfront cloning is already handled.
- final double fractionToWaitFor = cloneConf.getFractionToWaitFor();
- final Object[] completedTaskTimes = planStateManager.getCompletedTaskTimeListMs(stageId).toArray();
-
- // Only after the fraction of the tasks are done...
- // Delayed cloning (aggressive)
- if (completedTaskTimes.length > 0
- && completedTaskTimes.length >= Math.round(stage.getTaskIndices().size() * fractionToWaitFor)) {
- Arrays.sort(completedTaskTimes);
- final long medianTime = (long) completedTaskTimes[completedTaskTimes.length / 2];
- final double medianTimeMultiplier = cloneConf.getMedianTimeMultiplier();
- final Map<String, Long> execTaskToTime = planStateManager.getExecutingTaskToRunningTimeMs(stageId);
- for (final Map.Entry<String, Long> entry : execTaskToTime.entrySet()) {
-
- // Only if the running task is considered a 'straggler'....
- final long runningTime = entry.getValue();
- if (runningTime > Math.round(medianTime * medianTimeMultiplier)) {
- final String taskId = entry.getKey();
- final boolean isCloned = planStateManager.setNumOfClones(
- stageId, RuntimeIdManager.getIndexFromTaskId(taskId), 2);
- if (isCloned) {
- LOG.info("Cloned {}, because its running time {} (ms) is bigger than {} tasks' "
- + "(median) {} (ms) * (multiplier) {}", taskId, runningTime, completedTaskTimes.length,
- medianTime, medianTimeMultiplier);
- }
- isNumOfCloneChanged.setValue(isCloned);
- }
- }
- }
+ isNewCloneCreated.setValue(doSpeculativeExecution(stage, cloneConf));
}
});
});
});
- if (isNumOfCloneChanged.booleanValue()) {
+ if (isNewCloneCreated.booleanValue()) {
doSchedule(); // Do schedule the new clone.
}
}
@@ -447,6 +424,60 @@ public final class BatchScheduler implements Scheduler {
return tasks;
}
+ ////////////////////////////////////////////////////////////////////// Task cloning methods.
+
+ /**
+ * @return true if a new clone is created.
+ * false otherwise.
+ */
+ private boolean doSpeculativeExecution(final Stage stage, final ClonedSchedulingProperty.CloneConf cloneConf) {
+ final double fractionToWaitFor = cloneConf.getFractionToWaitFor();
+ final Object[] completedTaskTimes = planStateManager.getCompletedTaskTimeListMs(stage.getId()).toArray();
+
+ // Only after the fraction of the tasks are done...
+ // Delayed cloning (aggressive)
+ if (completedTaskTimes.length > 0
+ && completedTaskTimes.length >= Math.round(stage.getTaskIndices().size() * fractionToWaitFor)) {
+ // Only if the running task is considered a 'straggler'....
+ Arrays.sort(completedTaskTimes);
+ final long medianTime = (long) completedTaskTimes[completedTaskTimes.length / 2];
+ final double medianTimeMultiplier = cloneConf.getMedianTimeMultiplier();
+ final Map<String, Long> executingTaskToTime = planStateManager.getExecutingTaskToRunningTimeMs(stage.getId());
+
+ return modifyStageNumCloneUsingMedianTime(
+ stage.getId(), completedTaskTimes.length, medianTime, medianTimeMultiplier, executingTaskToTime);
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * @return true if the number of clones for the stage is modified.
+ * false otherwise.
+ */
+ private boolean modifyStageNumCloneUsingMedianTime(final String stageId,
+ final long numCompletedTasks,
+ final long medianTime,
+ final double medianTimeMultiplier,
+ final Map<String, Long> executingTaskToTime) {
+ for (final Map.Entry<String, Long> entry : executingTaskToTime.entrySet()) {
+ final long runningTime = entry.getValue();
+ if (runningTime > Math.round(medianTime * medianTimeMultiplier)) {
+ final String taskId = entry.getKey();
+ final boolean isNumCloneModified = planStateManager
+ .setNumOfClones(stageId, RuntimeIdManager.getIndexFromTaskId(taskId), 2);
+ if (isNumCloneModified) {
+ LOG.info("Cloned {}, because its running time {} (ms) is bigger than {} tasks' "
+ + "(median) {} (ms) * (multiplier) {}", taskId, runningTime, numCompletedTasks,
+ medianTime, medianTimeMultiplier);
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
////////////////////////////////////////////////////////////////////// Task state change handlers
/**
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
index 7d4c7bc..a9f9acf 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
@@ -78,7 +78,10 @@ public final class LocalitySchedulingConstraint implements SchedulingConstraint
.map(handler -> {
try {
return handler.getLocationFuture().get();
- } catch (InterruptedException | ExecutionException e) {
+ } catch (final ExecutionException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
})
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
index 10fb8de..4654623 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
@@ -37,7 +37,7 @@ public final class NodeShareSchedulingConstraint implements SchedulingConstraint
private NodeShareSchedulingConstraint() {
}
- private String getNodeName(final Map<String, Integer> propertyValue, final int taskIndex) {
+ private Optional<String> getNodeName(final Map<String, Integer> propertyValue, final int taskIndex) {
final List<String> nodeNames = new ArrayList<>(propertyValue.keySet());
Collections.sort(nodeNames, Comparator.naturalOrder());
int index = taskIndex;
@@ -45,10 +45,11 @@ public final class NodeShareSchedulingConstraint implements SchedulingConstraint
if (index >= propertyValue.get(nodeName)) {
index -= propertyValue.get(nodeName);
} else {
- return nodeName;
+ return Optional.of(nodeName);
}
}
- throw new IllegalStateException("Detected excessive parallelism which ResourceSiteProperty does not cover");
+
+ return Optional.empty();
}
@Override
@@ -58,11 +59,17 @@ public final class NodeShareSchedulingConstraint implements SchedulingConstraint
if (propertyValue.isEmpty()) {
return true;
}
- try {
- return executor.getNodeName().equals(
- getNodeName(propertyValue, RuntimeIdManager.getIndexFromTaskId(task.getTaskId())));
- } catch (final IllegalStateException e) {
- throw new RuntimeException(String.format("Cannot schedule %s", task.getTaskId(), e));
+
+ final String executorNodeName = executor.getNodeName();
+ final Optional<String> taskNodeName =
+ getNodeName(propertyValue, RuntimeIdManager.getIndexFromTaskId(task.getTaskId()));
+
+ if (!taskNodeName.isPresent()) {
+ throw new IllegalStateException(
+ String.format("Detected excessive parallelism which ResourceSiteProperty does not cover: %s",
+ task.getTaskId()));
+ } else {
+ return executorNodeName.equals(taskNodeName.get());
}
}
}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
index 780e3ca..16a6b35 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
@@ -205,7 +205,7 @@ final class TaskDispatcher {
void await() {
lock.lock();
try {
- if (!hasDelayedSignal) {
+ while (!hasDelayedSignal) { // to handle spurious wakeups
condition.await();
}
hasDelayedSignal = false;