You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/07/12 07:07:33 UTC

[incubator-nemo] branch master updated: [NEMO-141] Make vertices receiving push edge not comply executor slot (#74)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 0af0383  [NEMO-141] Make vertices receiving push edge not comply executor slot (#74)
0af0383 is described below

commit 0af0383374b7614311663d738f87d5986e7ffc79
Author: Sanha Lee <sa...@gmail.com>
AuthorDate: Thu Jul 12 16:07:31 2018 +0900

    [NEMO-141] Make vertices receiving push edge not comply executor slot (#74)
    
    JIRA: [NEMO-141: Make vertices receiving push edge not comply executor slot](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-141)
    
    **Major changes:**
    - Make relay vertices added by `SailfishPass` not comply with the executor slot constraint.
    - Make `FreeSlotSchedulingConstraint` not count tasks with a `false` slot compliance value when counting running tasks.
    
    **Minor changes to note:**
    - Make `FreeSlotSchedulingConstraint` be constructed through Tang only.
    - Make getter methods of `ExecutorRepresenter` return snapshots taken at call time to avoid synchronization issues. (This might not be enough. Let's check this soon.)
    
    **Tests for the changes:**
    - `FreeSlotSchedulingConstraintTest` is updated to handle added cases.
    - Reduced the capacity of the single executor in `testSailfishInOneExecutor` to 2. Because the number of relay tasks receiving a push edge is 2, this test does not run without the new features added in this PR.
    
    **Other comments:**
    - I considered making all vertices that receive any push edge not comply with the slot constraint, but decided not to due to a staging issue. If we set the slot compliance value of a single vertex to false, the vertex becomes a stage by itself because its execution property differs.
    
    resolves [NEMO-141](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-141)
---
 .../SailfishVertexExecutorSlotCompliancePass.java  | 53 +++++++++++++++++++
 .../pass/compiletime/composite/SailfishPass.java   |  3 +-
 .../beam_sample_one_executor_resources.json        |  2 +-
 .../master/resource/ExecutorRepresenter.java       |  4 +-
 .../scheduler/FreeSlotSchedulingConstraint.java    | 12 +++--
 .../FreeSlotSchedulingConstraintTest.java          | 61 ++++++++++++++++++++--
 .../compiler/backend/nemo/NemoBackendTest.java     |  4 +-
 7 files changed, 124 insertions(+), 15 deletions(-)

diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishVertexExecutorSlotCompliancePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishVertexExecutorSlotCompliancePass.java
new file mode 100644
index 0000000..4b18885
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishVertexExecutorSlotCompliancePass.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
+
+import java.util.Collections;
+
+/**
+ * Sets {@link ExecutorSlotComplianceProperty} where unset: false for vertices receiving a push edge, true otherwise.
+ */
+public final class SailfishVertexExecutorSlotCompliancePass extends AnnotatingPass {
+
+  public SailfishVertexExecutorSlotCompliancePass() {
+    super(ExecutorSlotComplianceProperty.class, Collections.singleton(DataFlowModelProperty.class));
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    // For every vertex that receives a push edge, if ExecutorSlotComplianceProperty is not set, set it to false.
+    // For every other vertex, if ExecutorSlotComplianceProperty is not set, set it to true.
+    dag.getVertices().stream()
+        .filter(v -> !v.getExecutionProperties().containsKey(ExecutorSlotComplianceProperty.class))
+        .forEach(v -> {
+          if (dag.getIncomingEdgesOf(v).stream().anyMatch(
+              e -> e.getPropertyValue(DataFlowModelProperty.class)
+                  .orElseThrow(() -> new RuntimeException(String.format("DataFlowModelProperty for %s must be set",
+                      e.getId()))).equals(DataFlowModelProperty.Value.Push))) {
+            v.getExecutionProperties().put(ExecutorSlotComplianceProperty.of(false));
+          } else {
+            v.getExecutionProperties().put(ExecutorSlotComplianceProperty.of(true));
+          }
+        });
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
index d5c50e4..8ffd41f 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
@@ -34,7 +34,8 @@ public final class SailfishPass extends CompositePass {
         new SailfishEdgeDataStorePass(),
         new SailfishEdgeDecoderPass(),
         new SailfishEdgeEncoderPass(),
-        new SailfishEdgeUsedDataHandlingPass()
+        new SailfishEdgeUsedDataHandlingPass(),
+        new SailfishVertexExecutorSlotCompliancePass()
     ));
   }
 }
diff --git a/examples/resources/beam_sample_one_executor_resources.json b/examples/resources/beam_sample_one_executor_resources.json
index 069ed97..4d6aff4 100644
--- a/examples/resources/beam_sample_one_executor_resources.json
+++ b/examples/resources/beam_sample_one_executor_resources.json
@@ -2,6 +2,6 @@
   {
     "type": "Transient",
     "memory_mb": 512,
-    "capacity": 5
+    "capacity": 2
   }
 ]
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index e0df766..e78cb90 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -164,10 +164,10 @@ public final class ExecutorRepresenter {
   }
 
   /**
-   * @return set of ids of Tasks that are running in this executor
+   * @return the current snapshot of set of Tasks that are running in this executor.
    */
   public Set<Task> getRunningTasks() {
-    return runningTasks;
+    return Collections.unmodifiableSet(new HashSet<>(runningTasks));
   }
 
   /**
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
index 1fc1f6e..f25e4bc 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
@@ -28,9 +27,9 @@ import javax.inject.Inject;
  */
 @AssociatedProperty(ExecutorSlotComplianceProperty.class)
 public final class FreeSlotSchedulingConstraint implements SchedulingConstraint {
-  @VisibleForTesting
+
   @Inject
-  public FreeSlotSchedulingConstraint() {
+  private FreeSlotSchedulingConstraint() {
   }
 
   @Override
@@ -39,6 +38,11 @@ public final class FreeSlotSchedulingConstraint implements SchedulingConstraint
       return true;
     }
 
-    return executor.getRunningTasks().size() < executor.getExecutorCapacity();
+    // Count the number of tasks which are running in this executor and complying the slot constraint.
+    final long numOfComplyingTasks = executor.getRunningTasks().stream()
+        .filter(runningTask -> runningTask.getPropertyValue(ExecutorSlotComplianceProperty.class)
+            .orElseGet(() -> true))
+        .count();
+    return numOfComplyingTasks < executor.getExecutorCapacity();
   }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
index 7449732..a4df785 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
@@ -18,6 +18,8 @@ package edu.snu.nemo.runtime.master.scheduler;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.tang.Tang;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -36,28 +38,58 @@ import static org.mockito.Mockito.*;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ExecutorRepresenter.class, Task.class})
 public final class FreeSlotSchedulingConstraintTest {
+  private SchedulingConstraint schedulingConstraint;
+  private ExecutorRepresenter a0;
+  private ExecutorRepresenter a1;
 
+  @Before
+  public void setUp() throws Exception {
+    schedulingConstraint = Tang.Factory.getTang().newInjector().getInstance(FreeSlotSchedulingConstraint.class);
+    a0 = mockExecutorRepresenter(1, 1, 1);
+    a1 = mockExecutorRepresenter(2, 2, 3);
+  }
+
+  /**
+   * Mock a task.
+   *
+   * @param taskId the ID of the task to mock.
+   * @return the mocked task.
+   */
   private static Task mockTask(final String taskId) {
     final Task task = mock(Task.class);
     when(task.getTaskId()).thenReturn(taskId);
     return task;
   }
 
-  private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks,
+  /**
+   * Mock an executor representer.
+   *
+   * @param numComplyingTasks the number of already running (mocked) tasks which comply slot constraint in the executor.
+   * @param numIgnoringTasks  the number of already running (mocked) tasks which ignore slot constraint in the executor.
+   * @param capacity          the capacity of the executor.
+   * @return the mocked executor.
+   */
+  private static ExecutorRepresenter mockExecutorRepresenter(final int numComplyingTasks,
+                                                             final int numIgnoringTasks,
                                                              final int capacity) {
     final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
     final Set<Task> runningTasks = new HashSet<>();
-    IntStream.range(0, numRunningTasks).forEach(i -> runningTasks.add(mockTask(String.valueOf(i))));
+    IntStream.range(0, numComplyingTasks).forEach(i -> runningTasks.add(mockTask(String.valueOf(i))));
+    IntStream.range(0, numIgnoringTasks).forEach(i -> {
+      final Task task = mockTask(String.valueOf(numComplyingTasks + i));
+      when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(false));
+      runningTasks.add(task);
+    });
     when(executorRepresenter.getRunningTasks()).thenReturn(runningTasks);
     when(executorRepresenter.getExecutorCapacity()).thenReturn(capacity);
     return executorRepresenter;
   }
 
