You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/02/27 08:51:39 UTC
[incubator-nemo] branch master updated: [NEMO-253] Refactor
getInternal(Main/Additional)OutputMap in TaskExecutor (#196)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new e8de6d4 [NEMO-253] Refactor getInternal(Main/Additional)OutputMap in TaskExecutor (#196)
e8de6d4 is described below
commit e8de6d4a595decd992ed72584e0724fdbd3fa515
Author: apeinot <35...@users.noreply.github.com>
AuthorDate: Wed Feb 27 09:51:34 2019 +0100
[NEMO-253] Refactor getInternal(Main/Additional)OutputMap in TaskExecutor (#196)
JIRA: [NEMO-253: Refactor getInternal(Main/Additional)OutputMap in TaskExecutor](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-253)
**Major changes:**
- Merge methods getInternalMainOutputs and getInternalAdditionalOutputMap from TaskExecutor into one method called getInternalOutputMap
**Minor changes to note:**
- Small change in TaskExecutor::prepare to parse the new method output
**Tests for the changes:**
- All the existing tests still pass
- Add one test which create a TaskExecutor with null inputs to test the exception
Closes #196
---
.../AdditionalOutputTagProperty.java | 8 ++++
.../nemo/runtime/executor/task/TaskExecutor.java | 53 +++++++++++-----------
.../runtime/executor/task/TaskExecutorTest.java | 19 ++++++++
3 files changed, 53 insertions(+), 27 deletions(-)
diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/AdditionalOutputTagProperty.java b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/AdditionalOutputTagProperty.java
index 9d938d1..4c85cec 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/AdditionalOutputTagProperty.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/AdditionalOutputTagProperty.java
@@ -41,4 +41,12 @@ public final class AdditionalOutputTagProperty extends EdgeExecutionProperty<Str
public static AdditionalOutputTagProperty of(final String value) {
return new AdditionalOutputTagProperty(value);
}
+
+ // TODO #348: Consider OutputTagProperty
+ private static String mainOutputTag = "main";
+
+ public static String getMainOutputTag() {
+ return mainOutputTag;
+ }
+
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index f1b8e2d..6dee5cf 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -213,13 +213,17 @@ public final class TaskExecutor {
// Additional outputs
final Map<String, List<NextIntraTaskOperatorInfo>> internalAdditionalOutputMap =
- getInternalAdditionalOutputMap(irVertex, irVertexDag, edgeIndexMap, operatorWatermarkManagerMap);
+ getInternalOutputMap(irVertex, irVertexDag, edgeIndexMap, operatorWatermarkManagerMap);
final Map<String, List<OutputWriter>> externalAdditionalOutputMap =
getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);
// Main outputs
- final List<NextIntraTaskOperatorInfo> internalMainOutputs =
- getInternalMainOutputs(irVertex, irVertexDag, edgeIndexMap, operatorWatermarkManagerMap);
+ final List<NextIntraTaskOperatorInfo> internalMainOutputs;
+ if (internalAdditionalOutputMap.containsKey(AdditionalOutputTagProperty.getMainOutputTag())) {
+ internalMainOutputs = internalAdditionalOutputMap.remove(AdditionalOutputTagProperty.getMainOutputTag());
+ } else {
+ internalMainOutputs = new ArrayList<>();
+ }
final List<OutputWriter> externalMainOutputs =
getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);
@@ -540,20 +544,34 @@ public final class TaskExecutor {
return map;
}
- // TODO #253: Refactor getInternal(Main/Additional)OutputMap
- private Map<String, List<NextIntraTaskOperatorInfo>> getInternalAdditionalOutputMap(
+ /**
+ * Return a map of Internal Outputs associated with their output tag.
+ * If an edge has no output tag, its info are added to the mainOutputTag.
+ *
+ * @param irVertex source irVertex
+ * @param irVertexDag DAG of IRVertex and RuntimeEdge
+ * @param edgeIndexMap Map of edge and index
+ * @param operatorWatermarkManagerMap Map of irVertex and InputWatermarkManager
+ * @return Map<OutputTag, List<NextIntraTaskOperatorInfo>>
+ */
+ private Map<String, List<NextIntraTaskOperatorInfo>> getInternalOutputMap(
final IRVertex irVertex,
final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
final Map<Edge, Integer> edgeIndexMap,
final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
- // Add all intra-task additional tags to additional output map.
+ // Add all intra-task tags to additional output map.
final Map<String, List<NextIntraTaskOperatorInfo>> map = new HashMap<>();
irVertexDag.getOutgoingEdgesOf(irVertex.getId())
.stream()
- .filter(edge -> edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
.map(edge -> {
- final String outputTag = edge.getPropertyValue(AdditionalOutputTagProperty.class).get();
+ final boolean isPresent = edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent();
+ final String outputTag;
+ if (isPresent) {
+ outputTag = edge.getPropertyValue(AdditionalOutputTagProperty.class).get();
+ } else {
+ outputTag = AdditionalOutputTagProperty.getMainOutputTag();
+ }
final int index = edgeIndexMap.get(edge);
final OperatorVertex nextOperator = (OperatorVertex) edge.getDst();
final InputWatermarkManager inputWatermarkManager = operatorWatermarkManagerMap.get(nextOperator);
@@ -567,25 +585,6 @@ public final class TaskExecutor {
return map;
}
- // TODO #253: Refactor getInternal(Main/Additional)OutputMap
- private List<NextIntraTaskOperatorInfo> getInternalMainOutputs(
- final IRVertex irVertex,
- final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
- final Map<Edge, Integer> edgeIndexMap,
- final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
-
- return irVertexDag.getOutgoingEdgesOf(irVertex.getId())
- .stream()
- .filter(edge -> !edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
- .map(edge -> {
- final int index = edgeIndexMap.get(edge);
- final OperatorVertex nextOperator = (OperatorVertex) edge.getDst();
- final InputWatermarkManager inputWatermarkManager = operatorWatermarkManagerMap.get(nextOperator);
- return new NextIntraTaskOperatorInfo(index, nextOperator, inputWatermarkManager);
- })
- .collect(Collectors.toList());
- }
-
/**
* Return inter-task OutputWriters, for single output or output associated with main tag.
*
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 838271c..a0027e9 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -70,6 +70,7 @@ import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.any;
@@ -188,6 +189,24 @@ public final class TaskExecutorTest {
assertTrue(checkEqualElements(elements, runtimeEdgeToOutputData.get(taskOutEdge.getId())));
}
+ /**
+ * Test invalid parameter failure.
+ */
+ @Test()
+ public void testInvalidInputData() throws Exception {
+ try{
+ // Execute the task.
+ final TaskExecutor taskExecutor = getTaskExecutor(null, null);
+ taskExecutor.execute();
+
+ // This should not be reached.
+ fail();
+ }
+ catch(NullPointerException e){
+ assertEquals(true, true);
+ }
+ }
+
/**
* This test emits data and watermark by emulating an unbounded source readable.
*/