You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/08/07 13:14:06 UTC

[incubator-nemo] branch master updated: [NEMO-51] Intermediate data location aware scheduling (#86)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f4e7f5  [NEMO-51] Intermediate data location aware scheduling (#86)
8f4e7f5 is described below

commit 8f4e7f5dc6542766d24ce7e88de796f35c3f91fb
Author: Sanha Lee <sa...@gmail.com>
AuthorDate: Tue Aug 7 22:14:04 2018 +0900

    [NEMO-51] Intermediate data location aware scheduling (#86)
    
    JIRA: [NEMO-51: Intermediate data location aware scheduling](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-51)
    
    **Major changes:**
    - Implement intermediate data location aware scheduling
    
    **Minor changes to note:**
    - N/A.
    
    **Tests for the changes:**
    - Existing integration tests that exercise the large shuffle optimization cover this change.
    
    **Other comments:**
    - N/A.
    
    resolves [NEMO-51](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-51)
---
 .../ResourceLocalityProperty.java                  |  3 +-
 .../SourceLocationAwareSchedulingConstraint.java   | 82 +++++++++++++++++-----
 .../SchedulingConstraintnRegistryTest.java         | 14 +++-
 ...ourceLocationAwareSchedulingConstraintTest.java | 16 ++++-
 4 files changed, 93 insertions(+), 22 deletions(-)

diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java
index 869644e..a94f25e 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java
@@ -18,7 +18,8 @@ package edu.snu.nemo.common.ir.vertex.executionproperty;
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 /**
- * This property decides whether or not to schedule this vertex only on executors where source data reside.
+ * This property decides whether or not to schedule this vertex only on executors where
+ * source (including intermediate) data reside.
  */
 public final class ResourceLocalityProperty extends VertexExecutionProperty<Boolean> {
   private static final ResourceLocalityProperty SOURCE_TRUE = new ResourceLocalityProperty(true);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
index f446362..d355aa7 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
@@ -16,28 +16,68 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.Task;
+import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
 
 /**
- * This policy is same as {@link MinOccupancyFirstSchedulingPolicy}, however for Tasks
- * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick one of the executors
- * where the corresponding data reside.
+ * This policy tries to pick the executors where the corresponding source or intermediate data for a task reside.
  */
 @ThreadSafe
 @DriverSide
 @AssociatedProperty(ResourceLocalityProperty.class)
 public final class SourceLocationAwareSchedulingConstraint implements SchedulingConstraint {
+  private final BlockManagerMaster blockManagerMaster;
 
   @Inject
-  private SourceLocationAwareSchedulingConstraint() {
+  private SourceLocationAwareSchedulingConstraint(final BlockManagerMaster blockManagerMaster) {
+    this.blockManagerMaster = blockManagerMaster;
+  }
+
+  /**
+   * Find the location of the intermediate data for a task.
+   * It is only possible if the task receives only one input edge with One-to-One communication pattern, and
+   * the location of the input data is known.
+   *
+   * @param task the task to schedule.
+   * @return the intermediate data location.
+   */
+  private Optional<String> getIntermediateDataLocation(final Task task) {
+    if (task.getTaskIncomingEdges().size() == 1) {
+      final StageEdge physicalStageEdge = task.getTaskIncomingEdges().get(0);
+      if (CommunicationPatternProperty.Value.OneToOne.equals(
+          physicalStageEdge.getPropertyValue(CommunicationPatternProperty.class)
+              .orElseThrow(() -> new RuntimeException("No comm pattern!")))) {
+        final String blockIdToRead =
+            RuntimeIdGenerator.generateBlockId(physicalStageEdge.getId(),
+                RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId()));
+        final BlockManagerMaster.BlockLocationRequestHandler locationHandler =
+            blockManagerMaster.getBlockLocationHandler(blockIdToRead);
+        if (locationHandler.getLocationFuture().isDone()) { // if the location is known.
+          try {
+            final String location = locationHandler.getLocationFuture().get();
+            return Optional.of(location);
+          } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          } catch (final ExecutionException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+    return Optional.empty();
   }
 
   /**
@@ -55,19 +95,29 @@ public final class SourceLocationAwareSchedulingConstraint implements Scheduling
 
   @Override
   public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
-    final Set<String> sourceLocations;
-    try {
-      sourceLocations = getSourceLocations(task.getIrVertexIdToReadable().values());
-    } catch (final UnsupportedOperationException e) {
-      return true;
-    } catch (final Exception e) {
-      throw new RuntimeException(e);
-    }
+    if (task.getTaskIncomingEdges().isEmpty()) { // Source task
+      final Set<String> sourceLocations;
+      try {
+        sourceLocations = getSourceLocations(task.getIrVertexIdToReadable().values());
+      } catch (final UnsupportedOperationException e) {
+        return true;
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
 
-    if (sourceLocations.size() == 0) {
-      return true;
-    }
+      if (sourceLocations.size() == 0) {
+        return true;
+      }
 
-    return sourceLocations.contains(executor.getNodeName());
+      return sourceLocations.contains(executor.getNodeName());
+    } else { // Non-source task.
+      // Look up once: the location future may complete between two separate lookups.
+      final Optional<String> optionalIntermediateLoc = getIntermediateDataLocation(task);
+      if (optionalIntermediateLoc.isPresent()) {
+        return optionalIntermediateLoc.get().equals(executor.getExecutorId());
+      } else {
+        return true;
+      }
+    }
   }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
index f7fb982..64abefe 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
@@ -19,20 +19,30 @@ import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
+import edu.snu.nemo.runtime.master.BlockManagerMaster;
+import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests {@link SchedulingConstraintRegistry}.
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({BlockManagerMaster.class})
 public final class SchedulingConstraintnRegistryTest {
   @Test
   public void testSchedulingConstraintRegistry() throws InjectionException {
-    final SchedulingConstraintRegistry registry = Tang.Factory.getTang().newInjector()
-        .getInstance(SchedulingConstraintRegistry.class);
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    injector.bindVolatileInstance(BlockManagerMaster.class, mock(BlockManagerMaster.class));
+    final SchedulingConstraintRegistry registry =
+        injector.getInstance(SchedulingConstraintRegistry.class);
     assertEquals(FreeSlotSchedulingConstraint.class, getConstraintOf(ResourceSlotProperty.class, registry));
     assertEquals(ContainerTypeAwareSchedulingConstraint.class,
         getConstraintOf(ResourcePriorityProperty.class, registry));
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
index 1beb8f9..ab1bf0b 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
@@ -18,9 +18,12 @@ package edu.snu.nemo.runtime.master.scheduler;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -38,8 +41,9 @@ import static org.mockito.Mockito.*;
  * Test cases for {@link SourceLocationAwareSchedulingConstraint}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class, BlockManagerMaster.class})
 public final class SourceLocationAwareSchedulingConstraintTest {
+  private Injector injector;
   private static final String SITE_0 = "SEOUL";
   private static final String SITE_1 = "JINJU";
   private static final String SITE_2 = "BUSAN";
@@ -49,6 +53,12 @@ public final class SourceLocationAwareSchedulingConstraintTest {
     when(executorRepresenter.getNodeName()).thenReturn(executorId);
     return executorRepresenter;
   }
+  
+  @Before
+  public void setUp() throws Exception {
+    injector = Tang.Factory.getTang().newInjector();
+    injector.bindVolatileInstance(BlockManagerMaster.class, mock(BlockManagerMaster.class));
+  }
 
   /**
    * {@link SourceLocationAwareSchedulingConstraint} should fail to schedule a {@link Task} when
@@ -56,7 +66,7 @@ public final class SourceLocationAwareSchedulingConstraintTest {
    */
   @Test
   public void testSourceLocationAwareSchedulingNotAvailable() throws InjectionException {
-    final SchedulingConstraint schedulingConstraint = Tang.Factory.getTang().newInjector()
+    final SchedulingConstraint schedulingConstraint = injector
         .getInstance(SourceLocationAwareSchedulingConstraint.class);
 
     // Prepare test scenario
@@ -76,7 +86,7 @@ public final class SourceLocationAwareSchedulingConstraintTest {
    */
   @Test
   public void testSourceLocationAwareSchedulingWithMultiSource() throws InjectionException {
-    final SchedulingConstraint schedulingConstraint = Tang.Factory.getTang().newInjector()
+    final SchedulingConstraint schedulingConstraint = injector
         .getInstance(SourceLocationAwareSchedulingConstraint.class);
     // Prepare test scenario
     final Task task0 = CreateTask.withReadablesWithSourceLocations(