+  /**
+   * Test whether the constraint filters out full executors.
+   */
   @Test
   public void testFreeSlot() {
-    final SchedulingConstraint schedulingConstraint = new FreeSlotSchedulingConstraint();
-    final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
-    final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
 
     final Task task = mock(Task.class);
     when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(true));
@@ -71,4 +103,23 @@ public final class FreeSlotSchedulingConstraintTest {
     final Set<ExecutorRepresenter> expectedExecutors = Collections.singleton(a1);
     assertEquals(expectedExecutors, candidateExecutors);
   }
+
+  /**
+   * Test whether a task with false compliance property is not filtered by the constraint.
+   */
+  @Test
+  public void testIgnoringSlot() {
+
+    final Task task = mock(Task.class);
+    when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(false));
+
+    final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1));
+
+    final Set<ExecutorRepresenter> candidateExecutors = executorRepresenterList.stream()
+        .filter(e -> schedulingConstraint.testSchedulability(e, task))
+        .collect(Collectors.toSet());
+
+    final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0, a1));
+    assertEquals(expectedExecutors, candidateExecutors);
+  }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
index d784795..0f24f55 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
@@ -76,7 +76,7 @@ public final class NemoBackendTest<I, O> {
     final PhysicalPlan executionPlan = backend.compile(dag, physicalPlanGenerator);
 
     assertEquals(2, executionPlan.getStageDAG().getVertices().size());
-    assertEquals(1, executionPlan.getStageDAG().getTopologicalSort().get(0).getTaskIds().size());
-    assertEquals(1, executionPlan.getStageDAG().getTopologicalSort().get(1).getTaskIds().size());
+    assertEquals(2, executionPlan.getStageDAG().getTopologicalSort().get(0).getIRDAG().getVertices().size());
+    assertEquals(3, executionPlan.getStageDAG().getTopologicalSort().get(1).getIRDAG().getVertices().size());
   }
 }