You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2018/11/21 09:03:49 UTC
[incubator-nemo] branch master updated: [NEMO-216, 251,
259] Support side inputs and windowing (#159)
This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 7d9452d [NEMO-216,251,259] Support side inputs and windowing (#159)
7d9452d is described below
commit 7d9452d31d41fdaaeed8da4d84bcb06adb32bcca
Author: John Yang <jo...@gmail.com>
AuthorDate: Wed Nov 21 18:03:45 2018 +0900
[NEMO-216,251,259] Support side inputs and windowing (#159)
JIRA: [NEMO-216: Support side inputs and windowing](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-216)
(+ NEMO-251, NEMO-259)
**Major changes:**
- MaterializedViewReader (implements ReadyCheckingSideInputReader) and appropriate push-back logics
- SideInputTransform: Wraps materialized views with an index in SideInputElement to distinguish multiple side inputs destined to the same DoFn (replaces the previous usage of BroadcastVariable ids)
- In the runtime, treat a SideInput as just another DataFetcher (do not use BroadcastManagerWorker): all side-input-specific logic stays in the Beam frontend
**Minor changes to note:**
- Clean up watermark propagation flow for DataFetchers
- In MultiThreadParentTaskDataFetcher, let the task thread handle watermarks just like other elements, rather than the data fetcher threads
**Tests for the changes:**
- WindowedBroadcastITCase
- DoFnTransformTest#testSideInputs
**Other comments:**
- This is the last sub-issue of the umbrella issue: [NEMO-229] Support watermark, so once this PR is merged we should be ready to mark that as resolved
---
.../org/apache/nemo/common/dag/DAGBuilder.java | 9 --
.../apache/nemo/common/punctuation/Watermark.java | 2 +-
.../frontend/beam/InMemorySideInputReader.java | 108 ++++++++++++++
.../frontend/beam/PipelineTranslationContext.java | 98 +++++++------
.../compiler/frontend/beam/PipelineTranslator.java | 69 ++++++---
.../compiler/frontend/beam/SideInputElement.java | 36 +++--
.../{FloatArrayCoder.java => SideInputCoder.java} | 35 ++---
.../beam/source/BeamBoundedSourceVertex.java | 12 +-
.../beam/source/BeamUnboundedSourceVertex.java | 12 +-
.../beam/transform/AbstractDoFnTransform.java | 99 +++++++++----
.../BroadcastVariableSideInputReader.java | 63 --------
.../beam/transform/CreateViewTransform.java | 28 ++--
.../frontend/beam/transform/DoFnTransform.java | 26 +---
.../GroupByKeyAndWindowDoFnTransform.java | 41 ++----
.../beam/transform/GroupByKeyTransform.java | 8 --
.../beam/transform/PushBackDoFnTransform.java | 160 +++++++++++++++++++++
.../beam/transform/SideInputTransform.java | 72 ++++++++++
.../frontend/beam/transform/WindowFnTransform.java | 7 +-
.../frontend/beam/BeamFrontendALSTest.java | 2 +-
.../frontend/beam/BeamFrontendMLRTest.java | 4 +-
.../beam/transform/CreateViewTransformTest.java | 2 +
.../frontend/beam/transform/DoFnTransformTest.java | 121 ++++++++++------
.../GroupByKeyAndWindowDoFnTransformTest.java | 19 ++-
.../reshaping/LoopInvariantCodeMotionPassTest.java | 2 +-
.../nemo/examples/beam/AlternatingLeastSquare.java | 2 -
.../beam/AlternatingLeastSquareInefficient.java | 2 -
.../nemo/examples/beam}/FloatArrayCoder.java | 2 +-
.../apache/nemo/examples/beam}/IntArrayCoder.java | 2 +-
.../nemo/examples/beam/WindowedBroadcast.java | 90 ++++++++++++
.../nemo/examples/beam/WindowedWordCount.java | 2 +-
.../examples/beam/WindowedBroadcastITCase.java | 67 +++++++++
.../examples/beam/WindowedWordCountITCase.java | 11 +-
.../org/apache/nemo/runtime/executor/Executor.java | 2 +-
.../executor/data/BroadcastManagerWorker.java | 68 +++------
.../DedicatedKeyPerElementPartitioner.java | 1 +
.../datatransfer/MultiInputWatermarkManager.java | 1 -
.../OperatorVertexOutputCollector.java | 1 -
.../executor/datatransfer/PipeOutputWriter.java | 15 +-
.../datatransfer/SingleInputWatermarkManager.java | 3 +
.../nemo/runtime/executor/task/DataFetcher.java | 8 ++
.../task/MultiThreadParentTaskDataFetcher.java | 6 +-
.../executor/task/ParentTaskDataFetcher.java | 3 +-
.../nemo/runtime/executor/task/TaskExecutor.java | 100 ++++++-------
.../executor/task/ParentTaskDataFetcherTest.java | 7 +-
.../runtime/executor/task/TaskExecutorTest.java | 25 +---
45 files changed, 948 insertions(+), 505 deletions(-)
diff --git a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
index a6269a0..6a6ca4d 100644
--- a/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/org/apache/nemo/common/dag/DAGBuilder.java
@@ -20,7 +20,6 @@ package org.apache.nemo.common.dag;
import org.apache.nemo.common.exception.CompileTimeOptimizationException;
import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
import org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
import org.apache.nemo.common.ir.vertex.*;
@@ -258,14 +257,6 @@ public final class DAGBuilder<V extends Vertex, E extends Edge<V>> implements Se
* Helper method to check that all execution properties are correct and makes sense.
*/
private void executionPropertyCheck() {
- // SideInput is not compatible with Push
- vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e)
- .filter(e -> e.getPropertyValue(BroadcastVariableIdProperty.class).isPresent())
- .filter(e -> DataFlowProperty.Value.Push.equals(e.getPropertyValue(DataFlowProperty.class).get()))
- .forEach(e -> {
- throw new CompileTimeOptimizationException("DAG execution property check: "
- + "Broadcast edge is not compatible with push" + e.getId());
- }));
// DataSizeMetricCollection is not compatible with Push (All data have to be stored before the data collection)
vertices.forEach(v -> incomingEdges.get(v).stream().filter(e -> e instanceof IREdge).map(e -> (IREdge) e)
.filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
index e676e6e..1055f0b 100644
--- a/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Watermark.java
@@ -49,7 +49,7 @@ public final class Watermark implements Serializable, Comparable<Watermark> {
@Override
public String toString() {
- return String.valueOf(timestamp);
+ return String.valueOf("Watermark(" + timestamp + ")");
}
@Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
new file mode 100644
index 0000000..9a18b0c
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam;
+
+import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.compiler.frontend.beam.transform.CreateViewTransform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+/**
+ * Accumulates and provides side inputs in memory.
+ */
+public final class InMemorySideInputReader implements ReadyCheckingSideInputReader {
+ private static final Logger LOG = LoggerFactory.getLogger(InMemorySideInputReader.class.getName());
+
+ private long curWatermark = Long.MIN_VALUE;
+
+ private final Collection<PCollectionView<?>> sideInputsToRead;
+ private final Map<Pair<PCollectionView<?>, BoundedWindow>, Object> inMemorySideInputs;
+
+ public InMemorySideInputReader(final Collection<PCollectionView<?>> sideInputsToRead) {
+ this.sideInputsToRead = sideInputsToRead;
+ this.inMemorySideInputs = new HashMap<>();
+ }
+
+ @Override
+ public boolean isReady(final PCollectionView view, final BoundedWindow window) {
+ return window.maxTimestamp().getMillis() < curWatermark
+ || inMemorySideInputs.containsKey(Pair.of(view, window));
+ }
+
+ @Nullable
+ @Override
+ public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
+ // This gets called after isReady()
+ final T sideInputData = (T) inMemorySideInputs.get(Pair.of(view, window));
+ return sideInputData == null
+ // The upstream gave us an empty sideInput
+ ? ((ViewFn<Object, T>) view.getViewFn()).apply(new CreateViewTransform.MultiView<T>(Collections.emptyList()))
+ // The upstream gave us a concrete sideInput
+ : sideInputData;
+ }
+
+ @Override
+ public <T> boolean contains(final PCollectionView<T> view) {
+ return sideInputsToRead.contains(view);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return sideInputsToRead.isEmpty();
+ }
+
+ /**
+ * Stores the side input in memory to be used with main inputs.
+ * @param view of the side input.
+ * @param sideInputElement to add.
+ */
+ public void addSideInputElement(final PCollectionView<?> view,
+ final WindowedValue<SideInputElement<?>> sideInputElement) {
+ for (final BoundedWindow bw : sideInputElement.getWindows()) {
+ inMemorySideInputs.put(Pair.of(view, bw), sideInputElement.getValue().getSideInputValue());
+ }
+ }
+
+ /**
+ * Say a DoFn of this reader has 3 main inputs and 4 side inputs.
+ * {@link org.apache.nemo.runtime.executor.datatransfer.InputWatermarkManager} guarantees that the watermark here
+ * is the minimum of the all 7 input streams.
+ * @param newWatermark to set.
+ */
+ public void setCurrentWatermarkOfAllMainAndSideInputs(final long newWatermark) {
+ if (curWatermark > newWatermark) {
+ // Cannot go backwards in time.
+ throw new IllegalStateException(curWatermark + " > " + newWatermark);
+ }
+
+ this.curWatermark = newWatermark;
+ // TODO #282: Handle late data
+ inMemorySideInputs.entrySet().removeIf(entry -> {
+ return entry.getKey().right().maxTimestamp().getMillis() <= this.curWatermark; // Discard old sideinputs.
+ });
+ }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
index 722f421..d54a7cd 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
@@ -35,6 +35,7 @@ import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.compiler.frontend.beam.coder.BeamDecoderFactory;
import org.apache.nemo.compiler.frontend.beam.coder.BeamEncoderFactory;
+import org.apache.nemo.compiler.frontend.beam.coder.SideInputCoder;
import org.apache.nemo.compiler.frontend.beam.transform.*;
import java.util.*;
@@ -92,68 +93,83 @@ final class PipelineTranslationContext {
}
/**
+ * Say the dstIRVertex consumes three views: view0, view1, and view2.
+ *
+ * We translate that as the following:
+ * view0 -> SideInputTransform(index=0) ->
+ * view1 -> SideInputTransform(index=1) -> dstIRVertex(with a map from indices to PCollectionViews)
+ * view2 -> SideInputTransform(index=2) ->
+ *
+ * @param dstVertex vertex.
+ * @param sideInputs of the vertex.
+ */
+ void addSideInputEdges(final IRVertex dstVertex, final Map<Integer, PCollectionView<?>> sideInputs) {
+ for (final Map.Entry<Integer, PCollectionView<?>> entry : sideInputs.entrySet()) {
+ final int index = entry.getKey();
+ final PCollectionView view = entry.getValue();
+
+ final IRVertex srcVertex = pValueToProducerVertex.get(view);
+ final IRVertex sideInputTransformVertex = new OperatorVertex(new SideInputTransform(index));
+ addVertex(sideInputTransformVertex);
+ final Coder viewCoder = getCoderForView(view, this);
+ final Coder windowCoder = view.getPCollection().getWindowingStrategy().getWindowFn().windowCoder();
+
+ // First edge: view to transform
+ final IREdge firstEdge =
+ new IREdge(CommunicationPatternProperty.Value.OneToOne, srcVertex, sideInputTransformVertex);
+ addEdge(firstEdge, viewCoder, windowCoder);
+
+ // Second edge: transform to the dstIRVertex
+ final IREdge secondEdge =
+ new IREdge(CommunicationPatternProperty.Value.OneToOne, sideInputTransformVertex, dstVertex);
+ final WindowedValue.FullWindowedValueCoder sideInputElementCoder =
+ WindowedValue.getFullCoder(SideInputCoder.of(viewCoder), windowCoder);
+
+ secondEdge.setProperty(EncoderProperty.of(new BeamEncoderFactory(sideInputElementCoder)));
+ secondEdge.setProperty(DecoderProperty.of(new BeamDecoderFactory(sideInputElementCoder)));
+ builder.connectVertices(secondEdge);
+ }
+ }
+
+ /**
* Add IR edge to the builder.
*
* @param dst the destination IR vertex.
* @param input the {@link PValue} {@code dst} consumes
*/
void addEdgeTo(final IRVertex dst, final PValue input) {
- final Coder coder;
if (input instanceof PCollection) {
- coder = ((PCollection) input).getCoder();
- } else if (input instanceof PCollectionView) {
- coder = getCoderForView((PCollectionView) input, this);
- } else {
- throw new RuntimeException(String.format("While adding an edge to %s, coder for PValue %s cannot "
- + "be determined", dst, input));
- }
- addEdgeTo(dst, input, coder);
- }
+ final Coder elementCoder = ((PCollection) input).getCoder();
+ final Coder windowCoder = ((PCollection) input).getWindowingStrategy().getWindowFn().windowCoder();
+ final IRVertex src = pValueToProducerVertex.get(input);
+ if (src == null) {
+ throw new IllegalStateException(String.format("Cannot find a vertex that emits pValue %s", input));
+ }
- void addEdgeTo(final IRVertex dst, final PValue input, final Coder elementCoder) {
- final IRVertex src = pValueToProducerVertex.get(input);
- if (src == null) {
- throw new IllegalStateException(String.format("Cannot find a vertex that emits pValue %s", input));
- }
+ final CommunicationPatternProperty.Value communicationPattern = getCommPattern(src, dst);
+ final IREdge edge = new IREdge(communicationPattern, src, dst);
- final Coder windowCoder;
- final CommunicationPatternProperty.Value communicationPattern = getCommPattern(src, dst);
- final IREdge edge = new IREdge(communicationPattern, src, dst);
+ if (pValueToTag.containsKey(input)) {
+ edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
+ }
- if (pValueToTag.containsKey(input)) {
- edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
- }
- if (input instanceof PCollectionView) {
- edge.setProperty(BroadcastVariableIdProperty.of((PCollectionView) input));
- }
- if (input instanceof PCollection) {
- windowCoder = ((PCollection) input).getWindowingStrategy().getWindowFn().windowCoder();
- } else if (input instanceof PCollectionView) {
- windowCoder = ((PCollectionView) input).getPCollection()
- .getWindowingStrategy().getWindowFn().windowCoder();
+ addEdge(edge, elementCoder, windowCoder);
} else {
- throw new RuntimeException(String.format("While adding an edge from %s, to %s, coder for PValue %s cannot "
- + "be determined", src, dst, input));
+ throw new IllegalStateException(input.toString());
}
-
- addEdgeTo(edge, elementCoder, windowCoder);
}
- void addEdgeTo(final IREdge edge,
- final Coder elementCoder,
- final Coder windowCoder) {
+ void addEdge(final IREdge edge, final Coder elementCoder, final Coder windowCoder) {
edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
-
if (elementCoder instanceof KvCoder) {
Coder keyCoder = ((KvCoder) elementCoder).getKeyCoder();
edge.setProperty(KeyEncoderProperty.of(new BeamEncoderFactory(keyCoder)));
edge.setProperty(KeyDecoderProperty.of(new BeamDecoderFactory(keyCoder)));
}
- edge.setProperty(EncoderProperty.of(
- new BeamEncoderFactory<>(WindowedValue.getFullCoder(elementCoder, windowCoder))));
- edge.setProperty(DecoderProperty.of(
- new BeamDecoderFactory<>(WindowedValue.getFullCoder(elementCoder, windowCoder))));
+ final WindowedValue.FullWindowedValueCoder coder = WindowedValue.getFullCoder(elementCoder, windowCoder);
+ edge.setProperty(EncoderProperty.of(new BeamEncoderFactory<>(coder)));
+ edge.setProperty(DecoderProperty.of(new BeamDecoderFactory<>(coder)));
builder.connectVertices(edge);
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 7a22ba8..a6ae6ce 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -25,6 +25,7 @@ import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
@@ -48,7 +49,9 @@ import java.lang.annotation.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* A collection of translators for the Beam PTransforms.
@@ -164,7 +167,7 @@ final class PipelineTranslator {
private static void unboundedReadTranslator(final PipelineTranslationContext ctx,
final TransformHierarchy.Node beamNode,
final Read.Unbounded<?> transform) {
- final IRVertex vertex = new BeamUnboundedSourceVertex<>(transform.getSource());
+ final IRVertex vertex = new BeamUnboundedSourceVertex<>(transform.getSource(), DisplayData.from(transform));
ctx.addVertex(vertex);
beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
@@ -174,7 +177,7 @@ final class PipelineTranslator {
private static void boundedReadTranslator(final PipelineTranslationContext ctx,
final TransformHierarchy.Node beamNode,
final Read.Bounded<?> transform) {
- final IRVertex vertex = new BeamBoundedSourceVertex<>(transform.getSource());
+ final IRVertex vertex = new BeamBoundedSourceVertex<>(transform.getSource(), DisplayData.from(transform));
ctx.addVertex(vertex);
beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
@@ -184,14 +187,15 @@ final class PipelineTranslator {
private static void parDoSingleOutputTranslator(final PipelineTranslationContext ctx,
final TransformHierarchy.Node beamNode,
final ParDo.SingleOutput<?, ?> transform) {
- final DoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode);
+ final Map<Integer, PCollectionView<?>> sideInputMap = getSideInputMap(transform.getSideInputs());
+ final AbstractDoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode, sideInputMap);
final IRVertex vertex = new OperatorVertex(doFnTransform);
ctx.addVertex(vertex);
beamNode.getInputs().values().stream()
.filter(input -> !transform.getAdditionalInputs().values().contains(input))
.forEach(input -> ctx.addEdgeTo(vertex, input));
- transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
+ ctx.addSideInputEdges(vertex, sideInputMap);
beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
}
@@ -199,13 +203,14 @@ final class PipelineTranslator {
private static void parDoMultiOutputTranslator(final PipelineTranslationContext ctx,
final TransformHierarchy.Node beamNode,
final ParDo.MultiOutput<?, ?> transform) {
- final DoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode);
+ final Map<Integer, PCollectionView<?>> sideInputMap = getSideInputMap(transform.getSideInputs());
+ final AbstractDoFnTransform doFnTransform = createDoFnTransform(ctx, beamNode, sideInputMap);
final IRVertex vertex = new OperatorVertex(doFnTransform);
ctx.addVertex(vertex);
beamNode.getInputs().values().stream()
.filter(input -> !transform.getAdditionalInputs().values().contains(input))
.forEach(input -> ctx.addEdgeTo(vertex, input));
- transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
+ ctx.addSideInputEdges(vertex, sideInputMap);
beamNode.getOutputs().entrySet().stream()
.filter(pValueWithTupleTag -> pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
.forEach(pValueWithTupleTag -> ctx.registerMainOutputFrom(beamNode, vertex, pValueWithTupleTag.getValue()));
@@ -237,7 +242,8 @@ final class PipelineTranslator {
} else {
throw new UnsupportedOperationException(String.format("%s is not supported", transform));
}
- final IRVertex vertex = new OperatorVertex(new WindowFnTransform(windowFn));
+ final IRVertex vertex = new OperatorVertex(
+ new WindowFnTransform(windowFn, DisplayData.from(beamNode.getTransform())));
ctx.addVertex(vertex);
beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
beamNode.getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(beamNode, vertex, output));
@@ -317,7 +323,7 @@ final class PipelineTranslator {
final IRVertex finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn));
ctx.addVertex(finalCombine);
final IREdge edge = new IREdge(CommunicationPatternProperty.Value.Shuffle, partialCombine, finalCombine);
- ctx.addEdgeTo(
+ ctx.addEdge(
edge,
KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
input.getWindowingStrategy().getWindowFn().windowCoder());
@@ -348,27 +354,45 @@ final class PipelineTranslator {
////////////////////////////////////////////////////////////////////////////////////////////////////////
/////////////////////// HELPER METHODS
- private static DoFnTransform createDoFnTransform(final PipelineTranslationContext ctx,
- final TransformHierarchy.Node beamNode) {
+ private static Map<Integer, PCollectionView<?>> getSideInputMap(final List<PCollectionView<?>> viewList) {
+ return IntStream.range(0, viewList.size()).boxed().collect(Collectors.toMap(Function.identity(), viewList::get));
+ }
+
+ private static AbstractDoFnTransform createDoFnTransform(final PipelineTranslationContext ctx,
+ final TransformHierarchy.Node beamNode,
+ final Map<Integer, PCollectionView<?>> sideInputMap) {
try {
final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
final TupleTag mainOutputTag = ParDoTranslation.getMainOutputTag(pTransform);
- final List<PCollectionView<?>> sideInputs = ParDoTranslation.getSideInputs(pTransform);
final TupleTagList additionalOutputTags = ParDoTranslation.getAdditionalOutputTags(pTransform);
final PCollection<?> mainInput = (PCollection<?>)
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
- return new DoFnTransform(
- doFn,
- mainInput.getCoder(),
- getOutputCoders(pTransform),
- mainOutputTag,
- additionalOutputTags.getAll(),
- mainInput.getWindowingStrategy(),
- sideInputs,
- ctx.getPipelineOptions());
+ if (sideInputMap.isEmpty()) {
+ return new DoFnTransform(
+ doFn,
+ mainInput.getCoder(),
+ getOutputCoders(pTransform),
+ mainOutputTag,
+ additionalOutputTags.getAll(),
+ mainInput.getWindowingStrategy(),
+ ctx.getPipelineOptions(),
+ DisplayData.from(beamNode.getTransform()));
+ } else {
+ return new PushBackDoFnTransform(
+ doFn,
+ mainInput.getCoder(),
+ getOutputCoders(pTransform),
+ mainOutputTag,
+ additionalOutputTags.getAll(),
+ mainInput.getWindowingStrategy(),
+ sideInputMap,
+ ctx.getPipelineOptions(),
+ DisplayData.from(beamNode.getTransform()));
+
+ }
} catch (final IOException e) {
throw new RuntimeException(e);
}
@@ -404,11 +428,10 @@ final class PipelineTranslator {
return new GroupByKeyAndWindowDoFnTransform(
getOutputCoders(pTransform),
mainOutputTag,
- Collections.emptyList(), /* GBK does not have additional outputs */
mainInput.getWindowingStrategy(),
- Collections.emptyList(), /* GBK does not have additional side inputs */
ctx.getPipelineOptions(),
- SystemReduceFn.buffering(mainInput.getCoder()));
+ SystemReduceFn.buffering(mainInput.getCoder()),
+ DisplayData.from(beamNode.getTransform()));
}
}
diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BroadcastVariableIdProperty.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/SideInputElement.java
similarity index 54%
rename from common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BroadcastVariableIdProperty.java
rename to compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/SideInputElement.java
index d7e8aa4..22f8d72 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BroadcastVariableIdProperty.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/SideInputElement.java
@@ -16,31 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.common.ir.edge.executionproperty;
-
-import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
-
-import java.io.Serializable;
+package org.apache.nemo.compiler.frontend.beam;
/**
- * Edges with this property fetch a broadcast variable.
+ * {@link org.apache.nemo.compiler.frontend.beam.transform.DoFnTransform} treats elements of this type as side inputs.
+ * TODO #289: Prevent using SideInputElement in UDFs
+ * @param <T> type of the side input value.
*/
-public final class BroadcastVariableIdProperty extends EdgeExecutionProperty<Serializable> {
+public final class SideInputElement<T> {
+ private final int sideInputIndex;
+ private final T sideInputValue;
+
+ public SideInputElement(final int sideInputIndex, final T sideInputValue) {
+ this.sideInputIndex = sideInputIndex;
+ this.sideInputValue = sideInputValue;
+ }
- /**
- * Constructor.
- * @param value id.
- */
- private BroadcastVariableIdProperty(final Serializable value) {
- super(value);
+ public int getSideInputIndex() {
+ return sideInputIndex;
}
- /**
- * Static method exposing constructor.
- * @param value id.
- * @return the newly created execution property.
- */
- public static BroadcastVariableIdProperty of(final Serializable value) {
- return new BroadcastVariableIdProperty(value);
+ public T getSideInputValue() {
+ return sideInputValue;
}
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
similarity index 55%
copy from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
copy to compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
index dff48ee..59a1792 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/SideInputCoder.java
@@ -19,43 +19,44 @@
package org.apache.nemo.compiler.frontend.beam.coder;
import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
import java.io.*;
/**
- * EncoderFactory for float[].
+ * EncoderFactory for side inputs.
+ * @param <T> type of the side input value.
*/
-public final class FloatArrayCoder extends AtomicCoder<float[]> {
+public final class SideInputCoder<T> extends AtomicCoder<SideInputElement<T>> {
+ private final Coder<T> valueCoder;
+
/**
* Private constructor.
*/
- private FloatArrayCoder() {
+ private SideInputCoder(final Coder<T> valueCoder) {
+ this.valueCoder = valueCoder;
}
/**
* @return a new coder
*/
- public static FloatArrayCoder of() {
- return new FloatArrayCoder();
+ public static SideInputCoder of(final Coder valueCoder) {
+ return new SideInputCoder<>(valueCoder);
}
@Override
- public void encode(final float[] ary, final OutputStream outStream) throws IOException {
+ public void encode(final SideInputElement<T> sideInputElement, final OutputStream outStream) throws IOException {
final DataOutputStream dataOutputStream = new DataOutputStream(outStream);
- dataOutputStream.writeInt(ary.length);
- for (float f : ary) {
- dataOutputStream.writeFloat(f);
- }
+ dataOutputStream.writeInt(sideInputElement.getSideInputIndex());
+ valueCoder.encode(sideInputElement.getSideInputValue(), dataOutputStream);
}
@Override
- public float[] decode(final InputStream inStream) throws IOException {
+ public SideInputElement<T> decode(final InputStream inStream) throws IOException {
final DataInputStream dataInputStream = new DataInputStream(inStream);
- final int floatArrayLen = dataInputStream.readInt();
- final float[] floatArray = new float[floatArrayLen];
- for (int i = 0; i < floatArrayLen; i++) {
- floatArray[i] = dataInputStream.readFloat();
- }
- return floatArray;
+ final int index = dataInputStream.readInt();
+ final T value = valueCoder.decode(inStream);
+ return new SideInputElement<>(index, value);
}
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index bc672b7..30947fd 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -19,6 +19,7 @@
package org.apache.nemo.compiler.frontend.beam.source;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.ir.Readable;
@@ -42,17 +43,18 @@ import org.slf4j.LoggerFactory;
public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue<O>> {
private static final Logger LOG = LoggerFactory.getLogger(BeamBoundedSourceVertex.class.getName());
private BoundedSource<O> source;
- private final String sourceDescription;
+ private final DisplayData displayData;
/**
* Constructor of BeamBoundedSourceVertex.
*
* @param source BoundedSource to read from.
+ * @param displayData data to display.
*/
- public BeamBoundedSourceVertex(final BoundedSource<O> source) {
+ public BeamBoundedSourceVertex(final BoundedSource<O> source, final DisplayData displayData) {
super();
this.source = source;
- this.sourceDescription = source.toString();
+ this.displayData = displayData;
}
/**
@@ -63,7 +65,7 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue
public BeamBoundedSourceVertex(final BeamBoundedSourceVertex that) {
super(that);
this.source = that.source;
- this.sourceDescription = that.source.toString();
+ this.displayData = that.displayData;
}
@Override
@@ -94,7 +96,7 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue
@Override
public ObjectNode getPropertiesAsJsonNode() {
final ObjectNode node = getIRVertexPropertiesAsJsonNode();
- node.put("source", sourceDescription);
+ node.put("source", displayData.toString());
return node;
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
index ad40d1b..5942925 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamUnboundedSourceVertex.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.frontend.beam.source;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.ir.Readable;
@@ -44,22 +45,23 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
private static final Logger LOG = LoggerFactory.getLogger(BeamUnboundedSourceVertex.class.getName());
private UnboundedSource<O, M> source;
- private final String sourceDescription;
+ private final DisplayData displayData;
/**
* The default constructor for beam unbounded source.
* @param source unbounded source.
*/
- public BeamUnboundedSourceVertex(final UnboundedSource<O, M> source) {
+ public BeamUnboundedSourceVertex(final UnboundedSource<O, M> source,
+ final DisplayData displayData) {
super();
this.source = source;
- this.sourceDescription = source.toString();
+ this.displayData = displayData;
}
private BeamUnboundedSourceVertex(final BeamUnboundedSourceVertex<O, M> that) {
super(that);
this.source = that.source;
- this.sourceDescription = that.source.toString();
+ this.displayData = that.displayData;
}
@Override
@@ -88,7 +90,7 @@ public final class BeamUnboundedSourceVertex<O, M extends UnboundedSource.Checkp
@Override
public ObjectNode getPropertiesAsJsonNode() {
final ObjectNode node = getIRVertexPropertiesAsJsonNode();
- node.put("source", sourceDescription);
+ node.put("source", displayData.toString());
return node;
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index dd5ca35..72113d6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
@@ -31,11 +32,12 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.compiler.frontend.beam.InMemorySideInputReader;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -52,7 +54,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
private final TupleTag<OutputT> mainOutputTag;
private final List<TupleTag<?>> additionalOutputTags;
- private final Collection<PCollectionView<?>> sideInputs;
+ private final Map<Integer, PCollectionView<?>> sideInputs;
private final WindowingStrategy<?, ?> windowingStrategy;
private final DoFn<InterT, OutputT> doFn;
private final SerializablePipelineOptions serializedOptions;
@@ -61,9 +63,13 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
private transient OutputCollector<WindowedValue<OutputT>> outputCollector;
private transient DoFnRunner<InterT, OutputT> doFnRunner;
- private transient SideInputReader sideInputReader;
+
+ // null when there is no side input.
+ private transient PushbackSideInputDoFnRunner<InterT, OutputT> pushBackRunner;
+
private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
private transient DoFnRunners.OutputManager outputManager;
+ private transient InMemorySideInputReader sideInputReader;
// Variables for bundle.
// We consider count and time millis for start/finish bundle.
@@ -74,6 +80,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
private long prevBundleStartTime;
private long currBundleCount = 0;
private boolean bundleFinished = true;
+ private final DisplayData displayData;
/**
* AbstractDoFnTransform constructor.
@@ -85,6 +92,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
* @param windowingStrategy windowing strategy
* @param sideInputs side inputs
* @param options pipeline options
+ * @param displayData display data.
*/
public AbstractDoFnTransform(final DoFn<InterT, OutputT> doFn,
final Coder<InputT> inputCoder,
@@ -92,8 +100,9 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
final TupleTag<OutputT> mainOutputTag,
final List<TupleTag<?>> additionalOutputTags,
final WindowingStrategy<?, ?> windowingStrategy,
- final Collection<PCollectionView<?>> sideInputs,
- final PipelineOptions options) {
+ final Map<Integer, PCollectionView<?>> sideInputs,
+ final PipelineOptions options,
+ final DisplayData displayData) {
this.doFn = doFn;
this.inputCoder = inputCoder;
this.outputCoders = outputCoders;
@@ -102,28 +111,37 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
this.sideInputs = sideInputs;
this.serializedOptions = new SerializablePipelineOptions(options);
this.windowingStrategy = windowingStrategy;
+ this.displayData = displayData;
}
- protected final DoFnRunners.OutputManager getOutputManager() {
- return outputManager;
+ final Map<Integer, PCollectionView<?>> getSideInputs() {
+ return sideInputs;
}
- protected final WindowingStrategy getWindowingStrategy() {
- return windowingStrategy;
+ final DoFnRunners.OutputManager getOutputManager() {
+ return outputManager;
}
- protected final SideInputReader getSideInputReader() {
- return sideInputReader;
+ final WindowingStrategy getWindowingStrategy() {
+ return windowingStrategy;
}
- protected final TupleTag<OutputT> getMainOutputTag() {
+ final TupleTag<OutputT> getMainOutputTag() {
return mainOutputTag;
}
- protected final DoFnRunner<InterT, OutputT> getDoFnRunner() {
+ final DoFnRunner<InterT, OutputT> getDoFnRunner() {
return doFnRunner;
}
+ final PushbackSideInputDoFnRunner<InterT, OutputT> getPushBackRunner() {
+ return pushBackRunner;
+ }
+
+ final InMemorySideInputReader getSideInputReader() {
+ return sideInputReader;
+ }
+
public final DoFn getDoFn() {
return doFn;
}
@@ -131,26 +149,51 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
/**
* Checks whether the bundle is finished or not.
* Starts the bundle if it is done.
+ *
+ * TODO #263: Partial Combining for Beam Streaming
+ * We may want to use separate methods for doFnRunner/pushBackRunner
+ * (same applies to the other bundle-related methods)
*/
- protected final void checkAndInvokeBundle() {
+ final void checkAndInvokeBundle() {
if (bundleFinished) {
bundleFinished = false;
- doFnRunner.startBundle();
+ if (pushBackRunner == null) {
+ doFnRunner.startBundle();
+ } else {
+ pushBackRunner.startBundle();
+ }
prevBundleStartTime = System.currentTimeMillis();
currBundleCount = 0;
}
currBundleCount += 1;
}
-
/**
* Checks whether it is time to finish the bundle and finish it.
*/
- protected final void checkAndFinishBundle() {
+ final void checkAndFinishBundle() {
if (!bundleFinished) {
if (currBundleCount >= bundleSize || System.currentTimeMillis() - prevBundleStartTime >= bundleMillis) {
bundleFinished = true;
+ if (pushBackRunner == null) {
+ doFnRunner.finishBundle();
+ } else {
+ pushBackRunner.finishBundle();
+ }
+ }
+ }
+ }
+
+ /**
+ * Finish bundle without checking for conditions.
+ */
+ final void forceFinishBundle() {
+ if (!bundleFinished) {
+ bundleFinished = true;
+ if (pushBackRunner == null) {
doFnRunner.finishBundle();
+ } else {
+ pushBackRunner.finishBundle();
}
}
}
@@ -168,11 +211,7 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
outputManager = new DefaultOutputManager<>(outputCollector, mainOutputTag);
// create side input reader
- if (!sideInputs.isEmpty()) {
- sideInputReader = new BroadcastVariableSideInputReader(context, sideInputs);
- } else {
- sideInputReader = NullSideInputReader.of(sideInputs);
- }
+ sideInputReader = new InMemorySideInputReader(new ArrayList<>(sideInputs.values()));
// this transform does not support state and timer.
final StepContext stepContext = new StepContext() {
@@ -205,6 +244,10 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
inputCoder,
outputCoders,
windowingStrategy);
+
+ pushBackRunner = sideInputs.isEmpty()
+ ? null
+ : SimplePushbackSideInputDoFnRunner.<InterT, OutputT>create(doFnRunner, sideInputs.values(), sideInputReader);
}
public final OutputCollector<WindowedValue<OutputT>> getOutputCollector() {
@@ -214,12 +257,15 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
@Override
public final void close() {
beforeClose();
- if (!bundleFinished) {
- doFnRunner.finishBundle();
- }
+ forceFinishBundle();
doFnInvoker.invokeTeardown();
}
+ @Override
+ public final String toString() {
+ return this.getClass().getSimpleName() + " / " + displayData.toString().replace(":", " / ");
+ }
+
/**
* An abstract function that wraps the original doFn.
* @param originalDoFn the original doFn.
@@ -234,9 +280,6 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
*/
abstract OutputCollector wrapOutputCollector(final OutputCollector oc);
- @Override
- public abstract void onData(final WindowedValue<InputT> data);
-
/**
* An abstract function that is called before close.
*/
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java
deleted file mode 100644
index 64460f9..0000000
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.frontend.beam.transform;
-
-import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-
-import javax.annotation.Nullable;
-import java.util.Collection;
-
-/**
- * A sideinput reader that reads/writes side input values to context.
- */
-public final class BroadcastVariableSideInputReader implements SideInputReader {
-
- // Nemo context for storing/getting side inputs
- private final Transform.Context context;
-
- // The list of side inputs that we're handling
- private final Collection<PCollectionView<?>> sideInputs;
-
- BroadcastVariableSideInputReader(final Transform.Context context,
- final Collection<PCollectionView<?>> sideInputs) {
- this.context = context;
- this.sideInputs = sideInputs;
- }
-
- @Nullable
- @Override
- public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
- // TODO #216: implement side input and windowing
- return ((WindowedValue<T>) context.getBroadcastVariable(view)).getValue();
- }
-
- @Override
- public <T> boolean contains(final PCollectionView<T> view) {
- return sideInputs.contains(view);
- }
-
- @Override
- public boolean isEmpty() {
- return sideInputs.isEmpty();
- }
-}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index 05e5af6..03313c4 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -37,14 +37,12 @@ import java.util.*;
* @param <I> input type
* @param <O> materialized output type
*/
-public final class CreateViewTransform<I, O> implements
- Transform<WindowedValue<KV<?, I>>, WindowedValue<O>> {
- private OutputCollector<WindowedValue<O>> outputCollector;
+public final class CreateViewTransform<I, O> implements Transform<WindowedValue<KV<?, I>>, WindowedValue<O>> {
private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
private final Map<BoundedWindow, List<I>> windowListMap;
- // TODO #259: we can remove this variable by implementing ReadyCheckingSideInputReader
- private boolean isEmitted = false;
+ private OutputCollector<WindowedValue<O>> outputCollector;
+
private long currentOutputWatermark;
/**
@@ -75,7 +73,6 @@ public final class CreateViewTransform<I, O> implements
@Override
public void onWatermark(final Watermark inputWatermark) {
-
// If no data, just forwards the watermark
if (windowListMap.size() == 0 && currentOutputWatermark < inputWatermark.getTimestamp()) {
currentOutputWatermark = inputWatermark.getTimestamp();
@@ -90,11 +87,10 @@ public final class CreateViewTransform<I, O> implements
final Map.Entry<BoundedWindow, List<I>> entry = iterator.next();
if (entry.getKey().maxTimestamp().getMillis() <= inputWatermark.getTimestamp()) {
// emit the windowed data if the watermark timestamp > the window max boundary
- final O view = viewFn.apply(new MultiView<>(entry.getValue()));
+ final O output = viewFn.apply(new MultiView<>(entry.getValue()));
outputCollector.emit(WindowedValue.of(
- view, entry.getKey().maxTimestamp(), entry.getKey(), PaneInfo.ON_TIME_AND_ONLY_FIRING));
+ output, entry.getKey().maxTimestamp(), entry.getKey(), PaneInfo.ON_TIME_AND_ONLY_FIRING));
iterator.remove();
- isEmitted = true;
minOutputTimestampOfEmittedWindows =
Math.min(minOutputTimestampOfEmittedWindows, entry.getKey().maxTimestamp().getMillis());
@@ -112,20 +108,12 @@ public final class CreateViewTransform<I, O> implements
@Override
public void close() {
onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
-
- if (!isEmitted) {
- // TODO #259: This is an ad-hoc code to resolve the view that has no data
- // Currently, broadCastWorker reads the view data, but it throws exception if no data is available for a view.
- // We should use watermark value to track whether the materialized data in a view is available or not.
- final O view = viewFn.apply(new MultiView<>(Collections.emptyList()));
- outputCollector.emit(WindowedValue.valueInGlobalWindow(view));
- }
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
- sb.append("CreateViewTransform:" + viewFn);
+ sb.append("CreateViewTransform " + viewFn.getClass().getName());
return sb.toString();
}
@@ -133,13 +121,13 @@ public final class CreateViewTransform<I, O> implements
* Represents {@code PrimitiveViewT} supplied to the {@link ViewFn}.
* @param <T> primitive view type
*/
- public final class MultiView<T> implements Materializations.MultimapView<Void, T>, Serializable {
+ public static final class MultiView<T> implements Materializations.MultimapView<Void, T>, Serializable {
private final Iterable<T> iterable;
/**
* Constructor.
*/
- MultiView(final Iterable<T> iterable) {
+ public MultiView(final Iterable<T> iterable) {
// Create a placeholder for side input data. CreateViewTransform#onData stores data to this list.
this.iterable = iterable;
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 9f7a4e0..699a0dd 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -21,8 +21,8 @@ package org.apache.nemo.compiler.frontend.beam.transform;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
@@ -30,12 +30,12 @@ import org.apache.nemo.common.punctuation.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
- * DoFn transform implementation.
+ * DoFn transform implementation when there is no side input.
*
* @param <InputT> input type.
* @param <OutputT> output type.
@@ -45,9 +45,6 @@ public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<
/**
* DoFnTransform Constructor.
- *
- * @param doFn doFn.
- * @param options Pipeline options.
*/
public DoFnTransform(final DoFn<InputT, OutputT> doFn,
final Coder<InputT> inputCoder,
@@ -55,10 +52,10 @@ public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<
final TupleTag<OutputT> mainOutputTag,
final List<TupleTag<?>> additionalOutputTags,
final WindowingStrategy<?, ?> windowingStrategy,
- final Collection<PCollectionView<?>> sideInputs,
- final PipelineOptions options) {
+ final PipelineOptions options,
+ final DisplayData displayData) {
super(doFn, inputCoder, outputCoders, mainOutputTag,
- additionalOutputTags, windowingStrategy, sideInputs, options);
+ additionalOutputTags, windowingStrategy, Collections.emptyMap(), options, displayData);
}
@Override
@@ -68,6 +65,7 @@ public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<
@Override
public void onData(final WindowedValue<InputT> data) {
+ // Do not need any push-back logic.
checkAndInvokeBundle();
getDoFnRunner().processElement(data);
checkAndFinishBundle();
@@ -76,26 +74,16 @@ public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<
@Override
public void onWatermark(final Watermark watermark) {
checkAndInvokeBundle();
- // TODO #216: We should consider push-back data that waits for side input
- // TODO #216: If there are push-back data, input watermark >= output watermark
getOutputCollector().emitWatermark(watermark);
checkAndFinishBundle();
}
@Override
protected void beforeClose() {
- // nothing
}
@Override
OutputCollector wrapOutputCollector(final OutputCollector oc) {
return oc;
}
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("DoTransform:" + getDoFn());
- return sb.toString();
- }
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
index 84b6835..06dde58 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -22,9 +22,9 @@ import org.apache.beam.runners.core.*;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
@@ -57,19 +57,19 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
*/
public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
final TupleTag<KV<K, Iterable<InputT>>> mainOutputTag,
- final List<TupleTag<?>> additionalOutputTags,
final WindowingStrategy<?, ?> windowingStrategy,
- final Collection<PCollectionView<?>> sideInputs,
final PipelineOptions options,
- final SystemReduceFn reduceFn) {
+ final SystemReduceFn reduceFn,
+ final DisplayData displayData) {
super(null, /* doFn */
null, /* inputCoder */
outputCoders,
mainOutputTag,
- additionalOutputTags,
+ Collections.emptyList(), /* GBK does not have additional outputs */
windowingStrategy,
- sideInputs,
- options);
+ Collections.emptyMap(), /* GBK does not have additional side inputs */
+ options,
+ displayData);
this.keyToValues = new HashMap<>();
this.reduceFn = reduceFn;
this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
@@ -93,7 +93,7 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
getWindowingStrategy(),
inMemoryStateInternalsFactory,
inMemoryTimerInternalsFactory,
- getSideInputReader(),
+ null, // GBK has no sideinput.
reduceFn,
getOutputManager(),
getMainOutputTag());
@@ -163,23 +163,19 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
* @param inputWatermark input watermark
*/
private void emitOutputWatermark(final Watermark inputWatermark) {
-
- if (keyAndWatermarkHoldMap.isEmpty()) {
- return;
- }
-
// Find min watermark hold
- final Watermark minWatermarkHold = Collections.min(keyAndWatermarkHoldMap.values());
+ final Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
+ ? new Watermark(Long.MAX_VALUE) // set this to MAX, in order to just use the input watermark.
+ : Collections.min(keyAndWatermarkHoldMap.values());
+ final Watermark outputWatermarkCandidate = new Watermark(
+ Math.max(prevOutputWatermark.getTimestamp(),
+ Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));
if (LOG.isDebugEnabled()) {
LOG.debug("Watermark hold: {}, "
+ "inputWatermark: {}, outputWatermark: {}", minWatermarkHold, inputWatermark, prevOutputWatermark);
}
- final Watermark outputWatermarkCandidate = new Watermark(
- Math.max(prevOutputWatermark.getTimestamp(),
- Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));
-
if (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) {
// progress!
prevOutputWatermark = outputWatermarkCandidate;
@@ -211,8 +207,6 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
// Finish any pending windows by advancing the input watermark to infinity.
processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
- // Emit watermark to downstream operators
- emitOutputWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
}
/**
@@ -258,13 +252,6 @@ public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
}
}
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("GroupByKeyAndWindowDoFnTransform:");
- return sb.toString();
- }
-
/**
* Get timer data.
*/
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index 0f4cf5b..71c68ea 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -69,12 +69,4 @@ public final class GroupByKeyTransform<I> extends NoWatermarkEmitTransform<I, Wi
}
}
}
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("GroupByKeyTransform:");
- sb.append(super.toString());
- return sb.toString();
- }
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
new file mode 100644
index 0000000..d8f0d8f
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/PushBackDoFnTransform.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DoFn transform implementation with push backs for side inputs.
+ *
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class PushBackDoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<InputT, InputT, OutputT> {
+ private static final Logger LOG = LoggerFactory.getLogger(PushBackDoFnTransform.class.getName());
+
+ private List<WindowedValue<InputT>> curPushedBacks;
+ private long curPushedBackWatermark; // Long.MAX_VALUE when no pushed-back exists.
+ private long curInputWatermark;
+ private long curOutputWatermark;
+
+ /**
+ * PushBackDoFnTransform Constructor.
+ */
+ public PushBackDoFnTransform(final DoFn<InputT, OutputT> doFn,
+ final Coder<InputT> inputCoder,
+ final Map<TupleTag<?>, Coder<?>> outputCoders,
+ final TupleTag<OutputT> mainOutputTag,
+ final List<TupleTag<?>> additionalOutputTags,
+ final WindowingStrategy<?, ?> windowingStrategy,
+ final Map<Integer, PCollectionView<?>> sideInputs,
+ final PipelineOptions options,
+ final DisplayData displayData) {
+ super(doFn, inputCoder, outputCoders, mainOutputTag,
+ additionalOutputTags, windowingStrategy, sideInputs, options, displayData);
+ this.curPushedBacks = new ArrayList<>();
+ this.curPushedBackWatermark = Long.MAX_VALUE;
+ this.curInputWatermark = Long.MIN_VALUE;
+ this.curOutputWatermark = Long.MIN_VALUE;
+ }
+
+ @Override
+ protected DoFn wrapDoFn(final DoFn initDoFn) {
+ return initDoFn;
+ }
+
+ @Override
+ public void onData(final WindowedValue data) {
+ // Need to distinguish side/main inputs and push-back main inputs.
+ if (data.getValue() instanceof SideInputElement) {
+ // This element is a Side Input
+ // TODO #287: Consider Explicit Multi-Input IR Transform
+ final WindowedValue<SideInputElement> sideInputElement = (WindowedValue<SideInputElement>) data;
+ final PCollectionView view = getSideInputs().get(sideInputElement.getValue().getSideInputIndex());
+ getSideInputReader().addSideInputElement(view, data);
+
+ handlePushBacks();
+
+ // See if we can emit a new watermark, as we may have processed some pushed-back elements
+ onWatermark(new Watermark(curInputWatermark));
+ } else {
+ // This element is the Main Input
+ checkAndInvokeBundle();
+ final Iterable<WindowedValue<InputT>> pushedBack =
+ getPushBackRunner().processElementInReadyWindows(data);
+ for (final WindowedValue wv : pushedBack) {
+ curPushedBackWatermark = Math.min(curPushedBackWatermark, wv.getTimestamp().getMillis());
+ curPushedBacks.add(wv);
+ }
+ checkAndFinishBundle();
+ }
+ }
+
+ private void handlePushBacks() {
+ // Force-finish, before (possibly) processing pushed-back data.
+ //
+ // Main reason:
+ // {@link org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner}
+ // caches for each bundle the side inputs that are not ready.
+ // We need to re-start the bundle to advertise the (possibly) newly available side input.
+ forceFinishBundle(); // forced
+
+ // With the new side input added, we may be able to process some pushed-back elements.
+ final List<WindowedValue<InputT>> pushedBackAgain = new ArrayList<>();
+ long pushedBackAgainWatermark = Long.MAX_VALUE;
+ for (final WindowedValue<InputT> curPushedBack : curPushedBacks) {
+ checkAndInvokeBundle();
+ final Iterable<WindowedValue<InputT>> pushedBack =
+ getPushBackRunner().processElementInReadyWindows(curPushedBack);
+ checkAndFinishBundle();
+ for (final WindowedValue<InputT> wv : pushedBack) {
+ pushedBackAgainWatermark = Math.min(pushedBackAgainWatermark, wv.getTimestamp().getMillis());
+ pushedBackAgain.add(wv);
+ }
+ }
+ curPushedBacks = pushedBackAgain;
+ curPushedBackWatermark = pushedBackAgainWatermark;
+ }
+
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ // TODO #298: Consider Processing DoFn PushBacks on Watermark
+ checkAndInvokeBundle();
+ curInputWatermark = watermark.getTimestamp();
+ getSideInputReader().setCurrentWatermarkOfAllMainAndSideInputs(curInputWatermark);
+
+ final long outputWatermarkCandidate = Math.min(curInputWatermark, curPushedBackWatermark);
+ if (outputWatermarkCandidate > curOutputWatermark) {
+ // Watermark advances!
+ getOutputCollector().emitWatermark(new Watermark(outputWatermarkCandidate));
+ curOutputWatermark = outputWatermarkCandidate;
+ }
+ checkAndFinishBundle();
+ }
+
+ @Override
+ protected void beforeClose() {
+ // This makes all unavailable side inputs as available empty side inputs.
+ onWatermark(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+ // All push-backs should be processed here.
+ handlePushBacks();
+ }
+
+ @Override
+ OutputCollector wrapOutputCollector(final OutputCollector oc) {
+ return oc;
+ }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java
new file mode 100644
index 0000000..b1536e6
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
+
+/**
+ * Side input transform implementation.
+ * TODO #297: Consider Removing SideInputTransform
+ * @param <T> input/output type.
+ */
+public final class SideInputTransform<T> implements Transform<WindowedValue<T>, WindowedValue<SideInputElement<T>>> {
+ private OutputCollector<WindowedValue<SideInputElement<T>>> outputCollector;
+ private final int index;
+
+ /**
+ * Constructor.
+ */
+ public SideInputTransform(final int index) {
+ this.index = index;
+ }
+
+ @Override
+ public void prepare(final Context context, final OutputCollector<WindowedValue<SideInputElement<T>>> oc) {
+ this.outputCollector = oc;
+ }
+
+ @Override
+ public void onData(final WindowedValue<T> element) {
+ outputCollector.emit(element.withValue(new SideInputElement<>(index, element.getValue())));
+ }
+
+ @Override
+ public void onWatermark(final Watermark watermark) {
+ outputCollector.emitWatermark(watermark);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("SideInputTransform:");
+ sb.append("(index-");
+ sb.append(String.valueOf(index));
+ sb.append(")");
+ sb.append(super.toString());
+ return sb.toString();
+ }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
index a434618..f8f5c9f 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
@@ -19,6 +19,7 @@
package org.apache.nemo.compiler.frontend.beam.transform;
import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -40,14 +41,16 @@ import java.util.Collection;
public final class WindowFnTransform<T, W extends BoundedWindow>
implements Transform<WindowedValue<T>, WindowedValue<T>> {
private final WindowFn windowFn;
+ private final DisplayData displayData;
private OutputCollector<WindowedValue<T>> outputCollector;
/**
* Default Constructor.
* @param windowFn windowFn for the Transform.
*/
- public WindowFnTransform(final WindowFn windowFn) {
+ public WindowFnTransform(final WindowFn windowFn, final DisplayData displayData) {
this.windowFn = windowFn;
+ this.displayData = displayData;
}
@Override
@@ -101,7 +104,7 @@ public final class WindowFnTransform<T, W extends BoundedWindow>
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
- sb.append("WindowFnTransform:" + windowFn);
+ sb.append("WindowFnTransform / " + displayData.toString().replaceAll(":", " / "));
return sb.toString();
}
}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
index be3f008..278c83c 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendALSTest.java
@@ -41,7 +41,7 @@ public final class BeamFrontendALSTest {
final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileALSDAG();
assertEquals(producedDAG.getTopologicalSort(), producedDAG.getTopologicalSort());
- assertEquals(38, producedDAG.getVertices().size());
+ assertEquals(44, producedDAG.getVertices().size());
// producedDAG.getTopologicalSort().forEach(v -> System.out.println(v.getId()));
final IRVertex vertexX = producedDAG.getTopologicalSort().get(5);
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
index 632c30a..c6f100d 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/BeamFrontendMLRTest.java
@@ -41,7 +41,7 @@ public class BeamFrontendMLRTest {
final DAG<IRVertex, IREdge> producedDAG = CompilerTestUtil.compileMLRDAG();
assertEquals(producedDAG.getTopologicalSort(), producedDAG.getTopologicalSort());
- assertEquals(36, producedDAG.getVertices().size());
+ assertEquals(39, producedDAG.getVertices().size());
final IRVertex vertexX = producedDAG.getTopologicalSort().get(5);
assertEquals(1, producedDAG.getIncomingEdgesOf(vertexX).size());
@@ -51,6 +51,6 @@ public class BeamFrontendMLRTest {
final IRVertex vertexY = producedDAG.getTopologicalSort().get(13);
assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY).size());
assertEquals(1, producedDAG.getIncomingEdgesOf(vertexY.getId()).size());
- assertEquals(2, producedDAG.getOutgoingEdgesOf(vertexY).size());
+ assertEquals(1, producedDAG.getOutgoingEdgesOf(vertexY).size());
}
}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
index 762e327..702ca9d 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransformTest.java
@@ -21,6 +21,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
import org.apache.beam.sdk.transforms.Materialization;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.ViewFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -54,6 +55,7 @@ public final class CreateViewTransformTest {
public void test() {
final FixedWindows fixedwindows = FixedWindows.of(Duration.standardSeconds(1));
+
final CreateViewTransform<String, Integer> viewTransform =
new CreateViewTransform(new SumViewFn());
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index bad7584..fa1169c 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -20,14 +20,14 @@ package org.apache.nemo.compiler.frontend.beam.transform;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -36,17 +36,17 @@ import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.SideInputElement;
import org.apache.reef.io.Tuple;
import org.junit.Before;
import org.junit.Test;
import java.util.*;
-import static java.util.Collections.emptyList;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public final class DoFnTransformTest {
@@ -78,8 +78,8 @@ public final class DoFnTransformTest {
outputTag,
Collections.emptyList(),
WindowingStrategy.globalDefault(),
- emptyList(), /* side inputs */
- PipelineOptionsFactory.as(NemoPipelineOptions.class));
+ PipelineOptionsFactory.as(NemoPipelineOptions.class),
+ DisplayData.none());
final Transform.Context context = mock(Transform.Context.class);
final OutputCollector<WindowedValue<String>> oc = new TestOutputCollector<>();
@@ -112,8 +112,8 @@ public final class DoFnTransformTest {
outputTag,
Collections.emptyList(),
WindowingStrategy.globalDefault(),
- emptyList(), /* side inputs */
- pipelineOptions);
+ pipelineOptions,
+ DisplayData.none());
final Transform.Context context = mock(Transform.Context.class);
final OutputCollector<WindowedValue<String>> oc = new TestOutputCollector<>();
@@ -156,8 +156,8 @@ public final class DoFnTransformTest {
outputTag,
Collections.emptyList(),
WindowingStrategy.globalDefault(),
- emptyList(), /* side inputs */
- pipelineOptions);
+ pipelineOptions,
+ DisplayData.none());
final Transform.Context context = mock(Transform.Context.class);
final OutputCollector<WindowedValue<String>> oc = new TestOutputCollector<>();
@@ -208,8 +208,8 @@ public final class DoFnTransformTest {
mainOutput,
tags,
WindowingStrategy.globalDefault(),
- emptyList(), /* side inputs */
- PipelineOptionsFactory.as(NemoPipelineOptions.class));
+ PipelineOptionsFactory.as(NemoPipelineOptions.class),
+ DisplayData.none());
// mock context
final Transform.Context context = mock(Transform.Context.class);
@@ -244,48 +244,70 @@ public final class DoFnTransformTest {
doFnTransform.close();
}
- // TODO #216: implement side input and windowing
@Test
public void testSideInputs() {
// mock context
final Transform.Context context = mock(Transform.Context.class);
- when(context.getBroadcastVariable(view1)).thenReturn(
- WindowedValue.valueInGlobalWindow(ImmutableList.of("1")));
- when(context.getBroadcastVariable(view2)).thenReturn(
- WindowedValue.valueInGlobalWindow(ImmutableList.of("2")));
-
TupleTag<Tuple<String, Iterable<String>>> outputTag = new TupleTag<>("main-output");
- WindowedValue<String> first = WindowedValue.valueInGlobalWindow("first");
- WindowedValue<String> second = WindowedValue.valueInGlobalWindow("second");
+ WindowedValue<String> firstElement = WindowedValue.valueInGlobalWindow("first");
+ WindowedValue<String> secondElement = WindowedValue.valueInGlobalWindow("second");
- final Map<String, PCollectionView<Iterable<String>>> eventAndViewMap =
- ImmutableMap.of(first.getValue(), view1, second.getValue(), view2);
+ SideInputElement firstSideinput = new SideInputElement<>(0, ImmutableList.of("1"));
+ SideInputElement secondSideinput = new SideInputElement(1, ImmutableList.of("2"));
- final DoFnTransform<String, Tuple<String, Iterable<String>>> doFnTransform =
- new DoFnTransform<>(
- new SimpleSideInputDoFn<>(eventAndViewMap),
+ final Map<Integer, PCollectionView<?>> sideInputMap = new HashMap<>();
+ sideInputMap.put(firstSideinput.getSideInputIndex(), view1);
+ sideInputMap.put(secondSideinput.getSideInputIndex(), view2);
+ final PushBackDoFnTransform<String, String> doFnTransform =
+ new PushBackDoFnTransform(
+ new SimpleSideInputDoFn<String>(view1, view2),
NULL_INPUT_CODER,
NULL_OUTPUT_CODERS,
outputTag,
Collections.emptyList(),
WindowingStrategy.globalDefault(),
- ImmutableList.of(view1, view2), /* side inputs */
- PipelineOptionsFactory.as(NemoPipelineOptions.class));
+ sideInputMap, /* side inputs */
+ PipelineOptionsFactory.as(NemoPipelineOptions.class),
+ DisplayData.none());
- final OutputCollector<WindowedValue<Tuple<String, Iterable<String>>>> oc = new TestOutputCollector<>();
+ final TestOutputCollector<String> oc = new TestOutputCollector<>();
doFnTransform.prepare(context, oc);
- doFnTransform.onData(first);
- doFnTransform.onData(second);
-
- assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("first", ImmutableList.of("1"))),
- ((TestOutputCollector<Tuple<String,Iterable<String>>>) oc).getOutput().get(0));
-
- assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("second", ImmutableList.of("2"))),
- ((TestOutputCollector<Tuple<String,Iterable<String>>>) oc).getOutput().get(1));
-
+ // Main input first, Side inputs later
+ doFnTransform.onData(firstElement);
+
+ doFnTransform.onData(WindowedValue.valueInGlobalWindow(firstSideinput));
+ doFnTransform.onData(WindowedValue.valueInGlobalWindow(secondSideinput));
+ assertEquals(
+ WindowedValue.valueInGlobalWindow(
+ concat(firstElement.getValue(), firstSideinput.getSideInputValue(), secondSideinput.getSideInputValue())),
+ oc.getOutput().get(0));
+
+ // Side inputs first, Main input later
+ doFnTransform.onData(secondElement);
+ assertEquals(
+ WindowedValue.valueInGlobalWindow(
+ concat(secondElement.getValue(), firstSideinput.getSideInputValue(), secondSideinput.getSideInputValue())),
+ oc.getOutput().get(1));
+
+ // There should be only 2 final outputs
+ assertEquals(2, oc.getOutput().size());
+
+ // The side inputs should be "READY"
+ assertTrue(doFnTransform.getSideInputReader().isReady(view1, GlobalWindow.INSTANCE));
+ assertTrue(doFnTransform.getSideInputReader().isReady(view2, GlobalWindow.INSTANCE));
+
+ // This watermark should remove the side inputs. (Now should be "NOT READY")
+ doFnTransform.onWatermark(new Watermark(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+ Iterable materializedSideInput1 = doFnTransform.getSideInputReader().get(view1, GlobalWindow.INSTANCE);
+ Iterable materializedSideInput2 = doFnTransform.getSideInputReader().get(view2, GlobalWindow.INSTANCE);
+ assertFalse(materializedSideInput1.iterator().hasNext());
+ assertFalse(materializedSideInput2.iterator().hasNext());
+
+ // There should be only 2 final outputs
doFnTransform.close();
+ assertEquals(2, oc.getOutput().size());
}
@@ -334,21 +356,30 @@ public final class DoFnTransformTest {
* Side input do fn.
* @param <T> type
*/
- private static class SimpleSideInputDoFn<T, V> extends DoFn<T, Tuple<T, V>> {
- private final Map<T, PCollectionView<V>> idAndViewMap;
-
- public SimpleSideInputDoFn(final Map<T, PCollectionView<V>> idAndViewMap) {
- this.idAndViewMap = idAndViewMap;
+ private static class SimpleSideInputDoFn<T> extends DoFn<T, String> {
+ private final PCollectionView<?> view1;
+ private final PCollectionView<?> view2;
+
+ public SimpleSideInputDoFn(final PCollectionView<?> view1,
+ final PCollectionView<?> view2) {
+ this.view1 = view1;
+ this.view2 = view2;
}
@ProcessElement
public void processElement(final ProcessContext c) throws Exception {
- final PCollectionView<V> view = idAndViewMap.get(c.element());
- final V sideInput = c.sideInput(view);
- c.output(new Tuple<>(c.element(), sideInput));
+ final T element = c.element();
+ final Object view1Value = c.sideInput(view1);
+ final Object view2Value = c.sideInput(view2);
+
+ c.output(concat(element, view1Value, view2Value));
}
}
+ private static String concat(final Object obj1, final Object obj2, final Object obj3) {
+ return obj1.toString() + " / " + obj2 + " / " + obj3;
+ }
+
/**
* Multi output do fn.
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index f9a44ec..474c79c 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -21,6 +21,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -89,11 +90,10 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
new GroupByKeyAndWindowDoFnTransform(
NULL_OUTPUT_CODERS,
outputTag,
- Collections.emptyList(), /* additional outputs */
WindowingStrategy.of(slidingWindows),
- emptyList(), /* side inputs */
PipelineOptionsFactory.as(NemoPipelineOptions.class),
- SystemReduceFn.buffering(NULL_INPUT_CODER));
+ SystemReduceFn.buffering(NULL_INPUT_CODER),
+ DisplayData.none());
final Instant ts1 = new Instant(1);
final Instant ts2 = new Instant(100);
@@ -167,10 +167,17 @@ public final class GroupByKeyAndWindowDoFnTransformTest {
doFnTransform.onData(WindowedValue.of(
KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING));
- // do not emit anything
+
doFnTransform.onWatermark(watermark2);
- assertEquals(0, oc.outputs.size());
- assertEquals(0, oc.watermarks.size());
+
+ assertEquals(0, oc.outputs.size()); // do not emit anything
+ assertEquals(1, oc.watermarks.size());
+
+ // check output watermark
+ assertEquals(1400,
+ oc.watermarks.get(0).getTimestamp());
+
+ oc.watermarks.clear();
doFnTransform.onData(WindowedValue.of(
KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
index bdc2a85..e2f2c0a 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopInvariantCodeMotionPassTest.java
@@ -63,7 +63,7 @@ public class LoopInvariantCodeMotionPassTest {
final LoopVertex alsLoop = alsLoopOpt.get();
final IRVertex vertex7 = groupedDAG.getTopologicalSort().get(3);
- final IRVertex vertex15 = alsLoop.getDAG().getTopologicalSort().get(4);
+ final IRVertex vertex15 = alsLoop.getDAG().getTopologicalSort().get(5);
final Set<IREdge> oldDAGIncomingEdges = alsLoop.getDagIncomingEdges().get(vertex15);
final List<IREdge> newDAGIncomingEdge = groupedDAG.getIncomingEdgesOf(vertex7);
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
index 84da0dd..76bdc03 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquare.java
@@ -21,8 +21,6 @@ package org.apache.nemo.examples.beam;
import com.github.fommil.netlib.BLAS;
import com.github.fommil.netlib.LAPACK;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
-import org.apache.nemo.compiler.frontend.beam.coder.FloatArrayCoder;
-import org.apache.nemo.compiler.frontend.beam.coder.IntArrayCoder;
import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderProviders;
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
index e431019..ab1760f 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/AlternatingLeastSquareInefficient.java
@@ -18,8 +18,6 @@
*/
package org.apache.nemo.examples.beam;
-import org.apache.nemo.compiler.frontend.beam.coder.FloatArrayCoder;
-import org.apache.nemo.compiler.frontend.beam.coder.IntArrayCoder;
import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
import org.apache.beam.sdk.Pipeline;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/FloatArrayCoder.java
similarity index 97%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
rename to examples/beam/src/main/java/org/apache/nemo/examples/beam/FloatArrayCoder.java
index dff48ee..104e994 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/FloatArrayCoder.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/FloatArrayCoder.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.compiler.frontend.beam.coder;
+package org.apache.nemo.examples.beam;
import org.apache.beam.sdk.coders.AtomicCoder;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/IntArrayCoder.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/IntArrayCoder.java
similarity index 97%
rename from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/IntArrayCoder.java
rename to examples/beam/src/main/java/org/apache/nemo/examples/beam/IntArrayCoder.java
index ac9205b..f720137 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/IntArrayCoder.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/IntArrayCoder.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.compiler.frontend.beam.coder;
+package org.apache.nemo.examples.beam;
import org.apache.beam.sdk.coders.AtomicCoder;
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
new file mode 100644
index 0000000..30ee405
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedBroadcast.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.util.List;
+
+/**
+ * A windowed broadcast (side-input) example application.
+ */
+public final class WindowedBroadcast {
+ /**
+ * Private Constructor.
+ */
+ private WindowedBroadcast() {
+ }
+
+ private static PCollection<Long> getSource(final Pipeline p) {
+ return p.apply(GenerateSequence
+ .from(1)
+ .withRate(2, Duration.standardSeconds(1))
+ .withTimestampFn(num -> new Instant(num * 500))); // 0.5 second between subsequent elements
+ }
+ /**
+ * Main function for the WindowedBroadcast Beam program.
+ * @param args arguments.
+ */
+ public static void main(final String[] args) {
+ final String outputFilePath = args[0];
+
+ final Window<Long> windowFn = Window
+ .<Long>into(SlidingWindows.of(Duration.standardSeconds(2))
+ .every(Duration.standardSeconds(1)));
+
+ final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+ options.setRunner(NemoPipelineRunner.class);
+ options.setJobName("WindowedBroadcast");
+
+ final Pipeline p = Pipeline.create(options);
+
+ final PCollection<Long> windowedElements = getSource(p).apply(windowFn);
+ final PCollectionView<List<Long>> windowedView = windowedElements.apply(View.asList());
+
+ windowedElements.apply(ParDo.of(new DoFn<Long, String>() {
+ @ProcessElement
+ public void processElement(final ProcessContext c) {
+ final Long anElementInTheWindow = c.element();
+ final List<Long> allElementsInTheWindow = c.sideInput(windowedView);
+ System.out.println(anElementInTheWindow + " / " + allElementsInTheWindow);
+ if (!allElementsInTheWindow.contains(anElementInTheWindow)) {
+ throw new RuntimeException(anElementInTheWindow + " not in " + allElementsInTheWindow.toString());
+ } else {
+ c.output(anElementInTheWindow + " is in " + allElementsInTheWindow);
+ }
+ }
+ }).withSideInputs(windowedView)
+ ).apply(new WriteOneFilePerWindow(outputFilePath, 1));
+
+ p.run();
+ }
+}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
index d7f8c85..0f13dc4 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -78,7 +78,7 @@ public final class WindowedWordCount {
return p.apply(GenerateSequence
.from(1)
.withRate(2, Duration.standardSeconds(1))
- .withTimestampFn(num -> new Instant(num * 500)))
+ .withTimestampFn(num -> new Instant(num * 500))) // 0.5 second between subsequent elements
.apply(MapElements.via(new SimpleFunction<Long, KV<String, Long>>() {
@Override
public KV<String, Long> apply(final Long val) {
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedBroadcastITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedBroadcastITCase.java
new file mode 100644
index 0000000..5e2fba3
--- /dev/null
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedBroadcastITCase.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.nemo.client.JobLauncher;
+import org.apache.nemo.common.test.ArgBuilder;
+import org.apache.nemo.common.test.ExampleTestUtil;
+import org.apache.nemo.examples.beam.policy.StreamingPolicyParallelismFive;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test the windowed broadcast program with JobLauncher.
+ * TODO #291: ITCase for Empty PCollectionViews
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class WindowedBroadcastITCase {
+
+ private static final int TIMEOUT = 120000;
+ private static ArgBuilder builder;
+ private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
+
+ private static final String outputFileName = "test_output_windowed_broadcast";
+ private static final String expectedOutputFileName = "expected_output_windowed_broadcast";
+ private static final String expectedSlidingWindowOutputFileName = "expected_output_sliding_windowed_broadcast";
+ private static final String executorResourceFileName = fileBasePath + "beam_test_executor_resources.json";
+ private static final String outputFilePath = fileBasePath + outputFileName;
+
+ // TODO #271: We currently disable this test because we cannot force close Nemo
+ // @Test (timeout = TIMEOUT)
+ public void testUnboundedSlidingWindow() throws Exception {
+ builder = new ArgBuilder()
+ .addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
+ .addUserMain(WindowedBroadcast.class.getCanonicalName())
+ .addUserArgs(outputFilePath);
+
+ JobLauncher.main(builder
+ .addResourceJson(executorResourceFileName)
+ .addJobId(WindowedBroadcastITCase.class.getSimpleName())
+ .addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
+ .build());
+
+ try {
+ ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedSlidingWindowOutputFileName);
+ } finally {
+ }
+ }
+}
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
index c0134aa..7ef0e02 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -33,6 +33,7 @@ import static org.apache.nemo.examples.beam.WindowedWordCount.INPUT_TYPE_UNBOUND
/**
* Test Windowed word count program with JobLauncher.
+ * TODO #299: WindowedWordCountITCase Hangs (Heisenbug)
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(JobLauncher.class)
@@ -58,7 +59,7 @@ public final class WindowedWordCountITCase {
JobLauncher.main(builder
.addResourceJson(executorResourceFileName)
- .addJobId(WindowedWordCountITCase.class.getSimpleName())
+ .addJobId(WindowedWordCountITCase.class.getSimpleName() + "testBatchFixedWindow")
.addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
.build());
@@ -78,7 +79,7 @@ public final class WindowedWordCountITCase {
JobLauncher.main(builder
.addResourceJson(executorResourceFileName)
- .addJobId(WindowedWordCountITCase.class.getSimpleName())
+ .addJobId(WindowedWordCountITCase.class.getSimpleName() + "testBatchSlidingWindow")
.addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
.build());
@@ -98,7 +99,7 @@ public final class WindowedWordCountITCase {
JobLauncher.main(builder
.addResourceJson(executorResourceFileName)
- .addJobId(WindowedWordCountITCase.class.getSimpleName())
+ .addJobId(WindowedWordCountITCase.class.getSimpleName() + "testStreamingSchedulerAndPipeFixedWindow")
.addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
.build());
@@ -119,7 +120,7 @@ public final class WindowedWordCountITCase {
JobLauncher.main(builder
.addResourceJson(executorResourceFileName)
- .addJobId(WindowedWordCountITCase.class.getSimpleName())
+ .addJobId(WindowedWordCountITCase.class.getSimpleName() + "testStreamingSchedulerAndPipeSlidingWindow")
.addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
.build());
@@ -132,7 +133,7 @@ public final class WindowedWordCountITCase {
// TODO #271: We currently disable this test because we cannot force close Nemo
- //@Test (timeout = TIMEOUT)
+ // @Test (timeout = TIMEOUT)
public void testUnboundedSlidingWindow() throws Exception {
builder = new ArgBuilder()
.addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
index f7aa184..f972ce0 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
@@ -120,7 +120,7 @@ public final class Executor {
* @param task to launch.
*/
private void launchTask(final Task task) {
- LOG.info("Launch task: {}", task);
+ LOG.info("Launch task: {}", task.getTaskId());
try {
final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
SerializationUtils.deserialize(task.getSerializedIRDag());
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
index 17a62c5..42806b7 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
@@ -27,7 +27,6 @@ import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import org.apache.nemo.runtime.executor.datatransfer.InputReader;
import net.jcip.annotations.ThreadSafe;
import org.apache.commons.lang.SerializationUtils;
import org.apache.reef.tang.annotations.Parameter;
@@ -36,9 +35,7 @@ import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import java.io.Serializable;
-import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -50,7 +47,6 @@ public final class BroadcastManagerWorker {
private static final Logger LOG = LoggerFactory.getLogger(BroadcastManagerWorker.class.getName());
private static BroadcastManagerWorker staticReference;
- private final ConcurrentHashMap<Serializable, InputReader> idToReader;
private final LoadingCache<Serializable, Object> idToVariableCache;
/**
@@ -63,66 +59,34 @@ public final class BroadcastManagerWorker {
*/
@Inject
private BroadcastManagerWorker(@Parameter(JobConf.ExecutorId.class) final String executorId,
- final PersistentConnectionToMasterMap toMaster) {
+ final PersistentConnectionToMasterMap toMaster) {
staticReference = this;
- this.idToReader = new ConcurrentHashMap<>();
this.idToVariableCache = CacheBuilder.newBuilder()
.maximumSize(100)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build(
new CacheLoader<Serializable, Object>() {
public Object load(final Serializable id) throws Exception {
- LOG.info("Start to load broadcast {}", id.toString());
- if (idToReader.containsKey(id)) {
- // Get from reader
- final InputReader inputReader = idToReader.get(id);
- final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> iterators = inputReader.read();
- if (iterators.size() != 1) {
- throw new IllegalStateException(id.toString());
- }
- final DataUtil.IteratorWithNumBytes iterator = iterators.get(0).get();
- if (!iterator.hasNext()) {
- throw new IllegalStateException(id.toString() + " (no element) " + iterator.toString());
- }
- final Object result = iterator.next();
- if (iterator.hasNext()) {
- throw new IllegalStateException(id.toString() + " (more than single element) " + iterator.toString());
- }
- return result;
- } else {
- // Get from master
- final CompletableFuture<ControlMessage.Message> responseFromMasterFuture = toMaster
- .getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).request(
- ControlMessage.Message.newBuilder()
- .setId(RuntimeIdManager.generateMessageId())
- .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
- .setType(ControlMessage.MessageType.RequestBroadcastVariable)
- .setRequestbroadcastVariableMsg(
- ControlMessage.RequestBroadcastVariableMessage.newBuilder()
- .setExecutorId(executorId)
- .setBroadcastId(ByteString.copyFrom(SerializationUtils.serialize(id)))
- .build())
- .build());
- return SerializationUtils.deserialize(
- responseFromMasterFuture.get().getBroadcastVariableMsg().getVariable().toByteArray());
- }
+ // Get from master
+ final CompletableFuture<ControlMessage.Message> responseFromMasterFuture = toMaster
+ .getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).request(
+ ControlMessage.Message.newBuilder()
+ .setId(RuntimeIdManager.generateMessageId())
+ .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+ .setType(ControlMessage.MessageType.RequestBroadcastVariable)
+ .setRequestbroadcastVariableMsg(
+ ControlMessage.RequestBroadcastVariableMessage.newBuilder()
+ .setExecutorId(executorId)
+ .setBroadcastId(ByteString.copyFrom(SerializationUtils.serialize(id)))
+ .build())
+ .build());
+ return SerializationUtils.deserialize(
+ responseFromMasterFuture.get().getBroadcastVariableMsg().getVariable().toByteArray());
}
});
}
/**
- * When the broadcast variable can be read by an input reader.
- * (i.e., the variable is expressed as an IREdge, and reside in a executor as a block)
- *
- * @param id of the broadcast variable.
- * @param inputReader the {@link InputReader} to register.
- */
- public void registerInputReader(final Serializable id,
- final InputReader inputReader) {
- this.idToReader.put(id, inputReader);
- }
-
- /**
* Get the variable with the id.
* @param id of the variable.
* @return the variable.
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
index ed6073e..43d11cf 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
@@ -33,6 +33,7 @@ public final class DedicatedKeyPerElementPartitioner implements Partitioner<Inte
* Constructor.
*/
public DedicatedKeyPerElementPartitioner() {
+ // TODO #288: DedicatedKeyPerElementPartitioner should not always return 0
key = 0;
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
index 613eccc..0402fd4 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
@@ -62,7 +62,6 @@ public final class MultiInputWatermarkManager implements InputWatermarkManager {
@Override
public void trackAndEmitWatermarks(final int edgeIndex, final Watermark watermark) {
-
if (LOG.isDebugEnabled()) {
LOG.debug("Track watermark {} emitted from edge {}:, {}", watermark.getTimestamp(), edgeIndex,
watermarks.toString());
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 12d9932..74c3ffb 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -105,7 +105,6 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
@Override
public void emitWatermark(final Watermark watermark) {
-
if (LOG.isDebugEnabled()) {
LOG.debug("{} emits watermark {}", irVertex.getId(), watermark);
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
index dd70394..03d7470 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -78,7 +78,6 @@ public final class PipeOutputWriter implements OutputWriter {
private void writeData(final Object element, final List<ByteOutputContext> pipeList) {
pipeList.forEach(pipe -> {
-
try (final ByteOutputContext.ByteOutputStream pipeToWriteTo = pipe.newOutputStream()) {
// Serialize (Do not compress)
final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
@@ -149,10 +148,14 @@ public final class PipeOutputWriter implements OutputWriter {
}
private List<ByteOutputContext> getPipeToWrite(final Object element) {
- return runtimeEdge.getPropertyValue(CommunicationPatternProperty.class)
- .get()
- .equals(CommunicationPatternProperty.Value.OneToOne)
- ? Collections.singletonList(pipes.get(0))
- : Collections.singletonList(pipes.get((int) partitioner.partition(element)));
+ final CommunicationPatternProperty.Value comm =
+ (CommunicationPatternProperty.Value) runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).get();
+ if (comm.equals(CommunicationPatternProperty.Value.OneToOne)) {
+ return Collections.singletonList(pipes.get(0));
+ } else if (comm.equals(CommunicationPatternProperty.Value.BroadCast)) {
+ return pipes;
+ } else {
+ return Collections.singletonList(pipes.get((int) partitioner.partition(element)));
+ }
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
index e8135f9..e3562ed 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
@@ -20,12 +20,15 @@ package org.apache.nemo.runtime.executor.datatransfer;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is a special implementation for single input data stream for optimization.
*/
public final class SingleInputWatermarkManager implements InputWatermarkManager {
+ private static final Logger LOG = LoggerFactory.getLogger(SingleInputWatermarkManager.class.getName());
private final OutputCollector watermarkCollector;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
index 2ca3df8..215a6a8 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/DataFetcher.java
@@ -31,6 +31,10 @@ abstract class DataFetcher implements AutoCloseable {
private final IRVertex dataSource;
private final OutputCollector outputCollector;
+ /**
+ * @param dataSource to fetch from.
+ * @param outputCollector for the data fetched.
+ */
DataFetcher(final IRVertex dataSource,
final OutputCollector outputCollector) {
this.dataSource = dataSource;
@@ -48,4 +52,8 @@ abstract class DataFetcher implements AutoCloseable {
OutputCollector getOutputCollector() {
return outputCollector;
}
+
+ IRVertex getDataSource() {
+ return dataSource;
+ }
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
index 001060c..7ce1ed9 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
@@ -63,6 +63,7 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
// A watermark manager
private InputWatermarkManager inputWatermarkManager;
+
MultiThreadParentTaskDataFetcher(final IRVertex dataSource,
final InputReader readerForParentTask,
final OutputCollector outputCollector) {
@@ -113,8 +114,6 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
// Consume this iterator to the end.
while (iterator.hasNext()) { // blocked on the iterator.
final Object element = iterator.next();
-
-
if (element instanceof WatermarkWithIndex) {
// watermark element
// the input watermark manager is accessed by multiple threads
@@ -177,17 +176,14 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
* It receives the watermark from InputWatermarkManager.
*/
private final class WatermarkCollector implements OutputCollector {
-
@Override
public void emit(final Object output) {
throw new IllegalStateException("Should not be called");
}
-
@Override
public void emitWatermark(final Watermark watermark) {
elementQueue.offer(watermark);
}
-
@Override
public void emit(final String dstVertexId, final Object output) {
throw new IllegalStateException("Should not be called");
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 80bbea2..d3c223f 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -50,7 +50,8 @@ class ParentTaskDataFetcher extends DataFetcher {
private long serBytes = 0;
private long encodedBytes = 0;
- ParentTaskDataFetcher(final IRVertex dataSource, final InputReader readerForParentTask,
+ ParentTaskDataFetcher(final IRVertex dataSource,
+ final InputReader readerForParentTask,
final OutputCollector outputCollector) {
super(dataSource, outputCollector);
this.readersForParentTask = readerForParentTask;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 518bff3..b08b12a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -25,12 +25,11 @@ import org.apache.nemo.common.dag.Edge;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
import org.apache.nemo.common.ir.vertex.*;
import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.runtime.executor.datatransfer.MultiInputWatermarkManager;
import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.runtime.executor.datatransfer.MultiInputWatermarkManager;
import org.apache.nemo.common.punctuation.Finishmark;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
@@ -71,7 +70,7 @@ public final class TaskExecutor {
private boolean isExecuted;
private final String taskId;
private final TaskStateManager taskStateManager;
- private final List<DataFetcher> nonBroadcastDataFetchers;
+ private final List<DataFetcher> dataFetchers;
private final BroadcastManagerWorker broadcastManagerWorker;
private final List<VertexHarness> sortedHarnesses;
@@ -121,7 +120,7 @@ public final class TaskExecutor {
// Prepare data structures
final Pair<List<DataFetcher>, List<VertexHarness>> pair = prepare(task, irVertexDag, intermediateDataIOFactory);
- this.nonBroadcastDataFetchers = pair.left();
+ this.dataFetchers = pair.left();
this.sortedHarnesses = pair.right();
}
@@ -188,7 +187,6 @@ public final class TaskExecutor {
// in {@link this#getInternalMainOutputs and this#internalMainOutputs}
final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap = new HashMap<>();
reverseTopologicallySorted.forEach(childVertex -> {
-
if (childVertex instanceof OperatorVertex) {
final List<Edge> edges = getAllIncomingEdges(task, irVertexDag, childVertex);
if (edges.size() == 1) {
@@ -201,11 +199,10 @@ public final class TaskExecutor {
new OperatorWatermarkCollector((OperatorVertex) childVertex)));
}
}
-
});
// Create a harness for each vertex
- final List<DataFetcher> nonBroadcastDataFetcherList = new ArrayList<>();
+ final List<DataFetcher> dataFetcherList = new ArrayList<>();
final Map<String, VertexHarness> vertexIdToHarness = new HashMap<>();
reverseTopologicallySorted.forEach(irVertex -> {
@@ -250,38 +247,17 @@ public final class TaskExecutor {
// Source read
if (irVertex instanceof SourceVertex) {
// Source vertex read
- nonBroadcastDataFetcherList.add(new SourceVertexDataFetcher(
- (SourceVertex) irVertex, sourceReader.get(), outputCollector));
+ dataFetcherList.add(new SourceVertexDataFetcher(
+ (SourceVertex) irVertex,
+ sourceReader.get(),
+ outputCollector));
}
- // Parent-task read (broadcasts)
- final List<StageEdge> inEdgesForThisVertex = task.getTaskIncomingEdges()
- .stream()
- .filter(inEdge -> inEdge.getDstIRVertex().getId().equals(irVertex.getId()))
- .collect(Collectors.toList());
- final List<StageEdge> broadcastInEdges = inEdgesForThisVertex
- .stream()
- .filter(stageEdge -> stageEdge.getPropertyValue(BroadcastVariableIdProperty.class).isPresent())
- .collect(Collectors.toList());
- final List<InputReader> broadcastReaders =
- getParentTaskReaders(taskIndex, broadcastInEdges, intermediateDataIOFactory);
- if (broadcastInEdges.size() != broadcastReaders.size()) {
- throw new IllegalStateException(broadcastInEdges.toString() + ", " + broadcastReaders.toString());
- }
- for (int i = 0; i < broadcastInEdges.size(); i++) {
- final StageEdge inEdge = broadcastInEdges.get(i);
- broadcastManagerWorker.registerInputReader(
- inEdge.getPropertyValue(BroadcastVariableIdProperty.class)
- .orElseThrow(() -> new IllegalStateException(inEdge.toString())),
- broadcastReaders.get(i));
- }
-
- // Parent-task read (non-broadcasts)
- final List<StageEdge> nonBroadcastInEdges = new ArrayList<>(inEdgesForThisVertex);
- nonBroadcastInEdges.removeAll(broadcastInEdges);
-
- nonBroadcastInEdges
+ // Parent-task read
+ // TODO #285: Cache broadcasted data
+ task.getTaskIncomingEdges()
.stream()
+ .filter(inEdge -> inEdge.getDstIRVertex().getId().equals(irVertex.getId())) // edge to this vertex
.map(incomingEdge ->
Pair.of(incomingEdge, intermediateDataIOFactory
.createReader(taskIndex, incomingEdge.getSrcIRVertex(), incomingEdge)))
@@ -291,14 +267,21 @@ public final class TaskExecutor {
final int edgeIndex = edgeIndexMap.get(edge);
final InputWatermarkManager watermarkManager = operatorWatermarkManagerMap.get(irVertex);
final InputReader parentTaskReader = pair.right();
+ final OutputCollector dataFetcherOutputCollector =
+ new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager);
+
if (parentTaskReader instanceof PipeInputReader) {
- nonBroadcastDataFetcherList.add(
- new MultiThreadParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
- new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager)));
+ dataFetcherList.add(
+ new MultiThreadParentTaskDataFetcher(
+ parentTaskReader.getSrcIrVertex(),
+ parentTaskReader,
+ dataFetcherOutputCollector));
} else {
- nonBroadcastDataFetcherList.add(
- new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
- new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager)));
+ dataFetcherList.add(
+ new ParentTaskDataFetcher(
+ parentTaskReader.getSrcIrVertex(),
+ parentTaskReader,
+ dataFetcherOutputCollector));
}
}
});
@@ -309,7 +292,7 @@ public final class TaskExecutor {
.map(vertex -> vertexIdToHarness.get(vertex.getId()))
.collect(Collectors.toList());
- return Pair.of(nonBroadcastDataFetcherList, sortedHarnessList);
+ return Pair.of(dataFetcherList, sortedHarnessList);
}
/**
@@ -319,7 +302,8 @@ public final class TaskExecutor {
outputCollector.emit(dataElement);
}
- private void processWatermark(final OutputCollector outputCollector, final Watermark watermark) {
+ private void processWatermark(final OutputCollector outputCollector,
+ final Watermark watermark) {
outputCollector.emitWatermark(watermark);
}
@@ -338,7 +322,7 @@ public final class TaskExecutor {
/**
* The task is executed in the following two phases.
- * - Phase 1: Consume task-external input data (non-broadcasts)
+ * - Phase 1: Consume task-external input data
* - Phase 2: Finalize task-internal states and data elements
*/
private void doExecute() {
@@ -349,8 +333,8 @@ public final class TaskExecutor {
LOG.info("{} started", taskId);
taskStateManager.onTaskStateChanged(TaskState.State.EXECUTING, Optional.empty(), Optional.empty());
- // Phase 1: Consume task-external input data. (non-broadcasts)
- if (!handleDataFetchers(nonBroadcastDataFetchers)) {
+ // Phase 1: Consume task-external input data.
+ if (!handleDataFetchers(dataFetchers)) {
return;
}
@@ -383,14 +367,14 @@ public final class TaskExecutor {
}
/**
- * Process an element generated from the dataFetcher.
- * If the element is an instance of Finishmark, we remove the dataFetcher from the current list.
- * @param element element
+ * Process an event generated from the dataFetcher.
+ * If the event is an instance of Finishmark, we remove the dataFetcher from the current list.
+ * @param event event
* @param dataFetcher current data fetcher
*/
- private void handleElement(final Object element,
- final DataFetcher dataFetcher) {
- if (element instanceof Finishmark) {
+ private void onEventFromDataFetcher(final Object event,
+ final DataFetcher dataFetcher) {
+ if (event instanceof Finishmark) {
// We've consumed all the data from this data fetcher.
if (dataFetcher instanceof SourceVertexDataFetcher) {
boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime();
@@ -401,12 +385,12 @@ public final class TaskExecutor {
serializedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getSerializedBytes();
encodedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getEncodedBytes();
}
- } else if (element instanceof Watermark) {
+ } else if (event instanceof Watermark) {
// Watermark
- processWatermark(dataFetcher.getOutputCollector(), (Watermark) element);
+ processWatermark(dataFetcher.getOutputCollector(), (Watermark) event);
} else {
// Process data element
- processElement(dataFetcher.getOutputCollector(), element);
+ processElement(dataFetcher.getOutputCollector(), event);
}
}
@@ -457,7 +441,7 @@ public final class TaskExecutor {
final DataFetcher dataFetcher = availableIterator.next();
try {
final Object element = dataFetcher.fetchDataElement();
- handleElement(element, dataFetcher);
+ onEventFromDataFetcher(element, dataFetcher);
if (element instanceof Finishmark) {
availableIterator.remove();
}
@@ -485,7 +469,7 @@ public final class TaskExecutor {
final DataFetcher dataFetcher = pendingIterator.next();
try {
final Object element = dataFetcher.fetchDataElement();
- handleElement(element, dataFetcher);
+ onEventFromDataFetcher(element, dataFetcher);
// We processed data. This means the data fetcher is now available.
// Add current data fetcher to available
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 6cbf694..6b94ca7 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -24,6 +24,7 @@ import org.apache.nemo.common.punctuation.Finishmark;
import org.apache.nemo.runtime.executor.data.DataUtil;
import org.apache.nemo.runtime.executor.datatransfer.BlockInputReader;
import org.apache.nemo.runtime.executor.datatransfer.InputReader;
+import org.apache.nemo.runtime.executor.datatransfer.InputWatermarkManager;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
@@ -121,9 +122,9 @@ public final class ParentTaskDataFetcherTest {
private ParentTaskDataFetcher createFetcher(final InputReader readerForParentTask) {
return new ParentTaskDataFetcher(
- mock(IRVertex.class),
- readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher
- mock(OutputCollector.class));
+ mock(IRVertex.class),
+ readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher
+ mock(OutputCollector.class));
}
private InputReader generateInputReader(final CompletableFuture completableFuture) {
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 6ae716a..919143b 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -26,7 +26,6 @@ import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
@@ -66,7 +65,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -422,16 +420,16 @@ public final class TaskExecutorTest {
.buildWithoutSourceSinkCheck();
final StageEdge taskOutEdge = mockStageEdgeFrom(operatorIRVertex2);
+ final StageEdge taskInEdge = mockStageEdgeTo(operatorIRVertex1);
- final StageEdge broadcastInEdge = mockBroadcastVariableStageEdgeTo(
- new OperatorVertex(singleListTransform), operatorIRVertex2, broadcastId, elements);
+ when(broadcastManagerWorker.get(broadcastId)).thenReturn(new ArrayList<>(elements));
final Task task = new Task(
"testSourceVertexDataFetching",
generateTaskId(),
TASK_EXECUTION_PROPERTY_MAP,
new byte[0],
- Arrays.asList(mockStageEdgeTo(operatorIRVertex1), broadcastInEdge),
+ Collections.singletonList(taskInEdge),
Collections.singletonList(taskOutEdge),
Collections.emptyMap());
@@ -541,23 +539,6 @@ public final class TaskExecutorTest {
mock(Stage.class));
}
- private StageEdge mockBroadcastVariableStageEdgeTo(final IRVertex srcVertex,
- final IRVertex dstVertex,
- final Serializable broadcastVariableId,
- final Object broadcastVariable) {
- when(broadcastManagerWorker.get(broadcastVariableId)).thenReturn(broadcastVariable);
-
- final ExecutionPropertyMap executionPropertyMap =
- ExecutionPropertyMap.of(mock(IREdge.class), CommunicationPatternProperty.Value.OneToOne);
- executionPropertyMap.put(BroadcastVariableIdProperty.of(broadcastVariableId));
- return new StageEdge("runtime outgoing edge id",
- executionPropertyMap,
- srcVertex,
- dstVertex,
- mock(Stage.class),
- mock(Stage.class));
- }
-
/**
* Represents the answer return an inter-stage {@link InputReader},
* which will have multiple iterable according to the source parallelism.