You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/10/11 01:37:11 UTC
[incubator-nemo] branch master updated: [NEMO-213] Use Beam's DoFnRunners to execute DoFn (#122)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 18b61c1 [NEMO-213] Use Beam's DoFnRunners to execute DoFn (#122)
18b61c1 is described below
commit 18b61c1e862e3af51c87f19c0182725cdee8e2a0
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Thu Oct 11 10:37:06 2018 +0900
[NEMO-213] Use Beam's DoFnRunners to execute DoFn (#122)
JIRA: [NEMO-213: Use Beam's DoFnRunners to execute DoFn](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-213)
**Major changes:**
- use `simpleDoFnRunner` to process events of `DoFn`
- convert all beam `Transform`s to process `WindowedValue`
- create a `BeamBoundedWindowedSourceVertex` that generates `WindowedValue`s
- modify `PipelineTranslator` to use the newly created `Transform`s (`SimpleDoFnTransform`...)
**Minor changes to note:**
-
**Tests for the changes:**
- create a unit test that executes `SimpleDoFnTransform`
**Other comments:**
- We need a performance test
Closes #122
---
.../compiler/frontend/beam/BeamKeyExtractor.java | 7 +-
.../compiler/frontend/beam/NemoPipelineRunner.java | 5 +-
.../compiler/frontend/beam/PipelineTranslator.java | 128 ++++--
.../beam/source/BeamBoundedSourceVertex.java | 33 +-
.../BroadcastVariableSideInputReader.java | 60 +++
.../beam/transform/CreateViewTransform.java | 16 +-
.../beam/transform/DefaultOutputManager.java | 51 +++
.../frontend/beam/transform/DoFnTransform.java | 157 +++++++
.../frontend/beam/transform/DoTransform.java | 449 ---------------------
.../beam/transform/GroupByKeyTransform.java | 15 +-
.../frontend/beam/transform/WindowFnTransform.java | 98 +++++
.../frontend/beam/transform/WindowTransform.java | 61 ---
.../nemo/compiler/optimizer/PairKeyExtractor.java} | 17 +-
.../compiletime/reshaping/SkewReshapingPass.java | 3 +-
.../compiler/backend/nemo/DAGConverterTest.java | 5 +-
.../frontend/beam/transform/DoFnTransformTest.java | 287 +++++++++++++
.../beam/MultinomialLogisticRegression.java | 6 +-
.../runtime/master/scheduler/TaskRetryTest.java | 4 +-
18 files changed, 816 insertions(+), 586 deletions(-)
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
index c3f4bbc..b64cb60 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
@@ -15,6 +15,7 @@
*/
package org.apache.nemo.compiler.frontend.beam;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.KeyExtractor;
import org.apache.beam.sdk.values.KV;
@@ -25,9 +26,11 @@ import org.apache.beam.sdk.values.KV;
final class BeamKeyExtractor implements KeyExtractor {
@Override
public Object extractKey(final Object element) {
- if (element instanceof KV) {
+ final WindowedValue windowedValue = (WindowedValue) element;
+ final Object value = windowedValue.getValue();
+ if (value instanceof KV) {
// Handle null keys, since Beam allows KV with null keys.
- final Object key = ((KV) element).getKey();
+ final Object key = ((KV) value).getKey();
return key == null ? 0 : key;
} else {
return element;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
index e3043bd..deb4dfe 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/NemoPipelineRunner.java
@@ -56,8 +56,9 @@ public final class NemoPipelineRunner extends PipelineRunner<NemoPipelineResult>
public NemoPipelineResult run(final Pipeline pipeline) {
final PipelineVisitor pipelineVisitor = new PipelineVisitor();
pipeline.traverseTopologically(pipelineVisitor);
- final DAG<IRVertex, IREdge> dag = PipelineTranslator.translate(pipelineVisitor.getConvertedPipeline(),
- nemoPipelineOptions);
+ final DAG<IRVertex, IREdge> dag = PipelineTranslator.translate(pipeline,
+ pipelineVisitor.getConvertedPipeline(),
+ nemoPipelineOptions);
final NemoPipelineResult nemoPipelineResult = new NemoPipelineResult();
JobLauncher.launchDAG(dag);
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 6ef96bf..2486a00 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -15,6 +15,12 @@
*/
package org.apache.nemo.compiler.frontend.beam;
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
+import org.apache.beam.runners.core.construction.TransformInputs;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.ir.edge.IREdge;
@@ -38,6 +44,7 @@ import org.apache.beam.sdk.values.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.lang.annotation.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -45,17 +52,19 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Stack;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
- * Converts DAG of Beam pipeline to Nemo IR DAG.
+ * Converts the DAG of a Beam pipeline to a Nemo IR DAG.
* For a {@link PrimitiveTransformVertex}, it defines mapping to the corresponding {@link IRVertex}.
* For a {@link CompositeTransformVertex}, it defines how to setup and clear {@link TranslationContext}
* before start translating inner Beam transform hierarchy.
*/
public final class PipelineTranslator
- implements BiFunction<CompositeTransformVertex, PipelineOptions, DAG<IRVertex, IREdge>> {
+ implements BiFunction<CompositeTransformVertex, PipelineOptions, DAG<IRVertex, IREdge>> {
private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslator.class.getName());
@@ -64,15 +73,21 @@ public final class PipelineTranslator
private final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator = new HashMap<>();
private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator = new HashMap<>();
+ // TODO #220: Move this variable to TranslationContext
+ private static final AtomicReference<Pipeline> PIPELINE = new AtomicReference<>();
+
/**
* Static translator method.
- * @param pipeline Top-level Beam transform hierarchy, usually given by {@link PipelineVisitor}
+ * @param pipeline the original Beam pipeline
+ * @param root Top-level Beam transform hierarchy, usually given by {@link PipelineVisitor}
* @param pipelineOptions {@link PipelineOptions}
* @return Nemo IR DAG
*/
- public static DAG<IRVertex, IREdge> translate(final CompositeTransformVertex pipeline,
+ public static DAG<IRVertex, IREdge> translate(final Pipeline pipeline,
+ final CompositeTransformVertex root,
final PipelineOptions pipelineOptions) {
- return INSTANCE.apply(pipeline, pipelineOptions);
+ PIPELINE.set(pipeline);
+ return INSTANCE.apply(root, pipelineOptions);
}
/**
@@ -113,38 +128,74 @@ public final class PipelineTranslator
transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
}
+ private static DoFnTransform createDoFnTransform(final TranslationContext ctx,
+ final PrimitiveTransformVertex transformVertex) {
+ try {
+ final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+ final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
+ final TupleTag mainOutputTag = ParDoTranslation.getMainOutputTag(pTransform);
+ final List<PCollectionView<?>> sideInputs = ParDoTranslation.getSideInputs(pTransform);
+ final TupleTagList additionalOutputTags = ParDoTranslation.getAdditionalOutputTags(pTransform);
+
+ final PCollection<?> mainInput = (PCollection<?>)
+ Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
+
+ return new DoFnTransform(
+ doFn,
+ mainInput.getCoder(),
+ getOutputCoders(pTransform),
+ mainOutputTag,
+ additionalOutputTags.getAll(),
+ mainInput.getWindowingStrategy(),
+ sideInputs,
+ ctx.pipelineOptions);
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@PrimitiveTransformTranslator(ParDo.SingleOutput.class)
private static void parDoSingleOutputTranslator(final TranslationContext ctx,
final PrimitiveTransformVertex transformVertex,
final ParDo.SingleOutput<?, ?> transform) {
- final DoTransform doTransform = new DoTransform(transform.getFn(), ctx.pipelineOptions);
- final IRVertex vertex = new OperatorVertex(doTransform);
+ final DoFnTransform doFnTransform = createDoFnTransform(ctx, transformVertex);
+ final IRVertex vertex = new OperatorVertex(doFnTransform);
+
ctx.addVertex(vertex);
transformVertex.getNode().getInputs().values().stream()
- .filter(input -> !transform.getAdditionalInputs().values().contains(input))
- .forEach(input -> ctx.addEdgeTo(vertex, input));
+ .filter(input -> !transform.getAdditionalInputs().values().contains(input))
+ .forEach(input -> ctx.addEdgeTo(vertex, input));
transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
}
+ private static Map<TupleTag<?>, Coder<?>> getOutputCoders(final AppliedPTransform<?, ?, ?> ptransform) {
+ return ptransform
+ .getOutputs()
+ .entrySet()
+ .stream()
+ .filter(e -> e.getValue() instanceof PCollection)
+ .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) e.getValue()).getCoder()));
+ }
+
@PrimitiveTransformTranslator(ParDo.MultiOutput.class)
private static void parDoMultiOutputTranslator(final TranslationContext ctx,
final PrimitiveTransformVertex transformVertex,
final ParDo.MultiOutput<?, ?> transform) {
- final DoTransform doTransform = new DoTransform(transform.getFn(), ctx.pipelineOptions);
- final IRVertex vertex = new OperatorVertex(doTransform);
+ final DoFnTransform doFnTransform = createDoFnTransform(ctx, transformVertex);
+ final IRVertex vertex = new OperatorVertex(doFnTransform);
ctx.addVertex(vertex);
transformVertex.getNode().getInputs().values().stream()
- .filter(input -> !transform.getAdditionalInputs().values().contains(input))
- .forEach(input -> ctx.addEdgeTo(vertex, input));
+ .filter(input -> !transform.getAdditionalInputs().values().contains(input))
+ .forEach(input -> ctx.addEdgeTo(vertex, input));
transform.getSideInputs().forEach(input -> ctx.addEdgeTo(vertex, input));
transformVertex.getNode().getOutputs().entrySet().stream()
- .filter(pValueWithTupleTag -> pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
- .forEach(pValueWithTupleTag -> ctx.registerMainOutputFrom(vertex, pValueWithTupleTag.getValue()));
+ .filter(pValueWithTupleTag -> pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
+ .forEach(pValueWithTupleTag -> ctx.registerMainOutputFrom(vertex, pValueWithTupleTag.getValue()));
transformVertex.getNode().getOutputs().entrySet().stream()
- .filter(pValueWithTupleTag -> !pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
- .forEach(pValueWithTupleTag -> ctx.registerAdditionalOutputFrom(vertex, pValueWithTupleTag.getValue(),
- pValueWithTupleTag.getKey()));
+ .filter(pValueWithTupleTag -> !pValueWithTupleTag.getKey().equals(transform.getMainOutputTag()))
+ .forEach(pValueWithTupleTag -> ctx.registerAdditionalOutputFrom(vertex, pValueWithTupleTag.getValue(),
+ pValueWithTupleTag.getKey()));
}
@PrimitiveTransformTranslator(GroupByKey.class)
@@ -169,7 +220,7 @@ public final class PipelineTranslator
} else {
throw new UnsupportedOperationException(String.format("%s is not supported", transform));
}
- final IRVertex vertex = new OperatorVertex(new WindowTransform(windowFn));
+ final IRVertex vertex = new OperatorVertex(new WindowFnTransform(windowFn));
ctx.addVertex(vertex);
transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, input));
transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex, output));
@@ -225,7 +276,7 @@ public final class PipelineTranslator
final boolean handlesBeamRow = Stream
.concat(transformVertex.getNode().getInputs().values().stream(),
transformVertex.getNode().getOutputs().values().stream())
- .map(pValue -> (KvCoder) getCoder(pValue, ctx.pipeline)) // Input and output of combine should be KV
+ .map(pValue -> (KvCoder) getCoder(pValue, ctx.root)) // Input and output of combine should be KV
.map(kvCoder -> kvCoder.getValueCoder().getEncodedTypeDescriptor()) // We're interested in the 'Value' of KV
.anyMatch(valueTypeDescriptor -> TypeDescriptor.of(Row.class).equals(valueTypeDescriptor));
if (handlesBeamRow) {
@@ -280,7 +331,8 @@ public final class PipelineTranslator
}
@Override
- public DAG<IRVertex, IREdge> apply(final CompositeTransformVertex pipeline, final PipelineOptions pipelineOptions) {
+ public DAG<IRVertex, IREdge> apply(final CompositeTransformVertex pipeline,
+ final PipelineOptions pipelineOptions) {
final TranslationContext ctx = new TranslationContext(pipeline, primitiveTransformToTranslator,
compositeTransformToTranslator, DefaultCommunicationPatternSelector.INSTANCE, pipelineOptions);
ctx.translate(pipeline);
@@ -351,7 +403,7 @@ public final class PipelineTranslator
* Translation context.
*/
private static final class TranslationContext {
- private final CompositeTransformVertex pipeline;
+ private final CompositeTransformVertex root;
private final PipelineOptions pipelineOptions;
private final DAGBuilder<IRVertex, IREdge> builder;
private final Map<PValue, IRVertex> pValueToProducer;
@@ -363,18 +415,18 @@ public final class PipelineTranslator
private final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator;
/**
- * @param pipeline the pipeline to translate
+ * @param root the root to translate
* @param primitiveTransformToTranslator provides translators for PrimitiveTransform
* @param compositeTransformToTranslator provides translators for CompositeTransform
* @param selector provides {@link CommunicationPatternProperty.Value} for IR edges
* @param pipelineOptions {@link PipelineOptions}
*/
- private TranslationContext(final CompositeTransformVertex pipeline,
+ private TranslationContext(final CompositeTransformVertex root,
final Map<Class<? extends PTransform>, Method> primitiveTransformToTranslator,
final Map<Class<? extends PTransform>, Method> compositeTransformToTranslator,
final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector,
final PipelineOptions pipelineOptions) {
- this.pipeline = pipeline;
+ this.root = root;
this.builder = new DAGBuilder<>();
this.pValueToProducer = new HashMap<>();
this.pValueToTag = new HashMap<>();
@@ -393,7 +445,7 @@ public final class PipelineTranslator
*/
private TranslationContext(final TranslationContext ctx,
final BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> selector) {
- this.pipeline = ctx.pipeline;
+ this.root = ctx.root;
this.pipelineOptions = ctx.pipelineOptions;
this.builder = ctx.builder;
this.pValueToProducer = ctx.pValueToProducer;
@@ -465,7 +517,7 @@ public final class PipelineTranslator
if (src == null) {
try {
throw new RuntimeException(String.format("Cannot find a vertex that emits pValue %s, "
- + "while PTransform %s is known to produce it.", input, pipeline.getPrimitiveProducerOf(input)));
+ + "while PTransform %s is known to produce it.", input, root.getPrimitiveProducerOf(input)));
} catch (final RuntimeException e) {
throw new RuntimeException(String.format("Cannot find a vertex that emits pValue %s, "
+ "and the corresponding PTransform was not found", input));
@@ -477,17 +529,18 @@ public final class PipelineTranslator
+ "for an edge from %s to %s", communicationPatternSelector, src, dst));
}
final IREdge edge = new IREdge(communicationPattern, src, dst);
- final Coder<?> coder;
+ final Coder coder;
+ final Coder windowCoder;
if (input instanceof PCollection) {
coder = ((PCollection) input).getCoder();
+ windowCoder = ((PCollection) input).getWindowingStrategy().getWindowFn().windowCoder();
} else if (input instanceof PCollectionView) {
- coder = getCoderForView((PCollectionView) input, pipeline);
+ coder = getCoderForView((PCollectionView) input, root);
+ windowCoder = ((PCollectionView) input).getPCollection()
+ .getWindowingStrategy().getWindowFn().windowCoder();
} else {
- coder = null;
- }
- if (coder == null) {
throw new RuntimeException(String.format("While adding an edge from %s, to %s, coder for PValue %s cannot "
- + "be determined", src, dst, input));
+ + "be determined", src, dst, input));
}
edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
@@ -497,8 +550,11 @@ public final class PipelineTranslator
edge.setProperty(KeyEncoderProperty.of(new BeamEncoderFactory(keyCoder)));
edge.setProperty(KeyDecoderProperty.of(new BeamDecoderFactory(keyCoder)));
}
- edge.setProperty(EncoderProperty.of(new BeamEncoderFactory<>(coder)));
- edge.setProperty(DecoderProperty.of(new BeamDecoderFactory<>(coder)));
+
+ edge.setProperty(EncoderProperty.of(
+ new BeamEncoderFactory<>(WindowedValue.getFullCoder(coder, windowCoder))));
+ edge.setProperty(DecoderProperty.of(
+ new BeamDecoderFactory<>(WindowedValue.getFullCoder(coder, windowCoder))));
if (pValueToTag.containsKey(input)) {
edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
@@ -553,7 +609,7 @@ public final class PipelineTranslator
final Transform srcTransform = src instanceof OperatorVertex ? ((OperatorVertex) src).getTransform() : null;
final Transform dstTransform = dst instanceof OperatorVertex ? ((OperatorVertex) dst).getTransform() : null;
- final DoFn srcDoFn = srcTransform instanceof DoTransform ? ((DoTransform) srcTransform).getDoFn() : null;
+ final DoFn srcDoFn = srcTransform instanceof DoFnTransform ? ((DoFnTransform) srcTransform).getDoFn() : null;
if (srcDoFn != null && srcDoFn.getClass().equals(constructUnionTableFn)) {
return CommunicationPatternProperty.Value.Shuffle;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index f17ab98..09c9076 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -16,6 +16,7 @@
package org.apache.nemo.compiler.frontend.beam.source;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.ir.Readable;
import java.io.IOException;
@@ -35,7 +36,7 @@ import org.slf4j.LoggerFactory;
* SourceVertex implementation for BoundedSource.
* @param <O> output type.
*/
-public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
+public final class BeamBoundedSourceVertex<O> extends SourceVertex<WindowedValue<O>> {
private static final Logger LOG = LoggerFactory.getLogger(BeamBoundedSourceVertex.class.getName());
private BoundedSource<O> source;
private final String sourceDescription;
@@ -68,8 +69,8 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
}
@Override
- public List<Readable<O>> getReadables(final int desiredNumOfSplits) throws Exception {
- final List<Readable<O>> readables = new ArrayList<>();
+ public List<Readable<WindowedValue<O>>> getReadables(final int desiredNumOfSplits) throws Exception {
+ final List<Readable<WindowedValue<O>>> readables = new ArrayList<>();
LOG.info("estimate: {}", source.getEstimatedSizeBytes(null));
LOG.info("desired: {}", desiredNumOfSplits);
source.split(source.getEstimatedSizeBytes(null) / desiredNumOfSplits, null)
@@ -93,7 +94,7 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
* BoundedSourceReadable class.
* @param <T> type.
*/
- private static final class BoundedSourceReadable<T> implements Readable<T> {
+ private static final class BoundedSourceReadable<T> implements Readable<WindowedValue<T>> {
private final BoundedSource<T> boundedSource;
/**
@@ -105,11 +106,29 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
}
@Override
- public Iterable<T> read() throws IOException {
- final ArrayList<T> elements = new ArrayList<>();
+ public Iterable<WindowedValue<T>> read() throws IOException {
+ boolean started = false;
+ boolean windowed = false;
+
+ final ArrayList<WindowedValue<T>> elements = new ArrayList<>();
try (BoundedSource.BoundedReader<T> reader = boundedSource.createReader(null)) {
for (boolean available = reader.start(); available; available = reader.advance()) {
- elements.add(reader.getCurrent());
+ final T elem = reader.getCurrent();
+
+ // Check whether the element is windowed or not
+ // We only have to check the first element.
+ if (!started) {
+ started = true;
+ if (elem instanceof WindowedValue) {
+ windowed = true;
+ }
+ }
+
+ if (!windowed) {
+ elements.add(WindowedValue.valueInGlobalWindow(reader.getCurrent()));
+ } else {
+ elements.add((WindowedValue<T>) elem);
+ }
}
}
return elements;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java
new file mode 100644
index 0000000..4725fa9
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/BroadcastVariableSideInputReader.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+
+/**
+ * A side input reader that reads side input values from the Nemo transform context.
+ */
+public final class BroadcastVariableSideInputReader implements SideInputReader {
+
+ // Nemo context for storing/getting side inputs
+ private final Transform.Context context;
+
+ // The list of side inputs that we're handling
+ private final Collection<PCollectionView<?>> sideInputs;
+
+ BroadcastVariableSideInputReader(final Transform.Context context,
+ final Collection<PCollectionView<?>> sideInputs) {
+ this.context = context;
+ this.sideInputs = sideInputs;
+ }
+
+ @Nullable
+ @Override
+ public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
+ // TODO #216: implement side input and windowing
+ return ((WindowedValue<T>) context.getBroadcastVariable(view)).getValue();
+ }
+
+ @Override
+ public <T> boolean contains(final PCollectionView<T> view) {
+ return sideInputs.contains(view);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return sideInputs.isEmpty();
+ }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index cd93296..91fb3e2 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -15,6 +15,7 @@
*/
package org.apache.nemo.compiler.frontend.beam.transform;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.beam.sdk.transforms.Materializations;
@@ -31,9 +32,9 @@ import java.util.ArrayList;
* @param <I> input type.
* @param <O> output type.
*/
-public final class CreateViewTransform<I, O> implements Transform<I, O> {
+public final class CreateViewTransform<I, O> implements Transform<WindowedValue<I>, WindowedValue<O>> {
private final PCollectionView pCollectionView;
- private OutputCollector<O> outputCollector;
+ private OutputCollector<WindowedValue<O>> outputCollector;
private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
private final MultiView<Object> multiView;
@@ -48,21 +49,22 @@ public final class CreateViewTransform<I, O> implements Transform<I, O> {
}
@Override
- public void prepare(final Context context, final OutputCollector<O> oc) {
+ public void prepare(final Context context, final OutputCollector<WindowedValue<O>> oc) {
this.outputCollector = oc;
}
@Override
- public void onData(final I element) {
- // Since CreateViewTransform takes KV(Void, value), this is okay
- final KV<?, ?> kv = (KV<?, ?>) element; // It will throw a type cast exception if the element is not KV
+ public void onData(final WindowedValue<I> element) {
+ // TODO #216: support window in view
+ final KV kv = ((WindowedValue<KV>) element).getValue();
multiView.getDataList().add(kv.getValue());
}
@Override
public void close() {
final Object view = viewFn.apply(multiView);
- outputCollector.emit((O) view);
+ // TODO #216: support window in view
+ outputCollector.emit(WindowedValue.valueInGlobalWindow((O) view));
}
@Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
new file mode 100644
index 0000000..4174c6c
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DefaultOutputManager.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+
+import java.util.Map;
+
+/**
+ * Default output emitter that uses outputCollector.
+ * @param <OutputT> output type
+ */
+public final class DefaultOutputManager<OutputT> implements DoFnRunners.OutputManager {
+ private final TupleTag<OutputT> mainOutputTag;
+ private final OutputCollector<WindowedValue<OutputT>> outputCollector;
+ private final Map<String, String> additionalOutputs;
+
+ DefaultOutputManager(final OutputCollector<WindowedValue<OutputT>> outputCollector,
+ final Transform.Context context,
+ final TupleTag<OutputT> mainOutputTag) {
+ this.outputCollector = outputCollector;
+ this.mainOutputTag = mainOutputTag;
+ this.additionalOutputs = context.getTagToAdditionalChildren();
+ }
+
+ @Override
+ public <T> void output(final TupleTag<T> tag, final WindowedValue<T> output) {
+ if (tag.equals(mainOutputTag)) {
+ outputCollector.emit((WindowedValue<OutputT>) output);
+ } else {
+ outputCollector.emit(additionalOutputs.get(tag.getId()), output);
+ }
+ }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
new file mode 100644
index 0000000..8dbf051
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Transform that executes a Beam {@link DoFn} through Beam's simple {@link DoFnRunner}.
+ * State and timers are not supported.
+ * @param <InputT> input type.
+ * @param <OutputT> output type.
+ */
+public final class DoFnTransform<InputT, OutputT> implements
+    Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
+
+  private OutputCollector<WindowedValue<OutputT>> outputCollector;
+  private final TupleTag<OutputT> mainOutputTag;
+  private final List<TupleTag<?>> additionalOutputTags;
+  private final Collection<PCollectionView<?>> sideInputs;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final DoFn<InputT, OutputT> doFn;
+  private final SerializablePipelineOptions serializedOptions;
+  private transient DoFnRunner<InputT, OutputT> doFnRunner;
+  private transient SideInputReader sideInputReader;
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
+  private final Coder<InputT> inputCoder;
+  private final Map<TupleTag<?>, Coder<?>> outputCoders;
+
+  /**
+   * DoFnTransform Constructor.
+   *
+   * @param doFn doFn.
+   * @param inputCoder coder of the input elements.
+   * @param outputCoders coders of the outputs, keyed by output tag.
+   * @param mainOutputTag tag of the main output.
+   * @param additionalOutputTags tags of the additional outputs.
+   * @param windowingStrategy windowing strategy handed to the DoFn runner.
+   * @param sideInputs side inputs of the DoFn.
+   * @param options Pipeline options.
+   */
+  public DoFnTransform(final DoFn<InputT, OutputT> doFn,
+                       final Coder<InputT> inputCoder,
+                       final Map<TupleTag<?>, Coder<?>> outputCoders,
+                       final TupleTag<OutputT> mainOutputTag,
+                       final List<TupleTag<?>> additionalOutputTags,
+                       final WindowingStrategy<?, ?> windowingStrategy,
+                       final Collection<PCollectionView<?>> sideInputs,
+                       final PipelineOptions options) {
+    this.doFn = doFn;
+    this.inputCoder = inputCoder;
+    this.outputCoders = outputCoders;
+    this.mainOutputTag = mainOutputTag;
+    this.additionalOutputTags = additionalOutputTags;
+    this.sideInputs = sideInputs;
+    this.serializedOptions = new SerializablePipelineOptions(options);
+    this.windowingStrategy = windowingStrategy;
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>> oc) {
+    // deserialize pipeline option
+    final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
+
+    this.outputCollector = oc;
+
+    // create output manager
+    final DoFnRunners.OutputManager outputManager = new DefaultOutputManager<>(oc, context, mainOutputTag);
+
+    // create side input reader: broadcast variables when side inputs exist, a null reader otherwise
+    sideInputReader = sideInputs.isEmpty()
+        ? NullSideInputReader.of(sideInputs)
+        : new BroadcastVariableSideInputReader(context, sideInputs);
+
+    // create step context: this transform does not support state and timer.
+    final StepContext stepContext = new StepContext() {
+      @Override
+      public StateInternals stateInternals() {
+        throw new UnsupportedOperationException("Not support stateInternals in DoFnTransform");
+      }
+
+      @Override
+      public TimerInternals timerInternals() {
+        throw new UnsupportedOperationException("Not support timerInternals in DoFnTransform");
+      }
+    };
+
+    // invoker: kept so that close() can run the teardown paired with this setup
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
+
+    // DoFnRunners.simpleRunner takes care of all the hard stuff of running the DoFn;
+    // this approach is the standard one used by most of the Beam runners.
+    doFnRunner = DoFnRunners.simpleRunner(
+        options, doFn, sideInputReader, outputManager, mainOutputTag, additionalOutputTags,
+        stepContext, inputCoder, outputCoders, windowingStrategy);
+
+    doFnRunner.startBundle();
+  }
+
+  @Override
+  public void onData(final WindowedValue<InputT> data) {
+    doFnRunner.processElement(data);
+  }
+
+  /**
+   * @return {@link DoFn} for this transform.
+   */
+  public DoFn getDoFn() {
+    return doFn;
+  }
+
+  @Override
+  public void close() {
+    try {
+      doFnRunner.finishBundle();
+    } finally {
+      // Tear the DoFn down even when finishing the bundle throws, so that a failed
+      // bundle does not skip the teardown paired with invokeSetup() in prepare().
+      doFnInvoker.invokeTeardown();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "DoTransform:" + doFn;
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java
deleted file mode 100644
index 3a36ec0..0000000
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoTransform.java
+++ /dev/null
@@ -1,449 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nemo.compiler.frontend.beam.transform;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.state.State;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.state.Timer;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * DoFn transform implementation.
- *
- * @param <I> input type.
- * @param <O> output type.
- */
-public final class DoTransform<I, O> implements Transform<I, O> {
- private final DoFn doFn;
- private final ObjectMapper mapper;
- private final String serializedOptions;
- private OutputCollector<O> outputCollector;
- private StartBundleContext startBundleContext;
- private FinishBundleContext finishBundleContext;
- private ProcessContext processContext;
- private DoFnInvoker invoker;
-
- /**
- * DoTransform Constructor.
- *
- * @param doFn doFn.
- * @param options Pipeline options.
- */
- public DoTransform(final DoFn doFn, final PipelineOptions options) {
- this.doFn = doFn;
- this.mapper = new ObjectMapper();
- try {
- this.serializedOptions = mapper.writeValueAsString(options);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void prepare(final Context context, final OutputCollector<O> oc) {
- this.outputCollector = oc;
- this.startBundleContext = new StartBundleContext(doFn, serializedOptions);
- this.finishBundleContext = new FinishBundleContext(doFn, outputCollector, serializedOptions);
- this.processContext = new ProcessContext(doFn, outputCollector, context, serializedOptions);
- this.invoker = DoFnInvokers.invokerFor(doFn);
- invoker.invokeSetup();
- invoker.invokeStartBundle(startBundleContext);
- }
-
- @Override
- public void onData(final I data) {
- processContext.setElement(data);
- invoker.invokeProcessElement(processContext);
- }
-
- @Override
- public void close() {
- invoker.invokeFinishBundle(finishBundleContext);
- invoker.invokeTeardown();
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("DoTransform:" + doFn);
- return sb.toString();
- }
-
- /**
- * StartBundleContext.
- *
- * @param <I> input type.
- * @param <O> output type.
- */
- private static final class StartBundleContext<I, O> extends DoFn<I, O>.StartBundleContext {
- private final ObjectMapper mapper;
- private final PipelineOptions options;
-
- /**
- * StartBundleContext.
- *
- * @param fn DoFn.
- * @param serializedOptions serialized options of the DoTransform.
- */
- StartBundleContext(final DoFn<I, O> fn,
- final String serializedOptions) {
- fn.super();
- this.mapper = new ObjectMapper();
- try {
- this.options = mapper.readValue(serializedOptions, PipelineOptions.class);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return options;
- }
- }
-
- /**
- * FinishBundleContext.
- *
- * @param <I> input type.
- * @param <O> output type.
- */
- private static final class FinishBundleContext<I, O> extends DoFn<I, O>.FinishBundleContext {
- private final OutputCollector<O> outputCollector;
- private final ObjectMapper mapper;
- private final PipelineOptions options;
-
- /**
- * Constructor.
- *
- * @param fn DoFn.
- * @param outputCollector outputCollector of the DoTransform.
- * @param serializedOptions serialized options of the DoTransform.
- */
- FinishBundleContext(final DoFn<I, O> fn,
- final OutputCollector<O> outputCollector,
- final String serializedOptions) {
- fn.super();
- this.outputCollector = outputCollector;
- this.mapper = new ObjectMapper();
- try {
- this.options = mapper.readValue(serializedOptions, PipelineOptions.class);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return options;
- }
-
- @Override
- public void output(final O output, final Instant instant, final BoundedWindow boundedWindow) {
- outputCollector.emit(output);
- }
-
- @Override
- public <T> void output(final TupleTag<T> tupleTag,
- final T t,
- final Instant instant,
- final BoundedWindow boundedWindow) {
- throw new UnsupportedOperationException("output(TupleTag, T, Instant, BoundedWindow)"
- + "in FinishBundleContext under DoTransform");
- }
- }
-
- /**
- * ProcessContext class. Reference: SimpleDoFnRunner.DoFnProcessContext in BEAM.
- *
- * @param <I> input type.
- * @param <O> output type.
- */
- private static final class ProcessContext<I, O> extends DoFn<I, O>.ProcessContext
- implements DoFnInvoker.ArgumentProvider<I, O> {
- private I input;
- private final OutputCollector<O> outputCollector;
- private final Map<String, String> additionalOutputs;
- private final Context context;
- private final ObjectMapper mapper;
- private final PipelineOptions options;
-
- /**
- * ProcessContext Constructor.
- *
- * @param fn Dofn.
- * @param outputCollector OutputCollector.
- * @param context Context.
- * @param serializedOptions Options, serialized.
- */
- ProcessContext(final DoFn<I, O> fn,
- final OutputCollector<O> outputCollector,
- final Context context,
- final String serializedOptions) {
- fn.super();
- this.outputCollector = outputCollector;
- this.context = context;
- this.additionalOutputs = context.getTagToAdditionalChildren();
- this.mapper = new ObjectMapper();
- try {
- this.options = mapper.readValue(serializedOptions, PipelineOptions.class);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Setter for input element.
- *
- * @param in input element.
- */
- void setElement(final I in) {
- this.input = in;
- }
-
- @Override
- public I element() {
- return this.input;
- }
-
- @Override
- public Row asRow(final String id) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T> T sideInput(final PCollectionView<T> view) {
- return (T) context.getBroadcastVariable(view);
- }
-
- @Override
- public Instant timestamp() {
- throw new UnsupportedOperationException("timestamp() in ProcessContext under DoTransform");
- }
-
- @Override
- public PaneInfo pane() {
- return PaneInfo.createPane(true, true, PaneInfo.Timing.UNKNOWN);
- }
-
- @Override
- public void updateWatermark(final Instant instant) {
- throw new UnsupportedOperationException("updateWatermark() in ProcessContext under DoTransform");
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return this.options;
- }
-
- @Override
- public void output(final O output) {
- outputCollector.emit(output);
- }
-
- @Override
- public void outputWithTimestamp(final O output, final Instant timestamp) {
- outputCollector.emit(output);
- }
-
- @Override
- public <T> void output(final TupleTag<T> tupleTag, final T t) {
- final Object dstVertexId = additionalOutputs.get(tupleTag.getId());
-
- if (dstVertexId == null) {
- outputCollector.emit((O) t);
- } else {
- outputCollector.emit(additionalOutputs.get(tupleTag.getId()), t);
- }
- }
-
- @Override
- public <T> void outputWithTimestamp(final TupleTag<T> tupleTag, final T t, final Instant instant) {
- throw new UnsupportedOperationException("output(TupleTag, T, Instant) in ProcessContext under DoTransform");
- }
-
- @Override
- public BoundedWindow window() {
- // Unbounded windows are not supported for now.
- return GlobalWindow.INSTANCE;
- }
-
- @Override
- public PaneInfo paneInfo(final DoFn<I, O> doFn) {
- return PaneInfo.createPane(true, true, PaneInfo.Timing.UNKNOWN);
- }
-
- @Override
- public PipelineOptions pipelineOptions() {
- return options;
- }
-
- @Override
- public DoFn<I, O>.StartBundleContext startBundleContext(final DoFn<I, O> doFn) {
- throw new UnsupportedOperationException("StartBundleContext parameters are not supported.");
- }
-
- @Override
- public DoFn<I, O>.FinishBundleContext finishBundleContext(final DoFn<I, O> doFn) {
- throw new UnsupportedOperationException("FinishBundleContext parameters are not supported.");
- }
-
- @Override
- public DoFn.ProcessContext
- processContext(final DoFn<I, O> doFn) {
- return this;
- }
-
- @Override
- public DoFn.OnTimerContext
- onTimerContext(final DoFn<I, O> doFn) {
- throw new UnsupportedOperationException("onTimerContext() in ProcessContext under DoTransform");
- }
-
- @Override
- public I element(final DoFn<I, O> doFn) {
- return this.input;
- }
-
- @Override
- public Instant timestamp(final DoFn<I, O> doFn) {
- return Instant.now();
- }
-
- @Override
- public RestrictionTracker<?, ?> restrictionTracker() {
- throw new UnsupportedOperationException("restrictionTracker() in ProcessContext under DoTransform");
- }
-
- @Override
- public TimeDomain timeDomain(final DoFn<I, O> doFn) {
- throw new UnsupportedOperationException("timeDomain() in ProcessContext under DoTransform");
- }
-
- @Override
- public DoFn.OutputReceiver<O> outputReceiver(final DoFn<I, O> doFn) {
- return new OutputReceiver<>((OutputCollectorImpl) outputCollector);
- }
-
- @Override
- public DoFn.OutputReceiver<Row> outputRowReceiver(final DoFn<I, O> doFn) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public DoFn.MultiOutputReceiver taggedOutputReceiver(final DoFn<I, O> doFn) {
- return new MultiOutputReceiver((OutputCollectorImpl) outputCollector, additionalOutputs);
- }
-
- @Override
- public State state(final String stateId) {
- throw new UnsupportedOperationException("state() in ProcessContext under DoTransform");
- }
-
- @Override
- public Timer timer(final String timerId) {
- throw new UnsupportedOperationException("timer() in ProcessContext under DoTransform");
- }
- }
-
- /**
- * @return {@link DoFn} for this transform.
- */
- public DoFn getDoFn() {
- return doFn;
- }
-
- /**
- * OutputReceiver class.
- * @param <O> output type
- */
- static final class OutputReceiver<O> implements DoFn.OutputReceiver<O> {
- private final List<O> dataElements;
-
- OutputReceiver(final OutputCollectorImpl<O> outputCollector) {
- this.dataElements = outputCollector.getMainTagOutputQueue();
- }
-
- OutputReceiver(final OutputCollectorImpl outputCollector,
- final TupleTag<O> tupleTag,
- final Map<String, String> tagToVertex) {
- final Object dstVertexId = tagToVertex.get(tupleTag.getId());
- if (dstVertexId == null) {
- this.dataElements = outputCollector.getMainTagOutputQueue();
- } else {
- this.dataElements = (List<O>) outputCollector.getAdditionalTagOutputQueue((String) dstVertexId);
- }
- }
-
- @Override
- public void output(final O output) {
- dataElements.add(output);
- }
-
- @Override
- public void outputWithTimestamp(final O output, final Instant timestamp) {
- dataElements.add(output);
- }
- }
-
- /**
- * MultiOutputReceiver class.
- */
- static final class MultiOutputReceiver implements DoFn.MultiOutputReceiver {
- private final OutputCollectorImpl outputCollector;
- private final Map<String, String> tagToVertex;
-
- /**
- * Constructor.
- * @param outputCollector outputCollector
- * @param tagToVertex tag to vertex map
- */
- MultiOutputReceiver(final OutputCollectorImpl outputCollector,
- final Map<String, String> tagToVertex) {
- this.outputCollector = outputCollector;
- this.tagToVertex = tagToVertex;
- }
-
- @Override
- public <T> DoFn.OutputReceiver<T> get(final TupleTag<T> tag) {
- return new OutputReceiver<>(this.outputCollector, tag, tagToVertex);
- }
-
- @Override
- public <T> OutputReceiver<Row> getRowReceiver(final TupleTag<T> tag) {
- throw new UnsupportedOperationException();
- }
- }
-}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index b0e6429..38b2641 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -15,6 +15,7 @@
*/
package org.apache.nemo.compiler.frontend.beam.transform;
+import org.apache.beam.sdk.util.WindowedValue;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.beam.sdk.values.KV;
@@ -27,10 +28,10 @@ import java.util.*;
* Group Beam KVs.
* @param <I> input type.
*/
-public final class GroupByKeyTransform<I> implements Transform<I, KV<Object, List>> {
+public final class GroupByKeyTransform<I> implements Transform<I, WindowedValue<KV<Object, List>>> {
private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyTransform.class.getName());
private final Map<Object, List> keyToValues;
- private OutputCollector<KV<Object, List>> outputCollector;
+ private OutputCollector<WindowedValue<KV<Object, List>>> outputCollector;
/**
* GroupByKey constructor.
@@ -40,23 +41,27 @@ public final class GroupByKeyTransform<I> implements Transform<I, KV<Object, Lis
}
@Override
- public void prepare(final Context context, final OutputCollector<KV<Object, List>> oc) {
+ public void prepare(final Context context, final OutputCollector<WindowedValue<KV<Object, List>>> oc) {
this.outputCollector = oc;
}
@Override
public void onData(final I element) {
- final KV kv = (KV) element;
+ // TODO #129: support window in group by key for windowed groupByKey
+ final WindowedValue<KV> windowedValue = (WindowedValue<KV>) element;
+ final KV kv = windowedValue.getValue();
keyToValues.putIfAbsent(kv.getKey(), new ArrayList());
keyToValues.get(kv.getKey()).add(kv.getValue());
}
@Override
public void close() {
+ // TODO #129: support window in group by key for windowed groupByKey
if (keyToValues.isEmpty()) {
LOG.warn("Beam GroupByKeyTransform received no data!");
} else {
- keyToValues.entrySet().stream().map(entry -> KV.of(entry.getKey(), entry.getValue()))
+ keyToValues.entrySet().stream().map(entry ->
+ WindowedValue.valueInGlobalWindow(KV.of(entry.getKey(), entry.getValue())))
.forEach(outputCollector::emit);
keyToValues.clear();
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
new file mode 100644
index 0000000..e7c9135
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+/**
+ * Windowing transform implementation.
+ * Re-assigns each incoming element to the windows computed by the
+ * user-specified {@link WindowFn}, keeping its value and timestamp.
+ * @param <T> input/output type.
+ * @param <W> window type
+ */
+public final class WindowFnTransform<T, W extends BoundedWindow>
+    implements Transform<WindowedValue<T>, WindowedValue<T>> {
+  private final WindowFn windowFn;
+  private OutputCollector<WindowedValue<T>> outputCollector;
+
+  /**
+   * Default Constructor.
+   * @param windowFn windowFn for the Transform.
+   */
+  public WindowFnTransform(final WindowFn windowFn) {
+    this.windowFn = windowFn;
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector<WindowedValue<T>> oc) {
+    this.outputCollector = oc;
+  }
+
+  @Override
+  public void onData(final WindowedValue<T> windowedValue) {
+    // The input is expected to sit in exactly one window; getOnlyElement fails otherwise.
+    final BoundedWindow currentWindow = Iterables.getOnlyElement(windowedValue.getWindows());
+    final T value = windowedValue.getValue();
+    final Instant eventTime = windowedValue.getTimestamp();
+
+    try {
+      final WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) windowFn;
+      final WindowFn<T, W>.AssignContext assignContext = typedWindowFn.new AssignContext() {
+        @Override
+        public T element() {
+          return value;
+        }
+
+        @Override
+        public Instant timestamp() {
+          return eventTime;
+        }
+
+        @Override
+        public BoundedWindow window() {
+          return currentWindow;
+        }
+      };
+      final Collection<W> assignedWindows = typedWindowFn.assignWindows(assignContext);
+
+      // Emit a single WindowedValue that carries every assigned window,
+      // rather than one value per window.
+      outputCollector.emit(WindowedValue.of(value, eventTime, assignedWindows, PaneInfo.NO_FIRING));
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+    // Nothing to clean up.
+  }
+
+  @Override
+  public String toString() {
+    return "WindowFnTransform:" + windowFn;
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowTransform.java
deleted file mode 100644
index 20bc723..0000000
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowTransform.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nemo.compiler.frontend.beam.transform;
-
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-
-/**
- * Windowing transform implementation.
- * This transform simply windows the given elements into finite windows according to a user-specified WindowTransform.
- * As this functionality is unnecessary for batch processing workloads and for Runtime, this is left as below.
- * @param <T> input/output type.
- */
-public final class WindowTransform<T> implements Transform<T, T> {
- private final WindowFn windowFn;
- private OutputCollector<T> outputCollector;
-
- /**
- * Default Constructor.
- * @param windowFn windowFn for the Transform.
- */
- public WindowTransform(final WindowFn windowFn) {
- this.windowFn = windowFn;
- }
-
- @Override
- public void prepare(final Context context, final OutputCollector<T> oc) {
- this.outputCollector = oc;
- }
-
- @Override
- public void onData(final T element) {
- // TODO #1: Support Beam Streaming in Compiler.
- outputCollector.emit(element);
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("WindowTransform:" + windowFn);
- return sb.toString();
- }
-}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/PairKeyExtractor.java
similarity index 62%
copy from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
copy to compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/PairKeyExtractor.java
index c3f4bbc..909af4b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/PairKeyExtractor.java
@@ -13,24 +13,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nemo.compiler.frontend.beam;
+package org.apache.nemo.compiler.optimizer;
import org.apache.nemo.common.KeyExtractor;
-import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.common.Pair;
/**
- * Extracts the key from a KV element.
- * For non-KV elements, the elements themselves become the key.
+ * Extracts the key (the left element) from a {@link Pair}; any other element, including null, is rejected.
*/
-final class BeamKeyExtractor implements KeyExtractor {
+public final class PairKeyExtractor implements KeyExtractor {
@Override
public Object extractKey(final Object element) {
- if (element instanceof KV) {
- // Handle null keys, since Beam allows KV with null keys.
- final Object key = ((KV) element).getKey();
- return key == null ? 0 : key;
+ if (element instanceof Pair) {
+ return ((Pair) element).left();
} else {
- return element;
+ throw new IllegalStateException("Expected a Pair element but got: " + element);
}
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
index adf573b..e4743b5 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
@@ -30,6 +30,7 @@ import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
import org.apache.nemo.common.ir.vertex.transform.MetricCollectTransform;
+import org.apache.nemo.compiler.optimizer.PairKeyExtractor;
import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -178,7 +179,7 @@ public final class SkewReshapingPass extends ReshapingPass {
newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
- newEdge.setProperty(KeyExtractorProperty.of(edge.getPropertyValue(KeyExtractorProperty.class).get()));
+ newEdge.setProperty(KeyExtractorProperty.of(new PairKeyExtractor()));
newEdge.setProperty(AdditionalOutputTagProperty.of("DynOptData"));
// Dynamic optimization handles statistics on key-value data by default.
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
index 2f87188..0770c00 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/backend/nemo/DAGConverterTest.java
@@ -27,7 +27,6 @@ import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.compiler.frontend.beam.transform.DoTransform;
import org.apache.nemo.common.test.EmptyComponents;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
@@ -66,7 +65,7 @@ public final class DAGConverterTest {
v1.setProperty(ParallelismProperty.of(3));
irDAGBuilder.addVertex(v1);
- final IRVertex v2 = new OperatorVertex(new DoTransform(null, null));
+ final IRVertex v2 = new OperatorVertex(mock(Transform.class));
v2.setProperty(ParallelismProperty.of(2));
irDAGBuilder.addVertex(v2);
@@ -113,7 +112,7 @@ public final class DAGConverterTest {
v1.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.COMPUTE));
final Transform t = mock(Transform.class);
- final DoTransform dt = new DoTransform(null, null);
+ final Transform dt = mock(Transform.class);
final IRVertex v2 = new OperatorVertex(t);
v2.setProperty(ParallelismProperty.of(3));
v2.setProperty(ResourcePriorityProperty.of(ResourcePriorityProperty.COMPUTE));
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
new file mode 100644
index 0000000..a30ee46
--- /dev/null
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -0,0 +1,287 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.reef.io.Tuple;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+
+import static java.util.Collections.emptyList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class DoFnTransformTest {
+
+ // views and windows for testing side inputs
+ private PCollectionView<Iterable<String>> view1;
+ private PCollectionView<Iterable<String>> view2;
+
+ private final static Coder NULL_INPUT_CODER = null;
+ private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
+
+ /**
+  * Creates the two iterable side-input views used by {@code testSideInputs}.
+  * Each view is built on its own freshly created pipeline.
+  */
+ @Before
+ public void setUp() {
+   view1 = Pipeline.create().apply(Create.of("1")).apply(View.asIterable());
+   view2 = Pipeline.create().apply(Create.of("2")).apply(View.asIterable());
+ }
+
+ /**
+  * Runs one element through a {@link DoFnTransform} wrapping {@link IdentityDoFn}
+  * and checks that the same element is the first value on the main output.
+  */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testSingleOutput() {
+
+   final TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+   final DoFnTransform<String, String> doFnTransform =
+     new DoFnTransform<>(
+       new IdentityDoFn<>(),
+       NULL_INPUT_CODER,
+       NULL_OUTPUT_CODERS,
+       outputTag,
+       Collections.emptyList(),
+       WindowingStrategy.globalDefault(),
+       emptyList(), /* side inputs */
+       PipelineOptionsFactory.as(NemoPipelineOptions.class));
+
+   final Transform.Context context = mock(Transform.Context.class);
+   // Keep the concrete collector type so the recorded outputs can be read without a cast.
+   final TestOutputCollector<String> oc = new TestOutputCollector<>();
+   doFnTransform.prepare(context, oc);
+
+   doFnTransform.onData(WindowedValue.valueInGlobalWindow("Hello"));
+
+   // assertEquals takes (expected, actual); this order keeps failure messages the right way round.
+   assertEquals(WindowedValue.valueInGlobalWindow("Hello"), oc.getOutput().get(0));
+
+   doFnTransform.close();
+ }
+
+ /**
+  * Sends "one", "two" and "hello" through a {@link DoFnTransform} wrapping {@link MultiOutputDoFn}
+  * and checks what was recorded on the main output and under both additional output tags.
+  */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testMultiOutputOutput() {
+
+   final TupleTag<String> mainTag = new TupleTag<>("main-output");
+   final TupleTag<String> extraTag1 = new TupleTag<>("output-1");
+   final TupleTag<String> extraTag2 = new TupleTag<>("output-2");
+   final String extraId1 = extraTag1.getId();
+   final String extraId2 = extraTag2.getId();
+
+   final ImmutableList<TupleTag<?>> additionalTags = ImmutableList.of(extraTag1, extraTag2);
+
+   // Every additional tag id is mapped to itself in the context's tag-to-children map.
+   final ImmutableMap<String, String> tagToChild =
+     ImmutableMap.of(extraId1, extraId1, extraId2, extraId2);
+
+   final DoFnTransform<String, String> transform =
+     new DoFnTransform<>(
+       new MultiOutputDoFn(extraTag1, extraTag2),
+       NULL_INPUT_CODER,
+       NULL_OUTPUT_CODERS,
+       mainTag,
+       additionalTags,
+       WindowingStrategy.globalDefault(),
+       emptyList(), /* side inputs */
+       PipelineOptionsFactory.as(NemoPipelineOptions.class));
+
+   // mock context
+   final Transform.Context context = mock(Transform.Context.class);
+   when(context.getTagToAdditionalChildren()).thenReturn(tagToChild);
+
+   final TestOutputCollector<String> collector = new TestOutputCollector<>();
+   transform.prepare(context, collector);
+
+   transform.onData(WindowedValue.valueInGlobalWindow("one"));
+   transform.onData(WindowedValue.valueInGlobalWindow("two"));
+   transform.onData(WindowedValue.valueInGlobalWindow("hello"));
+
+   // main output
+   assertEquals(WindowedValue.valueInGlobalWindow("got: hello"), collector.getOutput().get(0));
+
+   final List<Tuple<String, WindowedValue<String>>> tagged = collector.getTaggedOutputs();
+
+   // additional output 1
+   assertTrue(tagged.contains(new Tuple<>(extraId1, WindowedValue.valueInGlobalWindow("extra: one"))));
+   assertTrue(tagged.contains(new Tuple<>(extraId1, WindowedValue.valueInGlobalWindow("got: hello"))));
+
+   // additional output 2
+   assertTrue(tagged.contains(new Tuple<>(extraId2, WindowedValue.valueInGlobalWindow("extra: two"))));
+   assertTrue(tagged.contains(new Tuple<>(extraId2, WindowedValue.valueInGlobalWindow("got: hello"))));
+
+   transform.close();
+ }
+
+
+ // TODO #216: implement side input and windowing
+ /**
+  * Processes two elements, each associated with a different side-input view, and checks that
+  * each element is emitted paired with the value the mocked context returns for its view.
+  */
+ @Test
+ public void testSideInputs() {
+   final WindowedValue<String> firstElement = WindowedValue.valueInGlobalWindow("first");
+   final WindowedValue<String> secondElement = WindowedValue.valueInGlobalWindow("second");
+
+   // mock context: view1 resolves to ["1"], view2 resolves to ["2"]
+   final Transform.Context context = mock(Transform.Context.class);
+   when(context.getBroadcastVariable(view1))
+     .thenReturn(WindowedValue.valueInGlobalWindow(ImmutableList.of("1")));
+   when(context.getBroadcastVariable(view2))
+     .thenReturn(WindowedValue.valueInGlobalWindow(ImmutableList.of("2")));
+
+   final TupleTag<Tuple<String, Iterable<String>>> mainTag = new TupleTag<>("main-output");
+
+   // Tells the DoFn which view to read for which element value.
+   final Map<String, PCollectionView<Iterable<String>>> viewPerElement =
+     ImmutableMap.of(firstElement.getValue(), view1, secondElement.getValue(), view2);
+
+   final DoFnTransform<String, Tuple<String, Iterable<String>>> transform =
+     new DoFnTransform<>(
+       new SimpleSideInputDoFn<>(viewPerElement),
+       NULL_INPUT_CODER,
+       NULL_OUTPUT_CODERS,
+       mainTag,
+       Collections.emptyList(),
+       WindowingStrategy.globalDefault(),
+       ImmutableList.of(view1, view2), /* side inputs */
+       PipelineOptionsFactory.as(NemoPipelineOptions.class));
+
+   final TestOutputCollector<Tuple<String, Iterable<String>>> collector = new TestOutputCollector<>();
+   transform.prepare(context, collector);
+
+   transform.onData(firstElement);
+   transform.onData(secondElement);
+
+   final List<WindowedValue<Tuple<String, Iterable<String>>>> emitted = collector.getOutput();
+
+   assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("first", ImmutableList.of("1"))),
+     emitted.get(0));
+
+   assertEquals(WindowedValue.valueInGlobalWindow(new Tuple<>("second", ImmutableList.of("2"))),
+     emitted.get(1));
+
+   transform.close();
+ }
+
+ /**
+  * {@link OutputCollector} that records everything emitted to it in memory so that tests
+  * can inspect the results afterwards.
+  *
+  * @param <T> type of the value wrapped by each recorded {@link WindowedValue}
+  */
+ private static final class TestOutputCollector<T> implements OutputCollector<WindowedValue<T>> {
+   private final List<WindowedValue<T>> outputs;
+   private final List<Tuple<String, WindowedValue<T>>> taggedOutputs;
+
+   TestOutputCollector() {
+     // ArrayList rather than LinkedList: the tests read the recorded values by index.
+     this.outputs = new ArrayList<>();
+     this.taggedOutputs = new ArrayList<>();
+   }
+
+   /**
+    * Records a value emitted without a destination id.
+    *
+    * @param output the emitted value
+    */
+   @Override
+   public void emit(final WindowedValue<T> output) {
+     outputs.add(output);
+   }
+
+   /**
+    * Records a value emitted with a destination id, paired with that id.
+    *
+    * @param dstVertexId the id the value was emitted with
+    * @param output the emitted value; expected to be a {@code WindowedValue<T>}
+    * @param <O> declared type of the emitted value
+    */
+   @Override
+   @SuppressWarnings("unchecked")
+   public <O> void emit(final String dstVertexId, final O output) {
+     // Unchecked: the interface exposes the value only as O, so this cast cannot be verified at runtime.
+     final WindowedValue<T> val = (WindowedValue<T>) output;
+     taggedOutputs.add(new Tuple<>(dstVertexId, val));
+   }
+
+   /**
+    * @return the values recorded by {@link #emit(WindowedValue)}, in emission order
+    */
+   public List<WindowedValue<T>> getOutput() {
+     return outputs;
+   }
+
+   /**
+    * @return the (id, value) pairs recorded by the tagged {@code emit}, in emission order
+    */
+   public List<Tuple<String, WindowedValue<T>>> getTaggedOutputs() {
+     return taggedOutputs;
+   }
+ }
+
+ /**
+  * A {@link DoFn} that outputs every element it receives unchanged.
+  * @param <T> element type, used for both input and output
+  */
+ private static class IdentityDoFn<T> extends DoFn<T, T> {
+   @ProcessElement
+   public void processElement(final ProcessContext c) throws Exception {
+     final T element = c.element();
+     c.output(element);
+   }
+ }
+
+ /**
+  * A {@link DoFn} that looks up the side-input view registered for each element and
+  * outputs the element paired with the value read from that view.
+  * @param <T> element type, also the key type of the element-to-view map
+  * @param <V> type of the value read from a side-input view
+  */
+ private static class SimpleSideInputDoFn<T, V> extends DoFn<T, Tuple<T, V>> {
+   private final Map<T, PCollectionView<V>> idAndViewMap;
+
+   public SimpleSideInputDoFn(final Map<T, PCollectionView<V>> idAndViewMap) {
+     this.idAndViewMap = idAndViewMap;
+   }
+
+   @ProcessElement
+   public void processElement(final ProcessContext c) throws Exception {
+     final T element = c.element();
+     final V viewValue = c.sideInput(idAndViewMap.get(element));
+     c.output(new Tuple<>(element, viewValue));
+   }
+ }
+
+
+ /**
+  * A {@link DoFn} that routes elements by value: "one" goes to the first additional output only,
+  * "two" goes to the second additional output only, and any other element goes to the main
+  * output and to both additional outputs with a "got: " prefix.
+  */
+ private static class MultiOutputDoFn extends DoFn<String, String> {
+   // Assigned once in the constructor and never reassigned.
+   private final TupleTag<String> additionalOutput1;
+   private final TupleTag<String> additionalOutput2;
+
+   /**
+    * @param additionalOutput1 tag that receives "extra: one" and the "got: ..." values
+    * @param additionalOutput2 tag that receives "extra: two" and the "got: ..." values
+    */
+   public MultiOutputDoFn(final TupleTag<String> additionalOutput1, final TupleTag<String> additionalOutput2) {
+     this.additionalOutput1 = additionalOutput1;
+     this.additionalOutput2 = additionalOutput2;
+   }
+
+   @ProcessElement
+   public void processElement(final ProcessContext c) throws Exception {
+     if ("one".equals(c.element())) {
+       c.output(additionalOutput1, "extra: one");
+     } else if ("two".equals(c.element())) {
+       c.output(additionalOutput2, "extra: two");
+     } else {
+       // Everything else is sent to all three outputs.
+       c.output("got: " + c.element());
+       c.output(additionalOutput1, "got: " + c.element());
+       c.output(additionalOutput2, "got: " + c.element());
+     }
+   }
+ }
+}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
index 01d9efe..b2315da 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/MultinomialLogisticRegression.java
@@ -15,6 +15,8 @@
*/
package org.apache.nemo.examples.beam;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.nemo.compiler.frontend.beam.transform.LoopCompositeTransform;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
import org.apache.nemo.common.Pair;
@@ -225,7 +227,9 @@ public final class MultinomialLogisticRegression {
@FinishBundle
public void finishBundle(final FinishBundleContext c) {
for (Integer i = 0; i < gradients.size(); i++) {
- c.output(KV.of(i, gradients.get(i)), null, null);
+ // this enforces a global window (batching),
+ // where all data elements of the corresponding PCollection are grouped and emitted downstream together
+ c.output(KV.of(i, gradients.get(i)), BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
}
LOG.info("stats: " + gradients.get(numClasses - 1).toString());
}
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
index b8a2dcc..6081a95 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -93,7 +93,7 @@ public final class TaskRetryTest {
runPhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, injector);
}
- @Test(timeout=7000)
+ @Test(timeout=60000)
public void testExecutorRemoved() throws Exception {
// Until the plan finishes, events happen
while (!planStateManager.isPlanDone()) {
@@ -119,7 +119,7 @@ public final class TaskRetryTest {
assertTrue(planStateManager.isPlanDone());
}
- @Test(timeout=7000)
+ @Test(timeout=60000)
public void testTaskOutputWriteFailure() throws Exception {
// Three executors are used
executorAdded(1.0);