You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/07/12 07:07:33 UTC
[incubator-nemo] branch master updated: [NEMO-141] Make vertices
receiving push edge not comply executor slot (#74)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 0af0383 [NEMO-141] Make vertices receiving push edge not comply executor slot (#74)
0af0383 is described below
commit 0af0383374b7614311663d738f87d5986e7ffc79
Author: Sanha Lee <sa...@gmail.com>
AuthorDate: Thu Jul 12 16:07:31 2018 +0900
[NEMO-141] Make vertices receiving push edge not comply executor slot (#74)
JIRA: [NEMO-141: Make vertices receiving push edge not comply executor slot](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-141)
**Major changes:**
- Make relay vertices added by `SailfishPass` not comply executor slot constraint.
- Make `FreeSlotSchedulingConstraint` does not count tasks with `false` slot compliance value while calculating running tasks.
**Minor changes to note:**
- Make `FreeSlotSchedulingConstraint` be constructed through Tang only.
- Make getter methods of `ExecutorRepresenter` return snapshots at that time to avoid synchronization issue. (It might be not enough. Let's check this soon.)
**Tests for the changes:**
- `FreeSlotSchedulingConstraintTest` is updated to handle added cases.
- Reduced the capacity of the single executor in `testSailfishInOneExecutor` to 2. Because the number of relay tasks receiving push edge is 2, this test does not run without the new features added in this pr.
**Other comments:**
- I considered to make all vertices that receive any push edge not comply the slot constraint, but decide to not due to a staging issue. If we make the slot compliance value of a single vertex as false, the vertex becomes a single stage because of different execution property.
resolves [NEMO-141](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-141)
---
.../SailfishVertexExecutorSlotCompliancePass.java | 53 +++++++++++++++++++
.../pass/compiletime/composite/SailfishPass.java | 3 +-
.../beam_sample_one_executor_resources.json | 2 +-
.../master/resource/ExecutorRepresenter.java | 4 +-
.../scheduler/FreeSlotSchedulingConstraint.java | 12 +++--
.../FreeSlotSchedulingConstraintTest.java | 61 ++++++++++++++++++++--
.../compiler/backend/nemo/NemoBackendTest.java | 4 +-
7 files changed, 124 insertions(+), 15 deletions(-)
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishVertexExecutorSlotCompliancePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishVertexExecutorSlotCompliancePass.java
new file mode 100644
index 0000000..4b18885
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishVertexExecutorSlotCompliancePass.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
+
+import java.util.Collections;
+
+/**
+ * Sets {@link ExecutorSlotComplianceProperty}.
+ */
+public final class SailfishVertexExecutorSlotCompliancePass extends AnnotatingPass {
+
+ public SailfishVertexExecutorSlotCompliancePass() {
+ super(ExecutorSlotComplianceProperty.class, Collections.singleton(DataFlowModelProperty.class));
+ }
+
+ @Override
+ public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+ // On every vertex that receive push edge, if ExecutorSlotComplianceProperty is not set, put it as false.
+ // For other vertices, if ExecutorSlotComplianceProperty is not set, put it as true.
+ dag.getVertices().stream()
+ .filter(v -> !v.getExecutionProperties().containsKey(ExecutorSlotComplianceProperty.class))
+ .forEach(v -> {
+ if (dag.getIncomingEdgesOf(v).stream().anyMatch(
+ e -> e.getPropertyValue(DataFlowModelProperty.class)
+ .orElseThrow(() -> new RuntimeException(String.format("DataFlowModelProperty for %s must be set",
+ e.getId()))).equals(DataFlowModelProperty.Value.Push))) {
+ v.getExecutionProperties().put(ExecutorSlotComplianceProperty.of(false));
+ } else {
+ v.getExecutionProperties().put(ExecutorSlotComplianceProperty.of(true));
+ }
+ });
+ return dag;
+ }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
index d5c50e4..8ffd41f 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
@@ -34,7 +34,8 @@ public final class SailfishPass extends CompositePass {
new SailfishEdgeDataStorePass(),
new SailfishEdgeDecoderPass(),
new SailfishEdgeEncoderPass(),
- new SailfishEdgeUsedDataHandlingPass()
+ new SailfishEdgeUsedDataHandlingPass(),
+ new SailfishVertexExecutorSlotCompliancePass()
));
}
}
diff --git a/examples/resources/beam_sample_one_executor_resources.json b/examples/resources/beam_sample_one_executor_resources.json
index 069ed97..4d6aff4 100644
--- a/examples/resources/beam_sample_one_executor_resources.json
+++ b/examples/resources/beam_sample_one_executor_resources.json
@@ -2,6 +2,6 @@
{
"type": "Transient",
"memory_mb": 512,
- "capacity": 5
+ "capacity": 2
}
]
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index e0df766..e78cb90 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -164,10 +164,10 @@ public final class ExecutorRepresenter {
}
/**
- * @return set of ids of Tasks that are running in this executor
+ * @return the current snapshot of set of Tasks that are running in this executor.
*/
public Set<Task> getRunningTasks() {
- return runningTasks;
+ return Collections.unmodifiableSet(new HashSet<>(runningTasks));
}
/**
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
index 1fc1f6e..f25e4bc 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
@@ -15,7 +15,6 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import com.google.common.annotations.VisibleForTesting;
import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
import edu.snu.nemo.runtime.common.plan.Task;
@@ -28,9 +27,9 @@ import javax.inject.Inject;
*/
@AssociatedProperty(ExecutorSlotComplianceProperty.class)
public final class FreeSlotSchedulingConstraint implements SchedulingConstraint {
- @VisibleForTesting
+
@Inject
- public FreeSlotSchedulingConstraint() {
+ private FreeSlotSchedulingConstraint() {
}
@Override
@@ -39,6 +38,11 @@ public final class FreeSlotSchedulingConstraint implements SchedulingConstraint
return true;
}
- return executor.getRunningTasks().size() < executor.getExecutorCapacity();
+ // Count the number of tasks which are running in this executor and complying the slot constraint.
+ final long numOfComplyingTasks = executor.getRunningTasks().stream()
+ .filter(runningTask -> runningTask.getPropertyValue(ExecutorSlotComplianceProperty.class)
+ .orElseGet(() -> true))
+ .count();
+ return numOfComplyingTasks < executor.getExecutorCapacity();
}
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
index 7449732..a4df785 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
@@ -18,6 +18,8 @@ package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.tang.Tang;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -36,28 +38,58 @@ import static org.mockito.Mockito.*;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ExecutorRepresenter.class, Task.class})
public final class FreeSlotSchedulingConstraintTest {
+ private SchedulingConstraint schedulingConstraint;
+ private ExecutorRepresenter a0;
+ private ExecutorRepresenter a1;
+ @Before
+ public void setUp() throws Exception {
+ schedulingConstraint = Tang.Factory.getTang().newInjector().getInstance(FreeSlotSchedulingConstraint.class);
+ a0 = mockExecutorRepresenter(1, 1, 1);
+ a1 = mockExecutorRepresenter(2, 2, 3);
+ }
+
+ /**
+ * Mock a task.
+ *
+ * @param taskId the ID of the task to mock.
+ * @return the mocked task.
+ */
private static Task mockTask(final String taskId) {
final Task task = mock(Task.class);
when(task.getTaskId()).thenReturn(taskId);
return task;
}
- private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks,
+ /**
+ * Mock an executor representer.
+ *
+ * @param numComplyingTasks the number of already running (mocked) tasks which comply slot constraint in the executor.
+ * @param numIgnoringTasks the number of already running (mocked) tasks which ignore slot constraint in the executor.
+ * @param capacity the capacity of the executor.
+ * @return the mocked executor.
+ */
+ private static ExecutorRepresenter mockExecutorRepresenter(final int numComplyingTasks,
+ final int numIgnoringTasks,
final int capacity) {
final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
final Set<Task> runningTasks = new HashSet<>();
- IntStream.range(0, numRunningTasks).forEach(i -> runningTasks.add(mockTask(String.valueOf(i))));
+ IntStream.range(0, numComplyingTasks).forEach(i -> runningTasks.add(mockTask(String.valueOf(i))));
+ IntStream.range(0, numIgnoringTasks).forEach(i -> {
+ final Task task = mockTask(String.valueOf(numComplyingTasks + i));
+ when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(false));
+ runningTasks.add(task);
+ });
when(executorRepresenter.getRunningTasks()).thenReturn(runningTasks);
when(executorRepresenter.getExecutorCapacity()).thenReturn(capacity);
return executorRepresenter;
}
+ /**
+ * Test whether the constraint filter full executors.
+ */
@Test
public void testFreeSlot() {
- final SchedulingConstraint schedulingConstraint = new FreeSlotSchedulingConstraint();
- final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
- final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
final Task task = mock(Task.class);
when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(true));
@@ -71,4 +103,23 @@ public final class FreeSlotSchedulingConstraintTest {
final Set<ExecutorRepresenter> expectedExecutors = Collections.singleton(a1);
assertEquals(expectedExecutors, candidateExecutors);
}
+
+ /**
+ * Test whether a task with false compliance property is not filtered by the constraint.
+ */
+ @Test
+ public void testIgnoringSlot() {
+
+ final Task task = mock(Task.class);
+ when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(false));
+
+ final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1));
+
+ final Set<ExecutorRepresenter> candidateExecutors = executorRepresenterList.stream()
+ .filter(e -> schedulingConstraint.testSchedulability(e, task))
+ .collect(Collectors.toSet());
+
+ final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0, a1));
+ assertEquals(expectedExecutors, candidateExecutors);
+ }
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
index d784795..0f24f55 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
@@ -76,7 +76,7 @@ public final class NemoBackendTest<I, O> {
final PhysicalPlan executionPlan = backend.compile(dag, physicalPlanGenerator);
assertEquals(2, executionPlan.getStageDAG().getVertices().size());
- assertEquals(1, executionPlan.getStageDAG().getTopologicalSort().get(0).getTaskIds().size());
- assertEquals(1, executionPlan.getStageDAG().getTopologicalSort().get(1).getTaskIds().size());
+ assertEquals(2, executionPlan.getStageDAG().getTopologicalSort().get(0).getIRDAG().getVertices().size());
+ assertEquals(3, executionPlan.getStageDAG().getTopologicalSort().get(1).getIRDAG().getVertices().size());
}
}