You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by wy...@apache.org on 2019/06/14 13:19:26 UTC

[incubator-nemo] branch master updated: [NEMO-395] Address SonarCloud issues for the scheduler package (#220)

This is an automated email from the ASF dual-hosted git repository.

wylee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new b8a11ce  [NEMO-395] Address SonarCloud issues for the scheduler package (#220)
b8a11ce is described below

commit b8a11ce777eb07d38f6c1f17e7e9598fa04871db
Author: John Yang <jo...@apache.org>
AuthorDate: Fri Jun 14 22:19:21 2019 +0900

    [NEMO-395] Address SonarCloud issues for the scheduler package (#220)
    
    JIRA: [NEMO-395: Address SonarCloud issues for the scheduler package](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-395)
    
    **Major changes:**
    - Fixes all "Critical" issues
    - Fixes most of the "Major" issues
---
 .../AntiAffinitySchedulingConstraint.java          |  4 +-
 .../runtime/master/scheduler/BatchScheduler.java   | 97 ++++++++++++++--------
 .../scheduler/LocalitySchedulingConstraint.java    |  5 +-
 .../scheduler/NodeShareSchedulingConstraint.java   | 23 +++--
 .../runtime/master/scheduler/TaskDispatcher.java   |  2 +-
 5 files changed, 85 insertions(+), 46 deletions(-)

diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/AntiAffinitySchedulingConstraint.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/AntiAffinitySchedulingConstraint.java
index 93db291..6f29072 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/AntiAffinitySchedulingConstraint.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/AntiAffinitySchedulingConstraint.java
@@ -18,7 +18,6 @@
  */
 package org.apache.nemo.runtime.master.scheduler;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.nemo.common.ir.executionproperty.AssociatedProperty;
 import org.apache.nemo.common.ir.vertex.executionproperty.ResourceAntiAffinityProperty;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
@@ -38,9 +37,8 @@ import java.util.Optional;
 @DriverSide
 @AssociatedProperty(ResourceAntiAffinityProperty.class)
 public final class AntiAffinitySchedulingConstraint implements SchedulingConstraint {
-  @VisibleForTesting
   @Inject
-  public AntiAffinitySchedulingConstraint() {
+  AntiAffinitySchedulingConstraint() {
   }
 
   @Override
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
index a37fa52..4d55adeb 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -166,11 +166,15 @@ public final class BatchScheduler implements Scheduler {
   }
 
   private int getMessageId(final Set<StageEdge> stageEdges) {
-    final Set<Integer> messageIds = stageEdges.stream()
-      .map(edge -> edge.getExecutionProperties().get(MessageIdEdgeProperty.class).get())
-      .findFirst().get();
     // Here we simply use findFirst() for now...
     // TODO #345: Simplify insert
+    final Set<Integer> messageIds = stageEdges.stream()
+      .map(edge -> edge.getExecutionProperties()
+        .get(MessageIdEdgeProperty.class)
+        .<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException(edge.getId())))
+      .findFirst().<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException());
+    // Type casting is needed. See: https://stackoverflow.com/a/40865318
+
     return messageIds.iterator().next();
   }
 
@@ -281,7 +285,7 @@ public final class BatchScheduler implements Scheduler {
 
   @Override
   public void onSpeculativeExecutionCheck() {
-    MutableBoolean isNumOfCloneChanged = new MutableBoolean(false);
+    MutableBoolean isNewCloneCreated = new MutableBoolean(false);
 
     selectEarliestSchedulableGroup().ifPresent(scheduleGroup -> {
       scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
@@ -290,40 +294,13 @@ public final class BatchScheduler implements Scheduler {
         // Only if the ClonedSchedulingProperty is set...
         stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
           if (!cloneConf.isUpFrontCloning()) { // Upfront cloning is already handled.
-            final double fractionToWaitFor = cloneConf.getFractionToWaitFor();
-            final Object[] completedTaskTimes = planStateManager.getCompletedTaskTimeListMs(stageId).toArray();
-
-            // Only after the fraction of the tasks are done...
-            // Delayed cloning (aggressive)
-            if (completedTaskTimes.length > 0
-              && completedTaskTimes.length >= Math.round(stage.getTaskIndices().size() * fractionToWaitFor)) {
-              Arrays.sort(completedTaskTimes);
-              final long medianTime = (long) completedTaskTimes[completedTaskTimes.length / 2];
-              final double medianTimeMultiplier = cloneConf.getMedianTimeMultiplier();
-              final Map<String, Long> execTaskToTime = planStateManager.getExecutingTaskToRunningTimeMs(stageId);
-              for (final Map.Entry<String, Long> entry : execTaskToTime.entrySet()) {
-
-                // Only if the running task is considered a 'straggler'....
-                final long runningTime = entry.getValue();
-                if (runningTime > Math.round(medianTime * medianTimeMultiplier)) {
-                  final String taskId = entry.getKey();
-                  final boolean isCloned = planStateManager.setNumOfClones(
-                    stageId, RuntimeIdManager.getIndexFromTaskId(taskId), 2);
-                  if (isCloned) {
-                    LOG.info("Cloned {}, because its running time {} (ms) is bigger than {} tasks' "
-                        + "(median) {} (ms) * (multiplier) {}", taskId, runningTime, completedTaskTimes.length,
-                      medianTime, medianTimeMultiplier);
-                  }
-                  isNumOfCloneChanged.setValue(isCloned);
-                }
-              }
-            }
+            isNewCloneCreated.setValue(doSpeculativeExecution(stage, cloneConf));
           }
         });
       });
     });
 
-    if (isNumOfCloneChanged.booleanValue()) {
+    if (isNewCloneCreated.booleanValue()) {
       doSchedule(); // Do schedule the new clone.
     }
   }
@@ -447,6 +424,60 @@ public final class BatchScheduler implements Scheduler {
     return tasks;
   }
 
+  ////////////////////////////////////////////////////////////////////// Task cloning methods.
+
+  /**
+   * @return true if a new clone is created.
+   *         false otherwise.
+   */
+  private boolean doSpeculativeExecution(final Stage stage, final ClonedSchedulingProperty.CloneConf cloneConf) {
+    final double fractionToWaitFor = cloneConf.getFractionToWaitFor();
+    final Object[] completedTaskTimes = planStateManager.getCompletedTaskTimeListMs(stage.getId()).toArray();
+
+    // Only after the fraction of the tasks are done...
+    // Delayed cloning (aggressive)
+    if (completedTaskTimes.length > 0
+      && completedTaskTimes.length >= Math.round(stage.getTaskIndices().size() * fractionToWaitFor)) {
+      // Only if the running task is considered a 'straggler'....
+      Arrays.sort(completedTaskTimes);
+      final long medianTime = (long) completedTaskTimes[completedTaskTimes.length / 2];
+      final double medianTimeMultiplier = cloneConf.getMedianTimeMultiplier();
+      final Map<String, Long> executingTaskToTime = planStateManager.getExecutingTaskToRunningTimeMs(stage.getId());
+
+      return modifyStageNumCloneUsingMedianTime(
+        stage.getId(), completedTaskTimes.length, medianTime, medianTimeMultiplier, executingTaskToTime);
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * @return true if the number of clones for the stage is modified.
+   *         false otherwise.
+   */
+  private boolean modifyStageNumCloneUsingMedianTime(final String stageId,
+                                                     final long numCompletedTasks,
+                                                     final long medianTime,
+                                                     final double medianTimeMultiplier,
+                                                     final Map<String, Long> executingTaskToTime) {
+    for (final Map.Entry<String, Long> entry : executingTaskToTime.entrySet()) {
+      final long runningTime = entry.getValue();
+      if (runningTime > Math.round(medianTime * medianTimeMultiplier)) {
+        final String taskId = entry.getKey();
+        final boolean isNumCloneModified = planStateManager
+          .setNumOfClones(stageId, RuntimeIdManager.getIndexFromTaskId(taskId), 2);
+        if (isNumCloneModified) {
+          LOG.info("Cloned {}, because its running time {} (ms) is bigger than {} tasks' "
+              + "(median) {} (ms) * (multiplier) {}", taskId, runningTime, numCompletedTasks,
+            medianTime, medianTimeMultiplier);
+          return true;
+        }
+      }
+    }
+
+    return false;
+  }
+
   ////////////////////////////////////////////////////////////////////// Task state change handlers
 
   /**
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
index 7d4c7bc..a9f9acf 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/LocalitySchedulingConstraint.java
@@ -78,7 +78,10 @@ public final class LocalitySchedulingConstraint implements SchedulingConstraint
           .map(handler -> {
             try {
               return handler.getLocationFuture().get();
-            } catch (InterruptedException | ExecutionException e) {
+            } catch (final ExecutionException e) {
+              throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
               throw new RuntimeException(e);
             }
           })
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
index 10fb8de..4654623 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
@@ -37,7 +37,7 @@ public final class NodeShareSchedulingConstraint implements SchedulingConstraint
   private NodeShareSchedulingConstraint() {
   }
 
-  private String getNodeName(final Map<String, Integer> propertyValue, final int taskIndex) {
+  private Optional<String> getNodeName(final Map<String, Integer> propertyValue, final int taskIndex) {
     final List<String> nodeNames = new ArrayList<>(propertyValue.keySet());
     Collections.sort(nodeNames, Comparator.naturalOrder());
     int index = taskIndex;
@@ -45,10 +45,11 @@ public final class NodeShareSchedulingConstraint implements SchedulingConstraint
       if (index >= propertyValue.get(nodeName)) {
         index -= propertyValue.get(nodeName);
       } else {
-        return nodeName;
+        return Optional.of(nodeName);
       }
     }
-    throw new IllegalStateException("Detected excessive parallelism which ResourceSiteProperty does not cover");
+
+    return Optional.empty();
   }
 
   @Override
@@ -58,11 +59,17 @@ public final class NodeShareSchedulingConstraint implements SchedulingConstraint
     if (propertyValue.isEmpty()) {
       return true;
     }
-    try {
-      return executor.getNodeName().equals(
-        getNodeName(propertyValue, RuntimeIdManager.getIndexFromTaskId(task.getTaskId())));
-    } catch (final IllegalStateException e) {
-      throw new RuntimeException(String.format("Cannot schedule %s", task.getTaskId(), e));
+
+    final String executorNodeName = executor.getNodeName();
+    final Optional<String> taskNodeName =
+      getNodeName(propertyValue, RuntimeIdManager.getIndexFromTaskId(task.getTaskId()));
+
+    if (!taskNodeName.isPresent()) {
+      throw new IllegalStateException(
+        String.format("Detected excessive parallelism which ResourceSiteProperty does not cover: %s",
+          task.getTaskId()));
+    } else {
+      return executorNodeName.equals(taskNodeName.get());
     }
   }
 }
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
index 780e3ca..16a6b35 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
@@ -205,7 +205,7 @@ final class TaskDispatcher {
     void await() {
       lock.lock();
       try {
-        if (!hasDelayedSignal) {
+        while (!hasDelayedSignal) { // to handle spurious wakeups
           condition.await();
         }
         hasDelayedSignal = false;