You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/02/27 08:51:39 UTC

[incubator-nemo] branch master updated: [NEMO-253] Refactor getInternal(Main/Additional)OutputMap in TaskExecutor (#196)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new e8de6d4  [NEMO-253] Refactor getInternal(Main/Additional)OutputMap in TaskExecutor (#196)
e8de6d4 is described below

commit e8de6d4a595decd992ed72584e0724fdbd3fa515
Author: apeinot <35...@users.noreply.github.com>
AuthorDate: Wed Feb 27 09:51:34 2019 +0100

    [NEMO-253] Refactor getInternal(Main/Additional)OutputMap in TaskExecutor (#196)
    
    JIRA: [NEMO-253: Refactor getInternal(Main/Additional)OutputMap in TaskExecutor](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-253)
    
    **Major changes:**
    - Merge methods getInternalMainOutputs and getInternalAdditionalOutputMap from TaskExecutor into one method called getInternalOutputMap
    
    **Minor changes to note:**
    - Small change in TaskExecutor::prepare to split the output of the new method into main outputs and additional outputs
    
    **Tests for the changes:**
    - All the existing tests still pass
    - Add one test that creates a TaskExecutor with null inputs to verify that the expected exception is thrown
    
    Closes #196
---
 .../AdditionalOutputTagProperty.java               |  8 ++++
 .../nemo/runtime/executor/task/TaskExecutor.java   | 53 +++++++++++-----------
 .../runtime/executor/task/TaskExecutorTest.java    | 19 ++++++++
 3 files changed, 53 insertions(+), 27 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/AdditionalOutputTagProperty.java b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/AdditionalOutputTagProperty.java
index 9d938d1..4c85cec 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/AdditionalOutputTagProperty.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/AdditionalOutputTagProperty.java
@@ -41,4 +41,12 @@ public final class AdditionalOutputTagProperty extends EdgeExecutionProperty<Str
   public static AdditionalOutputTagProperty of(final String value) {
     return new AdditionalOutputTagProperty(value);
   }
+
+  // TODO #348: Consider OutputTagProperty
+  private static final String MAIN_OUTPUT_TAG = "main";
+
+  /** @return the tag under which outputs of edges carrying no additional output tag are grouped. */
+  public static String getMainOutputTag() {
+    return MAIN_OUTPUT_TAG;
+  }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index f1b8e2d..6dee5cf 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -213,13 +213,17 @@ public final class TaskExecutor {
 
       // Additional outputs
       final Map<String, List<NextIntraTaskOperatorInfo>> internalAdditionalOutputMap =
-        getInternalAdditionalOutputMap(irVertex, irVertexDag, edgeIndexMap, operatorWatermarkManagerMap);
+        getInternalOutputMap(irVertex, irVertexDag, edgeIndexMap, operatorWatermarkManagerMap);
       final Map<String, List<OutputWriter>> externalAdditionalOutputMap =
         getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);
 
       // Main outputs
-      final List<NextIntraTaskOperatorInfo> internalMainOutputs =
-        getInternalMainOutputs(irVertex, irVertexDag, edgeIndexMap, operatorWatermarkManagerMap);
+      final List<NextIntraTaskOperatorInfo> internalMainOutputs;
+      if (internalAdditionalOutputMap.containsKey(AdditionalOutputTagProperty.getMainOutputTag())) {
+        internalMainOutputs = internalAdditionalOutputMap.remove(AdditionalOutputTagProperty.getMainOutputTag());
+      } else {
+        internalMainOutputs = new ArrayList<>();
+      }
       final List<OutputWriter> externalMainOutputs =
         getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);
 
@@ -540,20 +544,34 @@ public final class TaskExecutor {
     return map;
   }
 
-  // TODO #253: Refactor getInternal(Main/Additional)OutputMap
-  private Map<String, List<NextIntraTaskOperatorInfo>> getInternalAdditionalOutputMap(
+  /**
+   * Return a map of Internal Outputs associated with their output tag.
+   * If an edge has no output tag, its info is added under the main output tag.
+   *
+   * @param irVertex source irVertex
+   * @param irVertexDag DAG of IRVertex and RuntimeEdge
+   * @param edgeIndexMap Map of edge and index
+   * @param operatorWatermarkManagerMap Map of irVertex and InputWatermarkManager
+   * @return Map<OutputTag, List<NextIntraTaskOperatorInfo>>
+   */
+  private Map<String, List<NextIntraTaskOperatorInfo>> getInternalOutputMap(
     final IRVertex irVertex,
     final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
     final Map<Edge, Integer> edgeIndexMap,
     final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
-    // Add all intra-task additional tags to additional output map.
+    // Add all intra-task output tags (main and additional) to the output map.
     final Map<String, List<NextIntraTaskOperatorInfo>> map = new HashMap<>();
 
     irVertexDag.getOutgoingEdgesOf(irVertex.getId())
       .stream()
-      .filter(edge -> edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
       .map(edge -> {
-          final String outputTag = edge.getPropertyValue(AdditionalOutputTagProperty.class).get();
+          final boolean isPresent = edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent();
+          final String outputTag;
+          if (isPresent) {
+            outputTag = edge.getPropertyValue(AdditionalOutputTagProperty.class).get();
+          } else {
+            outputTag = AdditionalOutputTagProperty.getMainOutputTag();
+          }
           final int index = edgeIndexMap.get(edge);
           final OperatorVertex nextOperator = (OperatorVertex) edge.getDst();
           final InputWatermarkManager inputWatermarkManager = operatorWatermarkManagerMap.get(nextOperator);
@@ -567,25 +585,6 @@ public final class TaskExecutor {
     return map;
   }
 
-  // TODO #253: Refactor getInternal(Main/Additional)OutputMap
-  private List<NextIntraTaskOperatorInfo> getInternalMainOutputs(
-    final IRVertex irVertex,
-    final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
-    final Map<Edge, Integer> edgeIndexMap,
-    final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
-
-    return irVertexDag.getOutgoingEdgesOf(irVertex.getId())
-      .stream()
-      .filter(edge -> !edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
-      .map(edge -> {
-        final int index = edgeIndexMap.get(edge);
-        final OperatorVertex nextOperator = (OperatorVertex) edge.getDst();
-        final InputWatermarkManager inputWatermarkManager = operatorWatermarkManagerMap.get(nextOperator);
-        return new NextIntraTaskOperatorInfo(index, nextOperator, inputWatermarkManager);
-      })
-      .collect(Collectors.toList());
-  }
-
   /**
    * Return inter-task OutputWriters, for single output or output associated with main tag.
    *
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 838271c..a0027e9 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -70,6 +70,7 @@ import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Matchers.any;
@@ -188,6 +189,24 @@ public final class TaskExecutorTest {
     assertTrue(checkEqualElements(elements, runtimeEdgeToOutputData.get(taskOutEdge.getId())));
   }
 
+    /**
+   * Test invalid parameter failure.
+   */
+  @Test()
+  public void testInvalidInputData() throws Exception {
+    try{
+      // Execute the task.
+      final TaskExecutor taskExecutor = getTaskExecutor(null, null);
+      taskExecutor.execute();
+
+      // This should not be reached.
+      fail();
+    }
+    catch(NullPointerException e){
+      assertEquals(true, true);
+    }
+  }
+
   /**
    * This test emits data and watermark by emulating an unbounded source readable.
    */