You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/01 00:51:52 UTC
[incubator-nemo] branch master updated: [NEMO-232] Implement
InputWatermarkManager (#137)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 7858051 [NEMO-232] Implement InputWatermarkManager (#137)
7858051 is described below
commit 78580514afd4fe6086ebcc4945a519844bf04962
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Thu Nov 1 09:51:48 2018 +0900
[NEMO-232] Implement InputWatermarkManager (#137)
JIRA: [NEMO-232: Implement InputWatermarkManager](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-232)
**Major changes:**
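- Add `InputWatermarkManager`, which tracks the watermarks of an operator's multiple input streams and computes their minimum
- `OperatorVertexOutputCollector` now sends watermarks to the next operator's `InputWatermarkManager`, which forwards the minimum input watermark to that operator whenever the minimum advances (an operator with a single input receives every watermark as-is)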
**Minor changes to note:**
- Create `NextIntraTaskOperatorInfo` to wrap the edge index, the next `OperatorVertex`, and its `InputWatermarkManager`.
**Tests for the changes:**
- Create `InputWatermarkManagerTest`
- Add `testMultipleIncomingEdges` to `TaskExecutorTest`
**Other comments:**
- None.
Closes #137
---
pom.xml | 16 +-
.../runtime/executor/data/PipeManagerWorker.java | 8 +-
.../datatransfer/DataFetcherOutputCollector.java | 1 +
.../datatransfer/InputWatermarkManager.java | 46 ++++
.../datatransfer/MultiInputWatermarkManager.java | 90 +++++++
.../datatransfer/NextIntraTaskOperatorInfo.java | 59 +++++
.../OperatorVertexOutputCollector.java | 35 ++-
.../datatransfer/SingleInputWatermarkManager.java | 46 ++++
.../nemo/runtime/executor/task/TaskExecutor.java | 73 +++++-
.../datatransfer/InputWatermarkManagerTest.java | 88 +++++++
.../runtime/executor/task/TaskExecutorTest.java | 274 +++++++++++++++------
11 files changed, 623 insertions(+), 113 deletions(-)
diff --git a/pom.xml b/pom.xml
index 1675e9c..3a81df7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -235,17 +235,17 @@ under the License.
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.0.0</version>
<configuration>
- <excludePackageNames>*.org.apache.nemo.runtime.common.comm</excludePackageNames>
- <outputDirectory>docs/apidocs</outputDirectory>
- <reportOutputDirectory>docs/apidocs</reportOutputDirectory>
+ <excludePackageNames>*.org.apache.nemo.runtime.common.comm</excludePackageNames>
+ <outputDirectory>docs/apidocs</outputDirectory>
+ <reportOutputDirectory>docs/apidocs</reportOutputDirectory>
</configuration>
<executions>
<execution>
- <id>aggregate</id>
- <goals>
- <goal>aggregate</goal>
- </goals>
- <phase>site</phase>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
</execution>
<execution>
<id>test-javadoc</id>
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
index ffbbe56..a433f3a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
@@ -141,8 +141,8 @@ public final class PipeManagerWorker {
/**
* (SYNCHRONIZATION) Called by task threads.
*
- * @param runtimeEdge
- * @param srcTaskIndex
+ * @param runtimeEdge runtime edge
+ * @param srcTaskIndex source task index
* @return output contexts.
*/
public List<ByteOutputContext> getOutputContexts(final RuntimeEdge runtimeEdge,
@@ -163,8 +163,8 @@ public final class PipeManagerWorker {
/**
* (SYNCHRONIZATION) Called by network threads.
*
- * @param outputContext
- * @throws InvalidProtocolBufferException
+ * @param outputContext output context
+ * @throws InvalidProtocolBufferException protobuf exception
*/
public void onOutputContext(final ByteOutputContext outputContext) throws InvalidProtocolBufferException {
final ControlMessage.PipeTransferContextDescriptor descriptor =
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index 3f1bc90..56c7540 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -34,6 +34,7 @@ public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
/**
* It forwards output to the next operator.
+ * @param nextOperatorVertex next operator to emit data and watermark
*/
public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex) {
this.nextOperatorVertex = nextOperatorVertex;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
new file mode 100644
index 0000000..66fb7aa
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.punctuation.Watermark;
+
+
+/**
+ * Tracks the watermarks arriving on the intra-task input edges of one operator and
+ * forwards them to that operator's transform.
+ *
+ * <pre>
+ *   edge 0 --+
+ *   edge 1 --+-- InputWatermarkManager --(onWatermark)-- next operator
+ *   edge 2 --+
+ * </pre>
+ *
+ * <p>{@link SingleInputWatermarkManager} is used for an operator with exactly one
+ * intra-task input edge and forwards every watermark unchanged;
+ * {@link MultiInputWatermarkManager} is used otherwise and forwards only the minimum
+ * watermark across all edges, and only when that minimum advances.
+ */
+public interface InputWatermarkManager {
+
+ /**
+ * Records {@code watermark} as the latest watermark of input edge {@code edgeIndex} and,
+ * depending on the implementation, forwards a watermark to the next operator's transform.
+ *
+ * <p>Example with three input edges handled by {@link MultiInputWatermarkManager}
+ * (every edge starts at the lowest possible timestamp, so nothing is forwarded until
+ * each edge has reported at least once):
+ * <pre>
+ *   time:       t1      t2      t3      t4
+ *   edge 0:             ts=3            ts=6
+ *   edge 1:                     ts=4
+ *   edge 2:     ts=5
+ *   forwarded:                  3       4
+ * </pre>
+ * At t3 all edges have reported and the minimum (3, from edge 0) is forwarded; at t4
+ * edge 0 advances and the new minimum (4, from edge 1) is forwarded.
+ *
+ * <p>Watermarks of a single edge are expected to be non-decreasing;
+ * {@link MultiInputWatermarkManager} throws {@link IllegalStateException} on a regression.
+ *
+ * @param edgeIndex index of the input edge among the next operator's incoming edges
+ * @param watermark watermark emitted from the edge
+ */
+ void trackAndEmitWatermarks(int edgeIndex, Watermark watermark);
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
new file mode 100644
index 0000000..91c7c55
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tracks the latest watermark of each intra-task input edge of an operator and forwards the
+ * minimum of them to the operator's transform whenever that minimum advances.
+ *
+ * <p>Every edge starts at {@link Long#MIN_VALUE}, so nothing is forwarded until each edge has
+ * reported at least one watermark. Watermarks of a single edge must be non-decreasing;
+ * a regression is rejected with an {@link IllegalStateException}.
+ */
+public final class MultiInputWatermarkManager implements InputWatermarkManager {
+  // Latest watermark per edge, indexed by edge index.
+  private final List<Watermark> watermarks;
+  private final OperatorVertex nextOperator;
+  // Index of an edge holding the current minimum; every other edge's watermark is >= it.
+  private int minWatermarkIndex;
+
+  /**
+   * Creates a manager for an operator with {@code numEdges} intra-task input edges.
+   *
+   * @param numEdges     number of input edges to track
+   * @param nextOperator operator whose transform receives the minimum watermark
+   */
+  public MultiInputWatermarkManager(final int numEdges,
+                                    final OperatorVertex nextOperator) {
+    this.watermarks = new ArrayList<>(numEdges);
+    this.nextOperator = nextOperator;
+    this.minWatermarkIndex = 0;
+    // We initialize watermarks as min value because
+    // we should not emit watermark until all edges emit watermarks.
+    for (int i = 0; i < numEdges; i++) {
+      watermarks.add(new Watermark(Long.MIN_VALUE));
+    }
+  }
+
+  /**
+   * Returns the index of the edge holding the smallest watermark (the first one on ties).
+   * The scan is seeded with the first element instead of a Long.MAX_VALUE sentinel, so a
+   * valid index is returned even when every edge is at Long.MAX_VALUE.
+   * Only called right after an edge was updated, so the list is non-empty.
+   *
+   * @return index of the minimum watermark
+   */
+  private int findNextMinWatermarkIndex() {
+    int minIndex = 0;
+    for (int i = 1; i < watermarks.size(); i++) {
+      if (watermarks.get(i).getTimestamp() < watermarks.get(minIndex).getTimestamp()) {
+        minIndex = i;
+      }
+    }
+    return minIndex;
+  }
+
+  /**
+   * Records {@code watermark} for edge {@code edgeIndex} and forwards the new minimum
+   * watermark to the next operator's transform if the minimum advanced.
+   *
+   * @param edgeIndex incoming edge index
+   * @param watermark watermark emitted from the edge
+   * @throws IllegalStateException     if {@code watermark} is older than the edge's previous one
+   * @throws IndexOutOfBoundsException if {@code edgeIndex} is not a tracked edge
+   */
+  @Override
+  public void trackAndEmitWatermarks(final int edgeIndex, final Watermark watermark) {
+    final Watermark prevWatermark = watermarks.get(edgeIndex);
+    // Validate before mutating any state so a rejected watermark leaves the manager intact.
+    if (watermark.getTimestamp() < prevWatermark.getTimestamp()) {
+      throw new IllegalStateException(
+          "The recent watermark timestamp cannot be less than the previous one "
+              + "because watermark is monotonically increasing. edge: " + edgeIndex
+              + ", previous: " + prevWatermark + ", recent: " + watermark);
+    }
+    watermarks.set(edgeIndex, watermark);
+
+    if (edgeIndex != minWatermarkIndex) {
+      // This edge did not hold the minimum and only moved forward,
+      // so the minimum is unchanged and there is nothing to emit.
+      return;
+    }
+
+    // The edge holding the minimum advanced; prevWatermark was the previous minimum.
+    minWatermarkIndex = findNextMinWatermarkIndex();
+    final Watermark minWatermark = watermarks.get(minWatermarkIndex);
+    if (minWatermark.getTimestamp() > prevWatermark.getTimestamp()) {
+      // Watermark timestamp progressed: emit the new minimum.
+      nextOperator.getTransform().onWatermark(minWatermark);
+    }
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NextIntraTaskOperatorInfo.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NextIntraTaskOperatorInfo.java
new file mode 100644
index 0000000..1525261
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NextIntraTaskOperatorInfo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+
+/**
+ * Bundles what an upstream output collector needs to know about one intra-task
+ * downstream operator.
+ * <ul>
+ *   <li>{@code edgeIndex}: position of the connecting edge among the downstream
+ *       operator's incoming edges</li>
+ *   <li>{@code nextOperator}: the downstream operator vertex, which receives data</li>
+ *   <li>{@code watermarkManager}: the downstream operator's input watermark manager,
+ *       which receives watermarks tagged with {@code edgeIndex}</li>
+ * </ul>
+ * Several upstream edges may point at the same downstream operator; they then share one
+ * watermark manager, but each carries its own edge index, for example:
+ * <pre>
+ *   edge (index 0) --+
+ *   edge (index 1) --+-- watermarkManager --- nextOperator
+ *   edge (index 2) --+
+ * </pre>
+ */
+public final class NextIntraTaskOperatorInfo {
+
+  private final OperatorVertex nextOperator;
+  private final InputWatermarkManager watermarkManager;
+  private final int edgeIndex;
+
+  /**
+   * Creates the info for one downstream operator.
+   *
+   * @param edgeIndex        index of the connecting edge among the downstream operator's incoming edges
+   * @param nextOperator     the downstream operator vertex
+   * @param watermarkManager the downstream operator's input watermark manager
+   */
+  public NextIntraTaskOperatorInfo(final int edgeIndex,
+                                   final OperatorVertex nextOperator,
+                                   final InputWatermarkManager watermarkManager) {
+    this.nextOperator = nextOperator;
+    this.watermarkManager = watermarkManager;
+    this.edgeIndex = edgeIndex;
+  }
+
+  /**
+   * @return index of the connecting edge among the downstream operator's incoming edges
+   */
+  public int getEdgeIndex() {
+    return this.edgeIndex;
+  }
+
+  /**
+   * @return the downstream operator vertex
+   */
+  public OperatorVertex getNextOperator() {
+    return this.nextOperator;
+  }
+
+  /**
+   * @return the downstream operator's input watermark manager
+   */
+  public InputWatermarkManager getWatermarkManager() {
+    return this.watermarkManager;
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 598cc35..3637780 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -41,8 +41,8 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
private static final Logger LOG = LoggerFactory.getLogger(OperatorVertexOutputCollector.class.getName());
private final IRVertex irVertex;
- private final List<OperatorVertex> internalMainOutputs;
- private final Map<String, List<OperatorVertex>> internalAdditionalOutputs;
+ private final List<NextIntraTaskOperatorInfo> internalMainOutputs;
+ private final Map<String, List<NextIntraTaskOperatorInfo>> internalAdditionalOutputs;
private final List<OutputWriter> externalMainOutputs;
private final Map<String, List<OutputWriter>> externalAdditionalOutputs;
@@ -54,11 +54,12 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
* @param externalMainOutputs external main outputs
* @param externalAdditionalOutputs external additional outputs
*/
- public OperatorVertexOutputCollector(final IRVertex irVertex,
- final List<OperatorVertex> internalMainOutputs,
- final Map<String, List<OperatorVertex>> internalAdditionalOutputs,
- final List<OutputWriter> externalMainOutputs,
- final Map<String, List<OutputWriter>> externalAdditionalOutputs) {
+ public OperatorVertexOutputCollector(
+ final IRVertex irVertex,
+ final List<NextIntraTaskOperatorInfo> internalMainOutputs,
+ final Map<String, List<NextIntraTaskOperatorInfo>> internalAdditionalOutputs,
+ final List<OutputWriter> externalMainOutputs,
+ final Map<String, List<OutputWriter>> externalAdditionalOutputs) {
this.irVertex = irVertex;
this.internalMainOutputs = internalMainOutputs;
this.internalAdditionalOutputs = internalAdditionalOutputs;
@@ -76,8 +77,8 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
@Override
public void emit(final O output) {
- for (final OperatorVertex internalVertex : internalMainOutputs) {
- emit(internalVertex, output);
+ for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) {
+ emit(internalVertex.getNextOperator(), output);
}
for (final OutputWriter externalWriter : externalMainOutputs) {
@@ -89,8 +90,8 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
public <T> void emit(final String dstVertexId, final T output) {
if (internalAdditionalOutputs.containsKey(dstVertexId)) {
- for (final OperatorVertex internalVertex : internalAdditionalOutputs.get(dstVertexId)) {
- emit(internalVertex, (O) output);
+ for (final NextIntraTaskOperatorInfo internalVertex : internalAdditionalOutputs.get(dstVertexId)) {
+ emit(internalVertex.getNextOperator(), (O) output);
}
}
@@ -104,15 +105,13 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
@Override
public void emitWatermark(final Watermark watermark) {
// Emit watermarks to internal vertices
- // TODO #232: Implement InputWatermarkManager
- // TODO #232: We should emit the minimum watermark among multiple input streams of Transform.
- for (final OperatorVertex internalVertex : internalMainOutputs) {
- internalVertex.getTransform().onWatermark(watermark);
+ for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) {
+ internalVertex.getWatermarkManager().trackAndEmitWatermarks(internalVertex.getEdgeIndex(), watermark);
}
- for (final List<OperatorVertex> internalVertices : internalAdditionalOutputs.values()) {
- for (final OperatorVertex internalVertex : internalVertices) {
- internalVertex.getTransform().onWatermark(watermark);
+ for (final List<NextIntraTaskOperatorInfo> internalVertices : internalAdditionalOutputs.values()) {
+ for (final NextIntraTaskOperatorInfo internalVertex : internalVertices) {
+ internalVertex.getWatermarkManager().trackAndEmitWatermarks(internalVertex.getEdgeIndex(), watermark);
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
new file mode 100644
index 0000000..204bf22
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+
+
+/**
+ * {@link InputWatermarkManager} for an operator that has exactly one intra-task input edge.
+ * With a single input there is no minimum to compute, so every watermark is handed
+ * straight to the operator's transform.
+ */
+public final class SingleInputWatermarkManager implements InputWatermarkManager {
+
+  private final OperatorVertex downstreamOperator;
+
+  /**
+   * Creates a manager that forwards to the given operator.
+   *
+   * @param nextOperator operator whose transform receives the watermarks
+   */
+  public SingleInputWatermarkManager(final OperatorVertex nextOperator) {
+    this.downstreamOperator = nextOperator;
+  }
+
+  /**
+   * Forwards {@code watermark} to the operator's transform unconditionally;
+   * no ordering check is performed here.
+   *
+   * @param edgeIndex ignored, since there is only one input edge
+   * @param watermark watermark to forward
+   */
+  @Override
+  public void trackAndEmitWatermarks(final int edgeIndex, final Watermark watermark) {
+    downstreamOperator.getTransform().onWatermark(watermark);
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 1541792..8c92443 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -28,6 +28,7 @@ import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdPrope
import org.apache.nemo.common.ir.vertex.*;
import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.runtime.executor.datatransfer.MultiInputWatermarkManager;
import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.common.punctuation.Finishmark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
@@ -154,6 +155,37 @@ public final class TaskExecutor {
// Traverse in a reverse-topological order to ensure that each visited vertex's children vertices exist.
final List<IRVertex> reverseTopologicallySorted = Lists.reverse(irVertexDag.getTopologicalSort());
+ // Build a map for edge as a key and edge index as a value
+ // This variable is used for creating NextIntraTaskOperatorInfo
+ // in {@link this#getInternalMainOutputs and this#internalMainOutputs}
+ final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap = new HashMap<>();
+ reverseTopologicallySorted.forEach(childVertex -> {
+ final List<RuntimeEdge<IRVertex>> edges = irVertexDag.getIncomingEdgesOf(childVertex);
+ for (int edgeIndex = 0; edgeIndex < edges.size(); edgeIndex++) {
+ final RuntimeEdge<IRVertex> edge = edges.get(edgeIndex);
+ edgeIndexMap.putIfAbsent(edge, edgeIndex);
+ }
+ });
+
+ // Build a map for InputWatermarkManager for each operator vertex
+ // This variable is used for creating NextIntraTaskOperatorInfo
+ // in {@link this#getInternalMainOutputs and this#internalMainOutputs}
+ final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap = new HashMap<>();
+ reverseTopologicallySorted.forEach(childVertex -> {
+
+ if (childVertex instanceof OperatorVertex) {
+ final List<RuntimeEdge<IRVertex>> edges = irVertexDag.getIncomingEdgesOf(childVertex);
+ if (edges.size() == 1) {
+ operatorWatermarkManagerMap.putIfAbsent(childVertex,
+ new SingleInputWatermarkManager((OperatorVertex) childVertex));
+ } else {
+ operatorWatermarkManagerMap.putIfAbsent(childVertex,
+ new MultiInputWatermarkManager(edges.size(), (OperatorVertex) childVertex));
+ }
+ }
+
+ });
+
// Create a harness for each vertex
final List<DataFetcher> nonBroadcastDataFetcherList = new ArrayList<>();
final Map<String, VertexHarness> vertexIdToHarness = new HashMap<>();
@@ -165,13 +197,14 @@ public final class TaskExecutor {
}
// Additional outputs
- final Map<String, List<OperatorVertex>> internalAdditionalOutputMap =
- getInternalAdditionalOutputMap(irVertex, irVertexDag);
+ final Map<String, List<NextIntraTaskOperatorInfo>> internalAdditionalOutputMap =
+ getInternalAdditionalOutputMap(irVertex, irVertexDag, edgeIndexMap, operatorWatermarkManagerMap);
final Map<String, List<OutputWriter>> externalAdditionalOutputMap =
getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);
// Main outputs
- final List<OperatorVertex> internalMainOutputs = getInternalMainOutputs(irVertex, irVertexDag);
+ final List<NextIntraTaskOperatorInfo> internalMainOutputs =
+ getInternalMainOutputs(irVertex, irVertexDag, edgeIndexMap, operatorWatermarkManagerMap);
final List<OutputWriter> externalMainOutputs =
getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);
@@ -492,17 +525,25 @@ public final class TaskExecutor {
return map;
}
- private Map<String, List<OperatorVertex>> getInternalAdditionalOutputMap(
+ // TODO #253: Refactor getInternal(Main/Additional)OutputMap
+ private Map<String, List<NextIntraTaskOperatorInfo>> getInternalAdditionalOutputMap(
final IRVertex irVertex,
- final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
+ final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap,
+ final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
// Add all intra-task additional tags to additional output map.
- final Map<String, List<OperatorVertex>> map = new HashMap<>();
+ final Map<String, List<NextIntraTaskOperatorInfo>> map = new HashMap<>();
irVertexDag.getOutgoingEdgesOf(irVertex.getId())
.stream()
.filter(edge -> edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
- .map(edge ->
- Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(), (OperatorVertex) edge.getDst()))
+ .map(edge -> {
+ final String outputTag = edge.getPropertyValue(AdditionalOutputTagProperty.class).get();
+ final int index = edgeIndexMap.get(edge);
+ final OperatorVertex nextOperator = (OperatorVertex) edge.getDst();
+ final InputWatermarkManager inputWatermarkManager = operatorWatermarkManagerMap.get(nextOperator);
+ return Pair.of(outputTag, new NextIntraTaskOperatorInfo(index, nextOperator, inputWatermarkManager));
+ })
.forEach(pair -> {
map.putIfAbsent(pair.left(), new ArrayList<>());
map.get(pair.left()).add(pair.right());
@@ -511,12 +552,22 @@ public final class TaskExecutor {
return map;
}
- private List<OperatorVertex> getInternalMainOutputs(final IRVertex irVertex,
- final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
+ // TODO #253: Refactor getInternal(Main/Additional)OutputMap
+ private List<NextIntraTaskOperatorInfo> getInternalMainOutputs(
+ final IRVertex irVertex,
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
+ final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap,
+ final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
+
return irVertexDag.getOutgoingEdgesOf(irVertex.getId())
.stream()
.filter(edge -> !edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
- .map(edge -> (OperatorVertex) edge.getDst())
+ .map(edge -> {
+ final int index = edgeIndexMap.get(edge);
+ final OperatorVertex nextOperator = (OperatorVertex) edge.getDst();
+ final InputWatermarkManager inputWatermarkManager = operatorWatermarkManagerMap.get(nextOperator);
+ return new NextIntraTaskOperatorInfo(index, nextOperator, inputWatermarkManager);
+ })
.collect(Collectors.toList());
}
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
new file mode 100644
index 0000000..9303da8
--- /dev/null
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import java.util.LinkedList;
+import java.util.List;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+
+public final class InputWatermarkManagerTest {
+
+ @Test
+ public void test() {
+ final List<Watermark> emittedWatermarks = new LinkedList<>();
+ final Transform transform = mock(Transform.class);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ final Watermark watermark = invocationOnMock.getArgument(0);
+ emittedWatermarks.add(watermark);
+ return null;
+ }
+ }).when(transform).onWatermark(any(Watermark.class));
+
+ final OperatorVertex operatorVertex = new OperatorVertex(transform);
+ final InputWatermarkManager watermarkManager =
+ new MultiInputWatermarkManager(3, operatorVertex);
+
+ //edge1: 10 s
+ //edge2: 5 s
+ //edge3: 8 s
+ //current min watermark: 5 s
+ watermarkManager.trackAndEmitWatermarks(0, new Watermark(10));
+ assertEquals(0, emittedWatermarks.size());
+ watermarkManager.trackAndEmitWatermarks(1, new Watermark(5));
+ assertEquals(0, emittedWatermarks.size());
+ watermarkManager.trackAndEmitWatermarks(2, new Watermark(8));
+ assertEquals(5, emittedWatermarks.get(0).getTimestamp());
+ emittedWatermarks.clear();
+
+ //edge1: 13
+ //edge2: 9
+ //edge3: 8
+ //current min watermark: 8
+ watermarkManager.trackAndEmitWatermarks(0, new Watermark(13));
+ assertEquals(0, emittedWatermarks.size());
+ watermarkManager.trackAndEmitWatermarks(1, new Watermark(9));
+ assertEquals(8, emittedWatermarks.get(0).getTimestamp());
+ emittedWatermarks.clear();
+
+ //edge1: 13
+ //edge2: 15
+ //edge3: 8
+ //current min watermark: 8
+ watermarkManager.trackAndEmitWatermarks(1, new Watermark(15));
+ assertEquals(0, emittedWatermarks.size());
+
+ //edge1: 13
+ //edge2: 15
+ //edge3: 17
+ //current min watermark: 13
+ watermarkManager.trackAndEmitWatermarks(2, new Watermark(17));
+ assertEquals(13, emittedWatermarks.get(0).getTimestamp());
+ }
+}
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 8233663..e05bbfb 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -61,7 +61,10 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -192,78 +195,11 @@ public final class TaskExecutorTest {
*/
@Test()
public void testUnboundedSourceVertexDataFetching() throws Exception {
- final IRVertex sourceIRVertex = new SourceVertex() {
- @Override
- public IRVertex getClone() {
- return this;
- }
-
- @Override
- public boolean isBounded() {
- return false;
- }
-
- @Override
- public List<Readable> getReadables(int desiredNumOfSplits) throws Exception {
- return null;
- }
-
- @Override
- public void clearInternalStates() {
-
- }
- };
-
- final long watermark = 1234567L;
-
- final Readable readable = new Readable() {
- int pointer = 0;
- final int middle = elements.size() / 2;
- final int end = elements.size();
- boolean watermarkEmitted = false;
-
- @Override
- public void prepare() {
-
- }
-
- // This emulates unbounded source that throws NoSuchElementException
- // It reads current data until middle point and throws NoSuchElementException at the middle point.
- // It resumes the data reading after emitting a watermark, and finishes at the end of the data.
- @Override
- public Object readCurrent() throws NoSuchElementException {
- if (pointer == middle && !watermarkEmitted) {
- throw new NoSuchElementException();
- }
-
- return elements.get(pointer);
- }
-
- @Override
- public void advance() throws IOException {
- pointer += 1;
- }
-
- @Override
- public long readWatermark() {
- watermarkEmitted = true;
- return watermark;
- }
-
- @Override
- public boolean isFinished() {
- return pointer == end;
- }
-
- @Override
- public List<String> getLocations() throws Exception {
- return null;
- }
-
- @Override
- public void close() throws IOException {
- }
- };
+ final IRVertex sourceIRVertex = new TestUnboundedSourceVertex();
+ final Long watermark = 1234567L;
+ final BlockingQueue<Long> watermarkQueue = new LinkedBlockingQueue<>();
+ watermarkQueue.add(watermark);
+ final Readable readable = new TestUnboundedSourceReadable(watermarkQueue, 1);
final Map<String, Readable> vertexIdToReadable = new HashMap<>();
vertexIdToReadable.put(sourceIRVertex.getId(), readable);
@@ -330,6 +266,108 @@ public final class TaskExecutorTest {
assertTrue(checkEqualElements(elements, runtimeEdgeToOutputData.get(taskOutEdge.getId())));
}
+  /**
+   * Blocks the calling thread until the given watermark queue has been fully drained,
+   * checking every 10 ms.
+   * If the thread is interrupted, the interrupt status is restored and the method returns
+   * early instead of swallowing the interrupt and spinning forever.
+   *
+   * @param watermarkQueue the queue of pending watermarks (polled by the test readables).
+   */
+  private void waitUntilWatermarkEmitted(final Queue<Long> watermarkQueue) {
+    while (!watermarkQueue.isEmpty()) {
+      try {
+        Thread.sleep(10);
+      } catch (final InterruptedException e) {
+        // Preserve the interrupt so the owner of this thread can observe it, then stop waiting.
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+ /**
+ * The DAG of the task to test will looks like:
+ * source1 -> vertex1 -> vertex2
+ * source2 -> vertex3 ->
+ *
+ * The vertex2 has two incoming edges (from vertex1 and vertex3)
+ * and we test if TaskExecutor handles data and watermarks correctly in this situation.
+ *
+ * source1 emits watermarks: 500 (ts) 600 (ts) 1400 (ts) 1800 (ts) 2500 (ts)
+ * source2 emits watermarks: 1000(ts) 2200 (ts)
+ *
+ * The vertex2 should receive and emits watermarks 500, 600, 1000, 1800, and 2200
+ */
+ @Test()
+ public void testMultipleIncomingEdges() throws Exception {
+ final List<Watermark> emittedWatermarks = new ArrayList<>();
+ final IRVertex operatorIRVertex1 = new OperatorVertex(new RelayTransform());
+ final IRVertex operatorIRVertex2 = new OperatorVertex(new RelayTransformNoWatermarkEmit(emittedWatermarks));
+ final IRVertex operatorIRVertex3 = new OperatorVertex(new RelayTransform());
+
+ final IRVertex sourceIRVertex1 = new TestUnboundedSourceVertex();
+ final IRVertex sourceIRVertex2 = new TestUnboundedSourceVertex();
+
+ final Queue<Long> watermarks1 = new ConcurrentLinkedQueue<>();
+ watermarks1.add(500L);
+ final Queue<Long> watermarks2 = new ConcurrentLinkedQueue<>();
+ watermarks2.add(1000L);
+ final Readable readable1 = new TestUnboundedSourceReadable(watermarks1, 5);
+ final Readable readable2 = new TestUnboundedSourceReadable(watermarks2, 2);
+
+ final Map<String, Readable> vertexIdToReadable = new HashMap<>();
+ vertexIdToReadable.put(sourceIRVertex1.getId(), readable1);
+ vertexIdToReadable.put(sourceIRVertex2.getId(), readable2);
+
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
+ new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
+ .addVertex(sourceIRVertex1)
+ .addVertex(sourceIRVertex2)
+ .addVertex(operatorIRVertex1)
+ .addVertex(operatorIRVertex2)
+ .addVertex(operatorIRVertex3)
+ .connectVertices(createEdge(sourceIRVertex1, operatorIRVertex1, "edge1"))
+ .connectVertices(createEdge(operatorIRVertex1, operatorIRVertex2, "edge2"))
+ .connectVertices(createEdge(sourceIRVertex2, operatorIRVertex3, "edge3"))
+ .connectVertices(createEdge(operatorIRVertex3, operatorIRVertex2, "edge4"))
+ .buildWithoutSourceSinkCheck();
+
+ final StageEdge taskOutEdge = mockStageEdgeFrom(operatorIRVertex2);
+ final Task task =
+ new Task(
+ "testSourceVertexDataFetching",
+ generateTaskId(),
+ TASK_EXECUTION_PROPERTY_MAP,
+ new byte[0],
+ Collections.emptyList(),
+ Collections.singletonList(taskOutEdge),
+ vertexIdToReadable);
+
+ // Execute the task.
+ final Thread watermarkEmitThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ waitUntilWatermarkEmitted(watermarks2);
+ watermarks1.add(600L);
+ watermarks1.add(1400L);
+ watermarks1.add(1800L);
+ waitUntilWatermarkEmitted(watermarks1);
+ watermarks2.add(2200L);
+ waitUntilWatermarkEmitted(watermarks2);
+ watermarks1.add(2500L);
+ waitUntilWatermarkEmitted(watermarks1);
+ }
+ });
+
+ watermarkEmitThread.start();
+ final TaskExecutor taskExecutor = getTaskExecutor(task, taskDag);
+ taskExecutor.execute();
+
+ watermarkEmitThread.join();
+
+ // Check whether the watermark is emitted
+ assertEquals(Arrays.asList(
+ new Watermark(500), new Watermark(600), new Watermark(1000),
+ new Watermark(1800), new Watermark(2200)), emittedWatermarks);
+
+ // Check the output.
+ final List<Integer> doubledElements = new ArrayList<>(elements.size()*2);
+ doubledElements.addAll(elements);
+ doubledElements.addAll(elements);
+ assertTrue(checkEqualElements(doubledElements, runtimeEdgeToOutputData.get(taskOutEdge.getId())));
+ }
+
/**
* The DAG of the task to test will looks like:
* parent task -> task (vertex 1 -> task 2) -> child task
@@ -601,6 +639,98 @@ public final class TaskExecutorTest {
}
}
+  /**
+   * Source vertex for unbounded source tests.
+   * It only reports itself as unbounded; the tests supply its data through the
+   * vertex-id-to-Readable map passed to the Task, so getReadables is not implemented.
+   * Declared static because it never touches the enclosing test instance.
+   */
+  private static final class TestUnboundedSourceVertex extends SourceVertex {
+
+    @Override
+    public boolean isBounded() {
+      return false;
+    }
+
+    /**
+     * Not used by these tests; readables are provided via the Task instead.
+     */
+    @Override
+    public List<Readable> getReadables(int desiredNumOfSplits) throws Exception {
+      return null;
+    }
+
+    /**
+     * No internal state to clear.
+     */
+    @Override
+    public void clearInternalStates() {
+
+    }
+
+    /**
+     * Returns this instance instead of {@code null}, so that any caller cloning the vertex
+     * gets a usable vertex rather than a null reference.
+     */
+    @Override
+    public IRVertex getClone() {
+      return this;
+    }
+  }
+
+
+  /**
+   * Readable that emulates an unbounded source.
+   *
+   * It returns elements until it reaches the middle of {@code elements}. At that point,
+   * {@link #readCurrent()} throws {@link NoSuchElementException} until {@code expectedNumWatermarks}
+   * watermarks have been polled from the given queue via {@link #readWatermark()}.
+   * After that it returns the remaining elements and finishes at the end of the data.
+   */
+  private final class TestUnboundedSourceReadable implements Readable {
+    private int pointer = 0;
+    private final int middle = elements.size() / 2;
+    private final int end = elements.size();
+    // Pending watermarks; may be filled by another thread, so callers should pass a thread-safe queue.
+    private final Queue<Long> watermarks;
+    private final int expectedNumWatermarks;
+    private int numEmittedWatermarks = 0;
+    // Last watermark handed out; -1 until the first one is polled.
+    private long currWatermark = -1;
+
+    /**
+     * @param watermarks            queue from which watermarks are polled.
+     * @param expectedNumWatermarks number of watermarks that must be polled before data
+     *                              past the middle point is released.
+     */
+    public TestUnboundedSourceReadable(final Queue<Long> watermarks,
+                                       final int expectedNumWatermarks) {
+      this.watermarks = watermarks;
+      this.expectedNumWatermarks = expectedNumWatermarks;
+    }
+
+    @Override
+    public void prepare() {
+
+    }
+
+    /**
+     * @throws NoSuchElementException at the middle point while fewer than
+     *                                {@code expectedNumWatermarks} watermarks have been emitted.
+     */
+    @Override
+    public Object readCurrent() throws NoSuchElementException {
+      if (pointer == middle && numEmittedWatermarks < expectedNumWatermarks) {
+        throw new NoSuchElementException();
+      }
+      return elements.get(pointer);
+    }
+
+    @Override
+    public void advance() throws IOException {
+      pointer += 1;
+    }
+
+    /**
+     * Returns the next watermark from the queue. If the queue is empty, returns the last
+     * watermark returned (or -1 if none yet). Once {@code expectedNumWatermarks} watermarks
+     * have been emitted, always returns {@link Long#MAX_VALUE}.
+     */
+    @Override
+    public long readWatermark() {
+      if (numEmittedWatermarks >= expectedNumWatermarks) {
+        return Long.MAX_VALUE;
+      }
+
+      final Long watermark = watermarks.poll();
+      if (watermark == null) {
+        return currWatermark;
+      }
+      currWatermark = watermark;
+      numEmittedWatermarks += 1;
+      return watermark;
+    }
+
+    @Override
+    public boolean isFinished() {
+      return pointer == end;
+    }
+
+    @Override
+    public List<String> getLocations() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
/**
* Simple identity function for testing.