You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/13 04:30:42 UTC
[incubator-nemo] branch master updated: [NEMO-245,
247] Handle watermark in OutputWriter and Implement unbounded word
count example (#153)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new dc3519f [NEMO-245,247] Handle watermark in OutputWriter and Implement unbounded word count example (#153)
dc3519f is described below
commit dc3519f55a7668cefc6b19a3606bf47f81bda9f3
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Tue Nov 13 13:30:36 2018 +0900
[NEMO-245,247] Handle watermark in OutputWriter and Implement unbounded word count example (#153)
JIRA: [NEMO-245: Handle watermark in OutputWriter](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-245)
[NEMO-247: UnboundedSource WindowedWordCount ITCase](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-247)
**Major changes:**
- Add `NemoEventDecoder/Encoder` to encode/decode watermarks
- Handle inter-task watermarks at `MultiThreadParentDataFetcher`
- Modify `OutputWriter` interface to emit watermark (`emitWatermark`)
- Refactor `WindowedWordCountExample` so that the example can run with an unbounded source
**Tests for the changes:**
- Unbounded windowed word count (currently disabled, because we cannot force-close a Nemo job in tests)
---
.../nemo/common/coder/BytesDecoderFactory.java | 6 +-
.../nemo/common/coder/BytesEncoderFactory.java | 4 +
.../ir/edge/executionproperty/DecoderProperty.java | 1 +
.../ir/edge/executionproperty/EncoderProperty.java | 1 +
.../common/ir/vertex/transform/RelayTransform.java | 3 +
.../frontend/beam/coder/BeamDecoderFactory.java | 5 ++
.../frontend/beam/coder/BeamEncoderFactory.java | 3 +
.../beam/transform/AbstractDoFnTransform.java | 6 +-
.../frontend/beam/transform/DoFnTransform.java | 3 +
.../reshaping/LargeShuffleRelayReshapingPass.java | 6 +-
.../GroupByKeyAndWindowDoFnTransformTest.java | 2 -
.../nemo/examples/beam/WindowedWordCount.java | 99 ++++++++++++++--------
.../examples/beam/WindowedWordCountITCase.java | 32 ++++++-
.../org/apache/nemo/runtime/executor/Executor.java | 49 +++++++++--
.../executor/bytetransfer/ByteOutputContext.java | 3 +
.../nemo/runtime/executor/data/DataUtil.java | 2 +
.../data/partition/SerializedPartition.java | 1 +
.../executor/datatransfer/BlockOutputWriter.java | 10 +++
.../datatransfer/DataFetcherOutputCollector.java | 10 ++-
.../datatransfer/InputWatermarkManager.java | 2 +
.../datatransfer/MultiInputWatermarkManager.java | 23 +++--
.../datatransfer/NemoEventDecoderFactory.java | 93 ++++++++++++++++++++
.../datatransfer/NemoEventEncoderFactory.java | 72 ++++++++++++++++
.../OperatorVertexOutputCollector.java | 18 +++-
...lector.java => OperatorWatermarkCollector.java} | 30 +++----
.../executor/datatransfer/OutputWriter.java | 7 ++
.../executor/datatransfer/PipeOutputWriter.java | 52 ++++++++----
.../datatransfer/SingleInputWatermarkManager.java | 10 +--
...termarkManager.java => WatermarkWithIndex.java} | 36 +++++---
.../task/MultiThreadParentTaskDataFetcher.java | 52 +++++++++++-
.../executor/task/SourceVertexDataFetcher.java | 4 +
.../nemo/runtime/executor/task/TaskExecutor.java | 72 +++++++++++-----
.../datatransfer/InputWatermarkManagerTest.java | 2 +-
.../master/resource/ExecutorRepresenter.java | 26 +++---
34 files changed, 597 insertions(+), 148 deletions(-)
diff --git a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
index 279f7c5..1bb8185 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
@@ -19,7 +19,10 @@
package org.apache.nemo.common.coder;
import org.apache.nemo.common.DirectByteArrayOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -27,6 +30,7 @@ import java.io.InputStream;
* A {@link DecoderFactory} which is used for an array of bytes.
*/
public final class BytesDecoderFactory implements DecoderFactory<byte[]> {
+ private static final Logger LOG = LoggerFactory.getLogger(BytesDecoderFactory.class.getName());
private static final BytesDecoderFactory BYTES_DECODER_FACTORY = new BytesDecoderFactory();
@@ -84,7 +88,7 @@ public final class BytesDecoderFactory implements DecoderFactory<byte[]> {
returnedArray = true;
return new byte[0];
} else {
- throw new IOException("EoF (empty partition)!"); // TODO #120: use EOF exception instead of IOException.
+ throw new EOFException("EoF (empty partition)!"); // TODO #120: use EOF exception instead of IOException.
}
}
final byte[] resultBytes = new byte[lengthToRead]; // Read the size of this byte array.
diff --git a/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java
index 140f5ea..3a5af26 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java
@@ -18,12 +18,16 @@
*/
package org.apache.nemo.common.coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.*;
/**
* A {@link EncoderFactory} which is used for an array of bytes.
*/
public final class BytesEncoderFactory implements EncoderFactory<byte[]> {
+ private static final Logger LOG = LoggerFactory.getLogger(BytesEncoderFactory.class.getName());
private static final BytesEncoderFactory BYTES_ENCODER_FACTORY = new BytesEncoderFactory();
diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DecoderProperty.java b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DecoderProperty.java
index bf6e908..32406c9 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DecoderProperty.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DecoderProperty.java
@@ -23,6 +23,7 @@ import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
/**
* Decoder ExecutionProperty.
+ * TODO #276: Add NoCoder property value in Encoder/DecoderProperty
*/
public final class DecoderProperty extends EdgeExecutionProperty<DecoderFactory> {
/**
diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/EncoderProperty.java b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/EncoderProperty.java
index cf931fc..8e6385d 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/EncoderProperty.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/EncoderProperty.java
@@ -23,6 +23,7 @@ import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
/**
* EncoderFactory ExecutionProperty.
+ * TODO #276: Add NoCoder property value in Encoder/DecoderProperty
*/
public final class EncoderProperty extends EdgeExecutionProperty<EncoderFactory> {
/**
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
index b0dbe54..cd713d3 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
@@ -20,6 +20,8 @@ package org.apache.nemo.common.ir.vertex.transform;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link Transform} relays input data from upstream vertex to downstream vertex promptly.
@@ -28,6 +30,7 @@ import org.apache.nemo.common.punctuation.Watermark;
*/
public final class RelayTransform<T> implements Transform<T, T> {
private OutputCollector<T> outputCollector;
+ private static final Logger LOG = LoggerFactory.getLogger(RelayTransform.class.getName());
/**
* Default constructor.
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
index 4cfefe8..c1ff6f0 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
@@ -119,6 +119,11 @@ public final class BeamDecoderFactory<T> implements DecoderFactory<T> {
public T2 decode() throws IOException {
return decodeInternal();
}
+
+ @Override
+ public String toString() {
+ return "BeamDecoder: {" + beamCoder.toString() + "}";
+ }
}
/**
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
index e46f5b0..090c24b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
@@ -22,6 +22,8 @@ import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.VoidCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
@@ -31,6 +33,7 @@ import java.io.OutputStream;
* @param <T> the type of element to encode.
*/
public final class BeamEncoderFactory<T> implements EncoderFactory<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamEncoderFactory.class.getName());
private final Coder<T> beamCoder;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 6a8f8d4..dd5ca35 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -65,9 +65,9 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
private transient DoFnRunners.OutputManager outputManager;
- // For bundle
- // we consider count and time millis for start/finish bundle
- // if # of processed elements > bundleSize
+ // Variables for bundle.
+ // We consider count and time millis for start/finish bundle.
+ // If # of processed elements > bundleSize
// or elapsed time > bundleMillis, we finish the current bundle and start a new one
private transient long bundleSize;
private transient long bundleMillis;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 18368c6..9f7a4e0 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -27,6 +27,8 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.List;
@@ -39,6 +41,7 @@ import java.util.Map;
* @param <OutputT> output type.
*/
public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<InputT, InputT, OutputT> {
+ private static final Logger LOG = LoggerFactory.getLogger(DoFnTransform.class.getName());
/**
* DoFnTransform Constructor.
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
index 8499225..c34615e 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
@@ -53,11 +53,11 @@ public final class LargeShuffleRelayReshapingPass extends ReshapingPass {
// We care about OperatorVertices that have any incoming edge that
// has Shuffle as data communication pattern.
if (v instanceof OperatorVertex && dag.getIncomingEdgesOf(v).stream().anyMatch(irEdge ->
- CommunicationPatternProperty.Value.Shuffle
+ CommunicationPatternProperty.Value.Shuffle
.equals(irEdge.getPropertyValue(CommunicationPatternProperty.class).get()))) {
dag.getIncomingEdgesOf(v).forEach(edge -> {
if (CommunicationPatternProperty.Value.Shuffle
- .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
+ .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
// Insert a merger vertex having transform that write received data immediately
// before the vertex receiving shuffled data.
final OperatorVertex iFileMergerVertex = new OperatorVertex(new RelayTransform());
@@ -67,7 +67,7 @@ public final class LargeShuffleRelayReshapingPass extends ReshapingPass {
new IREdge(CommunicationPatternProperty.Value.Shuffle, edge.getSrc(), iFileMergerVertex);
edge.copyExecutionPropertiesTo(newEdgeToMerger);
final IREdge newEdgeFromMerger = new IREdge(CommunicationPatternProperty.Value.OneToOne,
- iFileMergerVertex, v);
+ iFileMergerVertex, v);
newEdgeFromMerger.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
newEdgeFromMerger.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
builder.connectVertices(newEdgeToMerger);
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index e3fa23e..f9a44ec 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -26,11 +26,9 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.reef.io.Tuple;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
index 5353763..d7f8c85 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -19,6 +19,7 @@
package org.apache.nemo.examples.beam;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
@@ -26,6 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
import org.joda.time.Duration;
@@ -41,19 +43,65 @@ public final class WindowedWordCount {
private WindowedWordCount() {
}
+ public static final String INPUT_TYPE_BOUNDED = "bounded";
+ public static final String INPUT_TYPE_UNBOUNDED = "unbounded";
+ private static final String SPLITTER = "!";
+
+
+ private static PCollection<KV<String, Long>> getSource(
+ final Pipeline p,
+ final String[] args) {
+
+ final String inputType = args[2];
+ if (inputType.compareTo(INPUT_TYPE_BOUNDED) == 0) {
+ final String inputFilePath = args[3];
+ return GenericSourceSink.read(p, inputFilePath)
+ .apply(ParDo.of(new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(@Element final String elem,
+ final OutputReceiver<String> out) {
+ final String[] splitt = elem.split(SPLITTER);
+ out.outputWithTimestamp(splitt[0], new Instant(Long.valueOf(splitt[1])));
+ }
+ }))
+ .apply(MapElements.<String, KV<String, Long>>via(new SimpleFunction<String, KV<String, Long>>() {
+ @Override
+ public KV<String, Long> apply(final String line) {
+ final String[] words = line.split(" +");
+ final String documentId = words[0] + "#" + words[1];
+ final Long count = Long.parseLong(words[2]);
+ return KV.of(documentId, count);
+ }
+ }));
+ } else if (inputType.compareTo(INPUT_TYPE_UNBOUNDED) == 0) {
+ // unbounded
+ return p.apply(GenerateSequence
+ .from(1)
+ .withRate(2, Duration.standardSeconds(1))
+ .withTimestampFn(num -> new Instant(num * 500)))
+ .apply(MapElements.via(new SimpleFunction<Long, KV<String, Long>>() {
+ @Override
+ public KV<String, Long> apply(final Long val) {
+ return KV.of(String.valueOf(val % 2), 1L);
+ }
+ }));
+ } else {
+ throw new RuntimeException("Unsupported input type: " + inputType);
+ }
+ }
/**
* Main function for the MR BEAM program.
* @param args arguments.
*/
public static void main(final String[] args) {
- final String inputFilePath = args[0];
- final String outputFilePath = args[1];
- final String windowType = args[2];
- final Window<String> windowFn;
+ final String outputFilePath = args[0];
+ final String windowType = args[1];
+
+ final Window<KV<String, Long>> windowFn;
if (windowType.equals("fixed")) {
- windowFn = Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)));
+ windowFn = Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardSeconds(5)));
} else {
- windowFn = Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10))
+ windowFn = Window.<KV<String, Long>>into(SlidingWindows.of(Duration.standardSeconds(10))
.every(Duration.standardSeconds(5)));
}
@@ -62,33 +110,18 @@ public final class WindowedWordCount {
options.setJobName("WindowedWordCount");
final Pipeline p = Pipeline.create(options);
- GenericSourceSink.read(p, inputFilePath)
- .apply(ParDo.of(new DoFn<String, String>() {
- @ProcessElement
- public void processElement(@Element final String elem,
- final OutputReceiver<String> out) {
- final String[] splitt = elem.split("!");
- out.outputWithTimestamp(splitt[0], new Instant(Long.valueOf(splitt[1])));
- }
- }))
- .apply(windowFn)
- .apply(MapElements.<String, KV<String, Long>>via(new SimpleFunction<String, KV<String, Long>>() {
- @Override
- public KV<String, Long> apply(final String line) {
- final String[] words = line.split(" +");
- final String documentId = words[0] + "#" + words[1];
- final Long count = Long.parseLong(words[2]);
- return KV.of(documentId, count);
- }
- }))
- .apply(Sum.longsPerKey())
- .apply(MapElements.<KV<String, Long>, String>via(new SimpleFunction<KV<String, Long>, String>() {
- @Override
- public String apply(final KV<String, Long> kv) {
- return kv.getKey() + ": " + kv.getValue();
- }
- }))
- .apply(new WriteOneFilePerWindow(outputFilePath, null));
+
+ getSource(p, args)
+ .apply(windowFn)
+ .apply(Sum.longsPerKey())
+ .apply(MapElements.<KV<String, Long>, String>via(new SimpleFunction<KV<String, Long>, String>() {
+ @Override
+ public String apply(final KV<String, Long> kv) {
+ return kv.getKey() + ": " + kv.getValue();
+ }
+ }))
+ .apply(new WriteOneFilePerWindow(outputFilePath, 1));
+
p.run();
}
}
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
index 55ed19d..c0134aa 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -28,6 +28,9 @@ import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import static org.apache.nemo.examples.beam.WindowedWordCount.INPUT_TYPE_BOUNDED;
+import static org.apache.nemo.examples.beam.WindowedWordCount.INPUT_TYPE_UNBOUNDED;
+
/**
* Test Windowed word count program with JobLauncher.
*/
@@ -51,7 +54,7 @@ public final class WindowedWordCountITCase {
public void testBatchFixedWindow() throws Exception {
builder = new ArgBuilder()
.addUserMain(WindowedWordCount.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath, "fixed");
+ .addUserArgs(outputFilePath, "fixed", INPUT_TYPE_BOUNDED, inputFilePath);
JobLauncher.main(builder
.addResourceJson(executorResourceFileName)
@@ -71,7 +74,7 @@ public final class WindowedWordCountITCase {
public void testBatchSlidingWindow() throws Exception {
builder = new ArgBuilder()
.addUserMain(WindowedWordCount.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath, "sliding");
+ .addUserArgs(outputFilePath, "sliding", INPUT_TYPE_BOUNDED, inputFilePath);
JobLauncher.main(builder
.addResourceJson(executorResourceFileName)
@@ -91,7 +94,7 @@ public final class WindowedWordCountITCase {
builder = new ArgBuilder()
.addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
.addUserMain(WindowedWordCount.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath, "fixed");
+ .addUserArgs(outputFilePath, "fixed", INPUT_TYPE_BOUNDED, inputFilePath);
JobLauncher.main(builder
.addResourceJson(executorResourceFileName)
@@ -112,7 +115,7 @@ public final class WindowedWordCountITCase {
builder = new ArgBuilder()
.addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
.addUserMain(WindowedWordCount.class.getCanonicalName())
- .addUserArgs(inputFilePath, outputFilePath, "sliding");
+ .addUserArgs(outputFilePath, "sliding", INPUT_TYPE_BOUNDED, inputFilePath);
JobLauncher.main(builder
.addResourceJson(executorResourceFileName)
@@ -126,4 +129,25 @@ public final class WindowedWordCountITCase {
ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
}
}
+
+
+ // TODO #271: We currently disable this test because we cannot force close Nemo
+ //@Test (timeout = TIMEOUT)
+ public void testUnboundedSlidingWindow() throws Exception {
+ builder = new ArgBuilder()
+ .addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
+ .addUserMain(WindowedWordCount.class.getCanonicalName())
+ .addUserArgs(outputFilePath, "sliding", INPUT_TYPE_UNBOUNDED);
+
+ JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
+ .addJobId(WindowedWordCountITCase.class.getSimpleName())
+ .addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
+ .build());
+
+ try {
+ ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedSlidingWindowOutputFileName);
+ } finally {
+ }
+ }
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
index b218bf8..f7aa184 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
@@ -19,6 +19,10 @@
package org.apache.nemo.runtime.executor;
import com.google.protobuf.ByteString;
+import org.apache.nemo.common.coder.BytesDecoderFactory;
+import org.apache.nemo.common.coder.BytesEncoderFactory;
+import org.apache.nemo.common.coder.DecoderFactory;
+import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DecompressionProperty;
@@ -39,6 +43,8 @@ import org.apache.nemo.runtime.common.plan.Task;
import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
import org.apache.nemo.runtime.executor.data.SerializerManager;
import org.apache.nemo.runtime.executor.datatransfer.IntermediateDataIOFactory;
+import org.apache.nemo.runtime.executor.datatransfer.NemoEventDecoderFactory;
+import org.apache.nemo.runtime.executor.datatransfer.NemoEventEncoderFactory;
import org.apache.nemo.runtime.executor.task.TaskExecutor;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@@ -114,6 +120,7 @@ public final class Executor {
* @param task to launch.
*/
private void launchTask(final Task task) {
+ LOG.info("Launch task: {}", task);
try {
final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
SerializationUtils.deserialize(task.getSerializedIRDag());
@@ -121,19 +128,19 @@ public final class Executor {
new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);
task.getTaskIncomingEdges().forEach(e -> serializerManager.register(e.getId(),
- e.getPropertyValue(EncoderProperty.class).get(),
- e.getPropertyValue(DecoderProperty.class).get(),
+ getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
+ getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
e.getPropertyValue(CompressionProperty.class).orElse(null),
e.getPropertyValue(DecompressionProperty.class).orElse(null)));
task.getTaskOutgoingEdges().forEach(e -> serializerManager.register(e.getId(),
- e.getPropertyValue(EncoderProperty.class).get(),
- e.getPropertyValue(DecoderProperty.class).get(),
+ getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
+ getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
e.getPropertyValue(CompressionProperty.class).orElse(null),
e.getPropertyValue(DecompressionProperty.class).orElse(null)));
irDag.getVertices().forEach(v -> {
irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(),
- e.getPropertyValue(EncoderProperty.class).get(),
- e.getPropertyValue(DecoderProperty.class).get(),
+ getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
+ getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
e.getPropertyValue(CompressionProperty.class).orElse(null),
e.getPropertyValue(DecompressionProperty.class).orElse(null)));
});
@@ -155,6 +162,36 @@ public final class Executor {
}
}
+ /**
+ * This wraps the encoder with NemoEventEncoder.
+ * If the encoder is BytesEncoderFactory, we do not wrap the encoder.
+ * TODO #276: Add NoCoder property value in Encoder/DecoderProperty
+ * @param encoderFactory encoder factory
+ * @return wrapped encoder
+ */
+ private EncoderFactory getEncoderFactory(final EncoderFactory encoderFactory) {
+ if (encoderFactory instanceof BytesEncoderFactory) {
+ return encoderFactory;
+ } else {
+ return new NemoEventEncoderFactory(encoderFactory);
+ }
+ }
+
+ /**
+ * This wraps the encoder with NemoEventDecoder.
+ * If the decoder is BytesDecoderFactory, we do not wrap the decoder.
+ * TODO #276: Add NoCoder property value in Encoder/DecoderProperty
+ * @param decoderFactory decoder factory
+ * @return wrapped decoder
+ */
+ private DecoderFactory getDecoderFactory(final DecoderFactory decoderFactory) {
+ if (decoderFactory instanceof BytesDecoderFactory) {
+ return decoderFactory;
+ } else {
+ return new NemoEventDecoderFactory(decoderFactory);
+ }
+ }
+
public void terminate() {
try {
metricMessageSender.close();
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
index 51cd7e1..315760c 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
@@ -22,6 +22,8 @@ import org.apache.nemo.runtime.executor.data.FileArea;
import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -37,6 +39,7 @@ import java.nio.file.Paths;
* although the execution order may not be linearized if they were called from different threads.</p>
*/
public final class ByteOutputContext extends ByteTransferContext implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(ByteOutputContext.class.getName());
private final Channel channel;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
index 9cb7b23..80e83df 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
@@ -144,6 +144,8 @@ public final class DataUtil {
final List<NonSerializedPartition<K>> nonSerializedPartitions = new ArrayList<>();
for (final SerializedPartition<K> partitionToConvert : partitionsToConvert) {
final K key = partitionToConvert.getKey();
+
+
try (final ByteArrayInputStream byteArrayInputStream =
new ByteArrayInputStream(partitionToConvert.getData())) {
final NonSerializedPartition<K> deserializePartition = deserializePartition(
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
index 952fbf4..71541c2 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -116,6 +116,7 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
// inner buffer directly, which can be an unfinished(not flushed) buffer.
wrappedStream.close();
this.serializedData = bytesOutputStream.getBufDirectly();
+
this.length = bytesOutputStream.getCount();
this.committed = true;
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
index 2391533..4b85087 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
@@ -21,11 +21,14 @@ package org.apache.nemo.runtime.executor.datatransfer;
import org.apache.nemo.common.ir.edge.executionproperty.*;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
import org.apache.nemo.runtime.executor.data.block.Block;
import org.apache.nemo.runtime.executor.data.partitioner.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Optional;
@@ -34,6 +37,8 @@ import java.util.Optional;
* Represents the output data transfer from a task.
*/
public final class BlockOutputWriter implements OutputWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(BlockOutputWriter.class.getName());
+
private final RuntimeEdge<?> runtimeEdge;
private final IRVertex dstIrVertex;
private final Partitioner partitioner;
@@ -88,6 +93,11 @@ public final class BlockOutputWriter implements OutputWriter {
} // If else, does not need to write because the data is duplicated.
}
+ @Override
+ public void writeWatermark(final Watermark watermark) {
+ // do nothing
+ }
+
/**
* Notifies that all writes for a block is end.
* Further write about a committed block will throw an exception.
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index 56c7540..d50ad82 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -31,13 +31,19 @@ import org.slf4j.LoggerFactory;
public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
private static final Logger LOG = LoggerFactory.getLogger(DataFetcherOutputCollector.class.getName());
private final OperatorVertex nextOperatorVertex;
+ private final int edgeIndex;
+ private final InputWatermarkManager watermarkManager;
/**
* It forwards output to the next operator.
* @param nextOperatorVertex next operator to emit data and watermark
*/
- public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex) {
+ public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex,
+ final int edgeIndex,
+ final InputWatermarkManager watermarkManager) {
this.nextOperatorVertex = nextOperatorVertex;
+ this.edgeIndex = edgeIndex;
+ this.watermarkManager = watermarkManager;
}
@Override
@@ -47,7 +53,7 @@ public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
@Override
public void emitWatermark(final Watermark watermark) {
- nextOperatorVertex.getTransform().onWatermark(watermark);
+ watermarkManager.trackAndEmitWatermarks(edgeIndex, watermark);
}
@Override
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
index 66fb7aa..adbb659 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
@@ -31,6 +31,8 @@ public interface InputWatermarkManager {
/**
* This tracks the minimum input watermark among multiple input streams.
+ * This method is not thread-safe, so the caller should synchronize
+ * calls if multiple threads access this method concurrently.
* Ex)
* -- input stream1 (edge 1): ---------- ts: 3 ------------------ts: 6
* ^^^
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
index 91c7c55..613eccc 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
@@ -18,8 +18,10 @@
*/
package org.apache.nemo.runtime.executor.datatransfer;
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
@@ -28,14 +30,16 @@ import java.util.List;
* This tracks the minimum input watermark among multiple input streams.
*/
public final class MultiInputWatermarkManager implements InputWatermarkManager {
+ private static final Logger LOG = LoggerFactory.getLogger(MultiInputWatermarkManager.class.getName());
+
private final List<Watermark> watermarks;
- private final OperatorVertex nextOperator;
+ private final OutputCollector<?> watermarkCollector;
private int minWatermarkIndex;
public MultiInputWatermarkManager(final int numEdges,
- final OperatorVertex nextOperator) {
+ final OutputCollector<?> watermarkCollector) {
super();
this.watermarks = new ArrayList<>(numEdges);
- this.nextOperator = nextOperator;
+ this.watermarkCollector = watermarkCollector;
this.minWatermarkIndex = 0;
// We initialize watermarks as min value because
// we should not emit watermark until all edges emit watermarks.
@@ -58,6 +62,12 @@ public final class MultiInputWatermarkManager implements InputWatermarkManager {
@Override
public void trackAndEmitWatermarks(final int edgeIndex, final Watermark watermark) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Track watermark {} emitted from edge {}:, {}", watermark.getTimestamp(), edgeIndex,
+ watermarks.toString());
+ }
+
if (edgeIndex == minWatermarkIndex) {
// update min watermark
final Watermark prevMinWatermark = watermarks.get(minWatermarkIndex);
@@ -74,7 +84,10 @@ public final class MultiInputWatermarkManager implements InputWatermarkManager {
if (minWatermark.getTimestamp() > prevMinWatermark.getTimestamp()) {
// Watermark timestamp progress!
// Emit the min watermark
- nextOperator.getTransform().onWatermark(minWatermark);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Emit watermark {}, {}", minWatermark, watermarks);
+ }
+ watermarkCollector.emitWatermark(minWatermark);
}
} else {
// The recent watermark timestamp cannot be less than the previous one
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
new file mode 100644
index 0000000..1367e70
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.nemo.common.coder.DecoderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A factory for NemoEventDecoder.
+ */
+public final class NemoEventDecoderFactory implements DecoderFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(NemoEventDecoderFactory.class.getName());
+
+ private final DecoderFactory valueDecoderFactory;
+
+ public NemoEventDecoderFactory(final DecoderFactory valueDecoderFactory) {
+ this.valueDecoderFactory = valueDecoderFactory;
+ }
+
+ @Override
+ public Decoder create(final InputStream inputStream) throws IOException {
+ return new NemoEventDecoder(valueDecoderFactory.create(inputStream), inputStream);
+ }
+
+ /**
+ * This class decodes received data into two types.
+ * - normal data
+ * - WatermarkWithIndex
+ */
+ private final class NemoEventDecoder implements DecoderFactory.Decoder {
+
+ private final Decoder valueDecoder;
+ private final InputStream inputStream;
+
+ NemoEventDecoder(final Decoder valueDecoder,
+ final InputStream inputStream) {
+ this.valueDecoder = valueDecoder;
+ this.inputStream = inputStream;
+ }
+
+ @Override
+ public Object decode() throws IOException {
+
+ final byte isWatermark = (byte) inputStream.read();
+ if (isWatermark == -1) {
+ // end of the input stream
+ throw new EOFException();
+ }
+
+ if (isWatermark == 0x00) {
+ // this is not a watermark
+ return valueDecoder.decode();
+ } else if (isWatermark == 0x01) {
+ // this is a watermark
+ final WatermarkWithIndex watermarkWithIndex =
+ (WatermarkWithIndex) SerializationUtils.deserialize(inputStream);
+ return watermarkWithIndex;
+ } else {
+ throw new RuntimeException("Watermark decoding failure: " + isWatermark);
+ }
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder stringBuilder = new StringBuilder("NemoDecoder{");
+ stringBuilder.append(valueDecoder.toString());
+ stringBuilder.append("}");
+ return stringBuilder.toString();
+ }
+ }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
new file mode 100644
index 0000000..c49beda
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.nemo.common.coder.EncoderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * A factory for NemoEventEncoder.
+ */
+public final class NemoEventEncoderFactory implements EncoderFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(NemoEventEncoderFactory.class.getName());
+
+ private final EncoderFactory valueEncoderFactory;
+
+ public NemoEventEncoderFactory(final EncoderFactory valueEncoderFactory) {
+ this.valueEncoderFactory = valueEncoderFactory;
+ }
+
+ @Override
+ public Encoder create(final OutputStream outputStream) throws IOException {
+ return new NemoEventEncoder(valueEncoderFactory.create(outputStream), outputStream);
+ }
+
+ /**
+ * This encodes normal data and WatermarkWithIndex.
+ * @param <T> the type of the data elements to encode
+ */
+ private final class NemoEventEncoder<T> implements EncoderFactory.Encoder<T> {
+ private final EncoderFactory.Encoder<T> valueEncoder;
+ private final OutputStream outputStream;
+
+ NemoEventEncoder(final EncoderFactory.Encoder<T> valueEncoder,
+ final OutputStream outputStream) {
+ this.valueEncoder = valueEncoder;
+ this.outputStream = outputStream;
+ }
+
+ @Override
+ public void encode(final T element) throws IOException {
+ if (element instanceof WatermarkWithIndex) {
+ outputStream.write(0x01); // this is watermark
+ outputStream.write(SerializationUtils.serialize((Serializable) element));
+ } else {
+ outputStream.write(0x00); // this is a data element
+ valueEncoder.encode(element);
+ }
+ }
+ }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 3637780..12d9932 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -77,6 +77,7 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
@Override
public void emit(final O output) {
+
for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) {
emit(internalVertex.getNextOperator(), output);
}
@@ -104,6 +105,11 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
@Override
public void emitWatermark(final Watermark watermark) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} emits watermark {}", irVertex.getId(), watermark);
+ }
+
// Emit watermarks to internal vertices
for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) {
internalVertex.getWatermarkManager().trackAndEmitWatermarks(internalVertex.getEdgeIndex(), watermark);
@@ -115,7 +121,15 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
}
}
- // TODO #245: handle watermarks in OutputWriter
- // TODO #245: currently ignore emitting watermarks to output writer
+ // Emit watermarks to output writer
+ for (final OutputWriter outputWriter : externalMainOutputs) {
+ outputWriter.writeWatermark(watermark);
+ }
+
+ for (final List<OutputWriter> externalVertices : externalAdditionalOutputs.values()) {
+ for (final OutputWriter externalVertex : externalVertices) {
+ externalVertex.writeWatermark(watermark);
+ }
+ }
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
similarity index 56%
copy from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
copy to runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
index 56c7540..66efb72 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
@@ -23,35 +23,31 @@ import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.punctuation.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
- * This collector receives data from DataFetcher and forwards it to the next operator.
- * @param <O> output type.
+ * This class is used for collecting watermarks for an OperatorVertex.
+ * InputWatermarkManager emits watermarks to this class.
*/
-public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
- private static final Logger LOG = LoggerFactory.getLogger(DataFetcherOutputCollector.class.getName());
- private final OperatorVertex nextOperatorVertex;
+public final class OperatorWatermarkCollector implements OutputCollector {
+ private static final Logger LOG = LoggerFactory.getLogger(OperatorWatermarkCollector.class.getName());
+
+ private final OperatorVertex operatorVertex;
- /**
- * It forwards output to the next operator.
- * @param nextOperatorVertex next operator to emit data and watermark
- */
- public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex) {
- this.nextOperatorVertex = nextOperatorVertex;
+ public OperatorWatermarkCollector(final OperatorVertex operatorVertex) {
+ this.operatorVertex = operatorVertex;
}
@Override
- public void emit(final O output) {
- nextOperatorVertex.getTransform().onData(output);
+ public void emit(final Object output) {
+ throw new IllegalStateException("Should not be called");
}
@Override
public void emitWatermark(final Watermark watermark) {
- nextOperatorVertex.getTransform().onWatermark(watermark);
+ operatorVertex.getTransform().onWatermark(watermark);
}
@Override
- public <T> void emit(final String dstVertexId, final T output) {
- throw new RuntimeException("No additional output tag in DataFetcherOutputCollector");
+ public void emit(final String dstVertexId, final Object output) {
+ throw new IllegalStateException("Should not be called");
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
index 032510a..301c95a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -23,6 +23,7 @@ import org.apache.nemo.common.exception.UnsupportedPartitionerException;
import org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.executor.data.partitioner.*;
@@ -41,6 +42,12 @@ public interface OutputWriter {
void write(final Object element);
/**
+ * Writes a watermark to all downstream edges.
+ * @param watermark the watermark to write
+ */
+ void writeWatermark(final Watermark watermark);
+
+ /**
* @return the total written bytes.
*/
Optional<Long> getWrittenBytes();
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
index a5dbf93..dd70394 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -21,6 +21,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
import org.apache.nemo.common.DirectByteArrayOutputStream;
import org.apache.nemo.common.coder.EncoderFactory;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -43,6 +45,7 @@ public final class PipeOutputWriter implements OutputWriter {
private static final Logger LOG = LoggerFactory.getLogger(OutputWriter.class.getName());
private final String srcTaskId;
+ private final int srcTaskIndex;
private final PipeManagerWorker pipeManagerWorker;
private final Partitioner partitioner;
@@ -70,6 +73,27 @@ public final class PipeOutputWriter implements OutputWriter {
this.pipeManagerWorker.notifyMaster(runtimeEdge.getId(), RuntimeIdManager.getIndexFromTaskId(srcTaskId));
this.partitioner = OutputWriter.getPartitioner(runtimeEdge, hashRangeMultiplier);
this.runtimeEdge = runtimeEdge;
+ this.srcTaskIndex = RuntimeIdManager.getIndexFromTaskId(srcTaskId);
+ }
+
+ private void writeData(final Object element, final List<ByteOutputContext> pipeList) {
+ pipeList.forEach(pipe -> {
+
+ try (final ByteOutputContext.ByteOutputStream pipeToWriteTo = pipe.newOutputStream()) {
+ // Serialize (Do not compress)
+ final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
+ final OutputStream wrapped =
+ DataUtil.buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
+ final EncoderFactory.Encoder encoder = serializer.getEncoderFactory().create(wrapped);
+ encoder.encode(element);
+ wrapped.close();
+
+ // Write
+ pipeToWriteTo.write(bytesOutputStream.getBufDirectly());
+ } catch (IOException e) {
+ throw new RuntimeException(e); // For now we crash the executor on IOException
+ }
+ });
}
/**
@@ -82,19 +106,17 @@ public final class PipeOutputWriter implements OutputWriter {
doInitialize();
}
- try (final ByteOutputContext.ByteOutputStream pipeToWriteTo = getPipeToWrite(element)) {
- // Serialize (Do not compress)
- final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
- final OutputStream wrapped = DataUtil.buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
- final EncoderFactory.Encoder encoder = serializer.getEncoderFactory().create(wrapped);
- encoder.encode(element);
- wrapped.close();
-
- // Write
- pipeToWriteTo.write(bytesOutputStream.getBufDirectly());
- } catch (IOException e) {
- throw new RuntimeException(e); // For now we crash the executor on IOException
+ writeData(element, getPipeToWrite(element));
+ }
+
+ @Override
+ public void writeWatermark(final Watermark watermark) {
+ if (!initialized) {
+ doInitialize();
}
+
+ final WatermarkWithIndex watermarkWithIndex = new WatermarkWithIndex(watermark, srcTaskIndex);
+ writeData(watermarkWithIndex, pipes);
}
@Override
@@ -126,11 +148,11 @@ public final class PipeOutputWriter implements OutputWriter {
this.serializer = pipeManagerWorker.getSerializer(runtimeEdge.getId());
}
- private ByteOutputContext.ByteOutputStream getPipeToWrite(final Object element) throws IOException {
+ private List<ByteOutputContext> getPipeToWrite(final Object element) {
return runtimeEdge.getPropertyValue(CommunicationPatternProperty.class)
.get()
.equals(CommunicationPatternProperty.Value.OneToOne)
- ? pipes.get(0).newOutputStream()
- : pipes.get((int) partitioner.partition(element)).newOutputStream();
+ ? Collections.singletonList(pipes.get(0))
+ : Collections.singletonList(pipes.get((int) partitioner.partition(element)));
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
index 204bf22..e8135f9 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
@@ -18,7 +18,7 @@
*/
package org.apache.nemo.runtime.executor.datatransfer;
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;
@@ -27,10 +27,10 @@ import org.apache.nemo.common.punctuation.Watermark;
*/
public final class SingleInputWatermarkManager implements InputWatermarkManager {
- private final OperatorVertex nextOperator;
+ private final OutputCollector watermarkCollector;
- public SingleInputWatermarkManager(final OperatorVertex nextOperator) {
- this.nextOperator = nextOperator;
+ public SingleInputWatermarkManager(final OutputCollector watermarkCollector) {
+ this.watermarkCollector = watermarkCollector;
}
/**
@@ -41,6 +41,6 @@ public final class SingleInputWatermarkManager implements InputWatermarkManager
@Override
public void trackAndEmitWatermarks(final int edgeIndex,
final Watermark watermark) {
- nextOperator.getTransform().onWatermark(watermark);
+ watermarkCollector.emitWatermark(watermark);
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/WatermarkWithIndex.java
similarity index 56%
copy from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
copy to runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/WatermarkWithIndex.java
index 204bf22..3db6cd5 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/WatermarkWithIndex.java
@@ -18,29 +18,37 @@
*/
package org.apache.nemo.runtime.executor.datatransfer;
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.punctuation.Watermark;
+import java.io.Serializable;
/**
- * This is a special implementation for single input data stream for optimization.
+ * This contains a watermark and the src task index.
+ * It is used for transferring the watermark between tasks.
*/
-public final class SingleInputWatermarkManager implements InputWatermarkManager {
+public final class WatermarkWithIndex implements Serializable {
+ private final Watermark watermark;
+ private final int index;
- private final OperatorVertex nextOperator;
+ public WatermarkWithIndex(final Watermark watermark, final int index) {
+ this.watermark = watermark;
+ this.index = index;
+ }
+
+ public Watermark getWatermark() {
+ return watermark;
+ }
- public SingleInputWatermarkManager(final OperatorVertex nextOperator) {
- this.nextOperator = nextOperator;
+ public int getIndex() {
+ return index;
}
- /**
- * This just forwards watermarks to the next operator because it has one data stream.
- * @param edgeIndex edge index
- * @param watermark watermark
- */
@Override
- public void trackAndEmitWatermarks(final int edgeIndex,
- final Watermark watermark) {
- nextOperator.getTransform().onWatermark(watermark);
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(watermark);
+ sb.append(" from ");
+ sb.append(index);
+ return sb.toString();
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
index a9b0da3..001060c 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
@@ -21,8 +21,9 @@ package org.apache.nemo.runtime.executor.task;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.punctuation.Finishmark;
+import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.runtime.executor.data.DataUtil;
-import org.apache.nemo.runtime.executor.datatransfer.InputReader;
+import org.apache.nemo.runtime.executor.datatransfer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,9 +57,12 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
private long serBytes = 0;
private long encodedBytes = 0;
- private int numOfIterators;
+ private int numOfIterators; // == numOfIncomingEdges
private int numOfFinishMarks = 0;
+ // A watermark manager
+ private InputWatermarkManager inputWatermarkManager;
+
MultiThreadParentTaskDataFetcher(final IRVertex dataSource,
final InputReader readerForParentTask,
final OutputCollector outputCollector) {
@@ -96,6 +100,12 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = readersForParentTask.read();
numOfIterators = futures.size();
+ if (numOfIterators > 1) {
+ inputWatermarkManager = new MultiInputWatermarkManager(numOfIterators, new WatermarkCollector());
+ } else {
+ inputWatermarkManager = new SingleInputWatermarkManager(new WatermarkCollector());
+ }
+
futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
// A thread for each iterator
queueInsertionThreads.submit(() -> {
@@ -103,7 +113,21 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
// Consume this iterator to the end.
while (iterator.hasNext()) { // blocked on the iterator.
final Object element = iterator.next();
- elementQueue.offer(element);
+
+
+ if (element instanceof WatermarkWithIndex) {
+ // watermark element
+ // the input watermark manager is accessed by multiple threads
+ // so we should synchronize it
+ synchronized (inputWatermarkManager) {
+ final WatermarkWithIndex watermarkWithIndex = (WatermarkWithIndex) element;
+ inputWatermarkManager.trackAndEmitWatermarks(
+ watermarkWithIndex.getIndex(), watermarkWithIndex.getWatermark());
+ }
+ } else {
+ // data element
+ elementQueue.offer(element);
+ }
}
// This iterator is finished.
@@ -147,4 +171,26 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
public void close() throws Exception {
queueInsertionThreads.shutdown();
}
+
+ /**
+ * Just adds the emitted watermark to the element queue.
+ * It receives the watermark from InputWatermarkManager.
+ */
+ private final class WatermarkCollector implements OutputCollector {
+
+ @Override
+ public void emit(final Object output) {
+ throw new IllegalStateException("Should not be called");
+ }
+
+ @Override
+ public void emitWatermark(final Watermark watermark) {
+ elementQueue.offer(watermark);
+ }
+
+ @Override
+ public void emit(final String dstVertexId, final Object output) {
+ throw new IllegalStateException("Should not be called");
+ }
+ }
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index fa4bd8a..b42bd77 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -23,6 +23,8 @@ import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.common.punctuation.Finishmark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.NoSuchElementException;
@@ -34,6 +36,8 @@ import java.util.concurrent.TimeUnit;
* Fetches data from a data source.
*/
class SourceVertexDataFetcher extends DataFetcher {
+ private static final Logger LOG = LoggerFactory.getLogger(SourceVertexDataFetcher.class.getName());
+
private final Readable readable;
private long boundedSourceReadTime = 0;
private static final long WATERMARK_PERIOD = 1000; // ms
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 8c92443..518bff3 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -21,6 +21,7 @@ package org.apache.nemo.runtime.executor.task;
import com.google.common.collect.Lists;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.Edge;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
@@ -124,6 +125,21 @@ public final class TaskExecutor {
this.sortedHarnesses = pair.right();
}
+ // Get all of the intra-task edges + inter-task edges
+ private List<Edge> getAllIncomingEdges(
+ final Task task,
+ final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
+ final IRVertex childVertex) {
+ final List<Edge> edges = new ArrayList<>();
+ edges.addAll(irVertexDag.getIncomingEdgesOf(childVertex));
+ final List<StageEdge> taskEdges = task.getTaskIncomingEdges().stream()
+ .filter(edge -> edge.getDstIRVertex().getId().equals(childVertex.getId()))
+ .collect(Collectors.toList());
+ edges.addAll(taskEdges);
+ return edges;
+ }
+
+
/**
* Converts the DAG of vertices into pointer-based DAG of vertex harnesses.
* This conversion is necessary for constructing concrete data channels for each vertex's inputs and outputs.
@@ -158,11 +174,11 @@ public final class TaskExecutor {
// Build a map for edge as a key and edge index as a value
// This variable is used for creating NextIntraTaskOperatorInfo
// in {@link this#getInternalMainOutputs and this#internalMainOutputs}
- final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap = new HashMap<>();
+ final Map<Edge, Integer> edgeIndexMap = new HashMap<>();
reverseTopologicallySorted.forEach(childVertex -> {
- final List<RuntimeEdge<IRVertex>> edges = irVertexDag.getIncomingEdgesOf(childVertex);
+ final List<Edge> edges = getAllIncomingEdges(task, irVertexDag, childVertex);
for (int edgeIndex = 0; edgeIndex < edges.size(); edgeIndex++) {
- final RuntimeEdge<IRVertex> edge = edges.get(edgeIndex);
+ final Edge edge = edges.get(edgeIndex);
edgeIndexMap.putIfAbsent(edge, edgeIndex);
}
});
@@ -174,13 +190,15 @@ public final class TaskExecutor {
reverseTopologicallySorted.forEach(childVertex -> {
if (childVertex instanceof OperatorVertex) {
- final List<RuntimeEdge<IRVertex>> edges = irVertexDag.getIncomingEdgesOf(childVertex);
+ final List<Edge> edges = getAllIncomingEdges(task, irVertexDag, childVertex);
if (edges.size() == 1) {
operatorWatermarkManagerMap.putIfAbsent(childVertex,
- new SingleInputWatermarkManager((OperatorVertex) childVertex));
+ new SingleInputWatermarkManager(
+ new OperatorWatermarkCollector((OperatorVertex) childVertex)));
} else {
operatorWatermarkManagerMap.putIfAbsent(childVertex,
- new MultiInputWatermarkManager(edges.size(), (OperatorVertex) childVertex));
+ new MultiInputWatermarkManager(edges.size(),
+ new OperatorWatermarkCollector((OperatorVertex) childVertex)));
}
}
@@ -257,23 +275,33 @@ public final class TaskExecutor {
.orElseThrow(() -> new IllegalStateException(inEdge.toString())),
broadcastReaders.get(i));
}
+
// Parent-task read (non-broadcasts)
final List<StageEdge> nonBroadcastInEdges = new ArrayList<>(inEdgesForThisVertex);
nonBroadcastInEdges.removeAll(broadcastInEdges);
- final List<InputReader> nonBroadcastReaders =
- getParentTaskReaders(taskIndex, nonBroadcastInEdges, intermediateDataIOFactory);
- nonBroadcastReaders.forEach(parentTaskReader -> {
- final DataFetcher dataFetcher;
- if (parentTaskReader instanceof PipeInputReader) {
- nonBroadcastDataFetcherList.add(
- new MultiThreadParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
- new DataFetcherOutputCollector((OperatorVertex) irVertex)));
- } else {
- nonBroadcastDataFetcherList.add(
- new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
- new DataFetcherOutputCollector((OperatorVertex) irVertex)));
- }
- });
+
+ nonBroadcastInEdges
+ .stream()
+ .map(incomingEdge ->
+ Pair.of(incomingEdge, intermediateDataIOFactory
+ .createReader(taskIndex, incomingEdge.getSrcIRVertex(), incomingEdge)))
+ .forEach(pair -> {
+ if (irVertex instanceof OperatorVertex) {
+ final StageEdge edge = pair.left();
+ final int edgeIndex = edgeIndexMap.get(edge);
+ final InputWatermarkManager watermarkManager = operatorWatermarkManagerMap.get(irVertex);
+ final InputReader parentTaskReader = pair.right();
+ if (parentTaskReader instanceof PipeInputReader) {
+ nonBroadcastDataFetcherList.add(
+ new MultiThreadParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
+ new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager)));
+ } else {
+ nonBroadcastDataFetcherList.add(
+ new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
+ new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager)));
+ }
+ }
+ });
});
final List<VertexHarness> sortedHarnessList = irVertexDag.getTopologicalSort()
@@ -529,7 +557,7 @@ public final class TaskExecutor {
private Map<String, List<NextIntraTaskOperatorInfo>> getInternalAdditionalOutputMap(
final IRVertex irVertex,
final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
- final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap,
+ final Map<Edge, Integer> edgeIndexMap,
final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
// Add all intra-task additional tags to additional output map.
final Map<String, List<NextIntraTaskOperatorInfo>> map = new HashMap<>();
@@ -556,7 +584,7 @@ public final class TaskExecutor {
private List<NextIntraTaskOperatorInfo> getInternalMainOutputs(
final IRVertex irVertex,
final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
- final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap,
+ final Map<Edge, Integer> edgeIndexMap,
final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
return irVertexDag.getOutgoingEdgesOf(irVertex.getId())
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
index 9303da8..5242d46 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
@@ -47,7 +47,7 @@ public final class InputWatermarkManagerTest {
final OperatorVertex operatorVertex = new OperatorVertex(transform);
final InputWatermarkManager watermarkManager =
- new MultiInputWatermarkManager(3, operatorVertex);
+ new MultiInputWatermarkManager(3, new OperatorWatermarkCollector(operatorVertex));
//edge1: 10 s
//edge2: 5 s
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
index 63d98d5..bf6279c 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -29,6 +29,8 @@ import org.apache.nemo.runtime.common.message.MessageSender;
import org.apache.nemo.runtime.common.plan.Task;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.reef.driver.context.ActiveContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.*;
@@ -49,6 +51,8 @@ import java.util.stream.Stream;
*/
@NotThreadSafe
public final class ExecutorRepresenter {
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutorRepresenter.class.getName());
+
private final String executorId;
private final ResourceSpecification resourceSpecification;
private final Map<String, Task> runningComplyingTasks;
@@ -113,18 +117,20 @@ public final class ExecutorRepresenter {
? runningComplyingTasks : runningNonComplyingTasks).put(task.getTaskId(), task);
runningTaskToAttempt.put(task, task.getAttemptIdx());
failedTasks.remove(task);
- serializationExecutorService.submit(() -> {
+
+
+ serializationExecutorService.execute(() -> {
final byte[] serialized = SerializationUtils.serialize(task);
sendControlMessage(
- ControlMessage.Message.newBuilder()
- .setId(RuntimeIdManager.generateMessageId())
- .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
- .setType(ControlMessage.MessageType.ScheduleTask)
- .setScheduleTaskMsg(
- ControlMessage.ScheduleTaskMsg.newBuilder()
- .setTask(ByteString.copyFrom(serialized))
- .build())
- .build());
+ ControlMessage.Message.newBuilder()
+ .setId(RuntimeIdManager.generateMessageId())
+ .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
+ .setType(ControlMessage.MessageType.ScheduleTask)
+ .setScheduleTaskMsg(
+ ControlMessage.ScheduleTaskMsg.newBuilder()
+ .setTask(ByteString.copyFrom(serialized))
+ .build())
+ .build());
});
}