You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/08/07 13:14:06 UTC
[incubator-nemo] branch master updated: [NEMO-51] Intermediate data
location aware scheduling (#86)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 8f4e7f5 [NEMO-51] Intermediate data location aware scheduling (#86)
8f4e7f5 is described below
commit 8f4e7f5dc6542766d24ce7e88de796f35c3f91fb
Author: Sanha Lee <sa...@gmail.com>
AuthorDate: Tue Aug 7 22:14:04 2018 +0900
[NEMO-51] Intermediate data location aware scheduling (#86)
JIRA: [NEMO-51: Intermediate data location aware scheduling](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-51)
**Major changes:**
- Implement intermediate data location aware scheduling
**Minor changes to note:**
- N/A.
**Tests for the changes:**
- Existing integration tests that exercise the large shuffle optimization cover this change.
**Other comments:**
- N/A.
resolves [NEMO-51](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-51)
---
.../ResourceLocalityProperty.java | 3 +-
.../SourceLocationAwareSchedulingConstraint.java | 82 +++++++++++++++++-----
.../SchedulingConstraintnRegistryTest.java | 14 +++-
...ourceLocationAwareSchedulingConstraintTest.java | 16 ++++-
4 files changed, 93 insertions(+), 22 deletions(-)
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java
index 869644e..a94f25e 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java
@@ -18,7 +18,8 @@ package edu.snu.nemo.common.ir.vertex.executionproperty;
import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
/**
- * This property decides whether or not to schedule this vertex only on executors where source data reside.
+ * This property decides whether or not to schedule this vertex only on executors where
+ * source (including intermediate) data reside.
*/
public final class ResourceLocalityProperty extends VertexExecutionProperty<Boolean> {
private static final ResourceLocalityProperty SOURCE_TRUE = new ResourceLocalityProperty(true);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
index f446362..d355aa7 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
@@ -16,28 +16,68 @@
package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.plan.Task;
+import edu.snu.nemo.runtime.master.BlockManagerMaster;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import java.util.*;
+import java.util.concurrent.ExecutionException;
/**
- * This policy is same as {@link MinOccupancyFirstSchedulingPolicy}, however for Tasks
- * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick one of the executors
- * where the corresponding data reside.
+ * This policy tries to pick the executors where the corresponding source or intermediate data for a task reside.
*/
@ThreadSafe
@DriverSide
@AssociatedProperty(ResourceLocalityProperty.class)
public final class SourceLocationAwareSchedulingConstraint implements SchedulingConstraint {
+ private final BlockManagerMaster blockManagerMaster;
@Inject
- private SourceLocationAwareSchedulingConstraint() {
+ private SourceLocationAwareSchedulingConstraint(final BlockManagerMaster blockManagerMaster) {
+ this.blockManagerMaster = blockManagerMaster;
+ }
+
+ /**
+ * Find the location of the intermediate data for a task.
+ * It is only possible if the task receives only one input edge with One-to-One communication pattern, and
+ * the location of the input data is known.
+ *
+ * @param task the task to schedule.
+ * @return the intermediate data location, or an empty Optional if the conditions above do not hold.
+ */
+ private Optional<String> getIntermediateDataLocation(final Task task) {
+ if (task.getTaskIncomingEdges().size() == 1) {
+ final StageEdge physicalStageEdge = task.getTaskIncomingEdges().get(0);
+ if (CommunicationPatternProperty.Value.OneToOne.equals(
+ physicalStageEdge.getPropertyValue(CommunicationPatternProperty.class)
+ .orElseThrow(() -> new RuntimeException("No comm pattern!")))) {
+ final String blockIdToRead =
+ RuntimeIdGenerator.generateBlockId(physicalStageEdge.getId(),
+ RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId())); // built from the edge ID and this task's index
+ final BlockManagerMaster.BlockLocationRequestHandler locationHandler =
+ blockManagerMaster.getBlockLocationHandler(blockIdToRead);
+ if (locationHandler.getLocationFuture().isDone()) { // if the location is known.
+ try {
+ final String location = locationHandler.getLocationFuture().get();
+ return Optional.of(location);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before rethrowing
+ throw new RuntimeException(e);
+ } catch (final ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ return Optional.empty(); // not a single one-to-one input, or the location future is not done yet
}
/**
@@ -55,19 +95,29 @@ public final class SourceLocationAwareSchedulingConstraint implements Scheduling
@Override
public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
- final Set<String> sourceLocations;
- try {
- sourceLocations = getSourceLocations(task.getIrVertexIdToReadable().values());
- } catch (final UnsupportedOperationException e) {
- return true;
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
+ if (task.getTaskIncomingEdges().isEmpty()) { // Source task
+ final Set<String> sourceLocations;
+ try {
+ sourceLocations = getSourceLocations(task.getIrVertexIdToReadable().values());
+ } catch (final UnsupportedOperationException e) {
+ return true; // location lookup is unsupported here, so do not constrain the executor
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
- if (sourceLocations.size() == 0) {
- return true;
- }
+ if (sourceLocations.size() == 0) {
+ return true; // no known source location, so any executor qualifies
+ }
- return sourceLocations.contains(executor.getNodeName());
+ return sourceLocations.contains(executor.getNodeName());
+ } else { // Non-source task.
+ final Optional<String> optionalIntermediateLoc = getIntermediateDataLocation(task);
+ // Test the value fetched above; a second lookup could see a location that resolved in between.
+ if (optionalIntermediateLoc.isPresent()) {
+ return optionalIntermediateLoc.get().equals(executor.getExecutorId());
+ } else {
+ return true; // intermediate data location unknown, so do not constrain the executor
+ }
+ }
}
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
index f7fb982..64abefe 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
@@ -19,20 +19,30 @@ import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
+import edu.snu.nemo.runtime.master.BlockManagerMaster;
+import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
/**
* Tests {@link SchedulingConstraintRegistry}.
*/
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({BlockManagerMaster.class})
public final class SchedulingConstraintnRegistryTest {
@Test
public void testSchedulingConstraintRegistry() throws InjectionException {
- final SchedulingConstraintRegistry registry = Tang.Factory.getTang().newInjector()
- .getInstance(SchedulingConstraintRegistry.class);
+ final Injector injector = Tang.Factory.getTang().newInjector();
+ injector.bindVolatileInstance(BlockManagerMaster.class, mock(BlockManagerMaster.class));
+ final SchedulingConstraintRegistry registry =
+ injector.getInstance(SchedulingConstraintRegistry.class);
assertEquals(FreeSlotSchedulingConstraint.class, getConstraintOf(ResourceSlotProperty.class, registry));
assertEquals(ContainerTypeAwareSchedulingConstraint.class,
getConstraintOf(ResourcePriorityProperty.class, registry));
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
index 1beb8f9..ab1bf0b 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
@@ -18,9 +18,12 @@ package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.runtime.master.BlockManagerMaster;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -38,8 +41,9 @@ import static org.mockito.Mockito.*;
* Test cases for {@link SourceLocationAwareSchedulingConstraint}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class, BlockManagerMaster.class})
public final class SourceLocationAwareSchedulingConstraintTest {
+ private Injector injector;
private static final String SITE_0 = "SEOUL";
private static final String SITE_1 = "JINJU";
private static final String SITE_2 = "BUSAN";
@@ -49,6 +53,12 @@ public final class SourceLocationAwareSchedulingConstraintTest {
when(executorRepresenter.getNodeName()).thenReturn(executorId);
return executorRepresenter;
}
+
+ @Before
+ public void setUp() throws Exception {
+ injector = Tang.Factory.getTang().newInjector();
+ injector.bindVolatileInstance(BlockManagerMaster.class, mock(BlockManagerMaster.class));
+ }
/**
* {@link SourceLocationAwareSchedulingConstraint} should fail to schedule a {@link Task} when
@@ -56,7 +66,7 @@ public final class SourceLocationAwareSchedulingConstraintTest {
*/
@Test
public void testSourceLocationAwareSchedulingNotAvailable() throws InjectionException {
- final SchedulingConstraint schedulingConstraint = Tang.Factory.getTang().newInjector()
+ final SchedulingConstraint schedulingConstraint = injector
.getInstance(SourceLocationAwareSchedulingConstraint.class);
// Prepare test scenario
@@ -76,7 +86,7 @@ public final class SourceLocationAwareSchedulingConstraintTest {
*/
@Test
public void testSourceLocationAwareSchedulingWithMultiSource() throws InjectionException {
- final SchedulingConstraint schedulingConstraint = Tang.Factory.getTang().newInjector()
+ final SchedulingConstraint schedulingConstraint = injector
.getInstance(SourceLocationAwareSchedulingConstraint.class);
// Prepare test scenario
final Task task0 = CreateTask.withReadablesWithSourceLocations(