You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:23 UTC
[05/39] incubator-beam git commit: BEAM-261 PCollectionView and side
inputs.
BEAM-261 PCollectionView and side inputs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/09754942
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/09754942
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/09754942
Branch: refs/heads/master
Commit: 09754942c66c9befffc8df9b3c8a75b819a672e6
Parents: 074b18f
Author: Thomas Weise <th...@apache.org>
Authored: Sun Sep 25 16:46:44 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Sun Oct 16 23:25:55 2016 -0700
----------------------------------------------------------------------
.../beam/runners/apex/ApexPipelineOptions.java | 6 +
.../runners/apex/ApexPipelineTranslator.java | 19 +-
.../apache/beam/runners/apex/ApexRunner.java | 397 ++++++++++++++++++-
.../FlattenPCollectionTranslator.java | 26 +-
.../apex/translators/ParDoBoundTranslator.java | 22 +-
.../apex/translators/TranslationContext.java | 14 +-
.../functions/ApexFlattenOperator.java | 113 ++++++
.../functions/ApexGroupByKeyOperator.java | 78 +++-
.../functions/ApexParDoOperator.java | 210 ++++++++--
.../io/ApexReadUnboundedInputOperator.java | 31 +-
.../apex/translators/utils/ApexStreamTuple.java | 11 +
.../translators/utils/NoOpSideInputReader.java | 47 ---
.../beam/runners/apex/examples/IntTest.java | 133 +++++++
.../beam/runners/apex/examples/IntTests.java | 207 ----------
.../translators/ParDoBoundTranslatorTest.java | 37 +-
.../apex/src/test/resources/log4j.properties | 4 +-
16 files changed, 1028 insertions(+), 327 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
index f70d24c..141a8c1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
@@ -50,6 +50,12 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab
@Default.Boolean(true)
boolean isEmbeddedExecutionDebugMode();
+ @Description("output data received and emitted on ports (for debugging)")
+ void setTupleTracingEnabled(boolean enabled);
+
+ @Default.Boolean(false)
+ boolean isTupleTracingEnabled();
+
@Description("how long the client should wait for the pipeline to run")
void setRunMillis(long runMillis);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index b0391b4..ad8c283 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.apex;
+import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
import org.apache.beam.runners.apex.translators.CreateValuesTranslator;
import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator;
import org.apache.beam.runners.apex.translators.GroupByKeyTranslator;
@@ -35,8 +36,8 @@ import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +72,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
registerTransformTranslator(Flatten.FlattenPCollectionList.class,
new FlattenPCollectionTranslator());
registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
+ registerTransformTranslator(CreateApexPCollectionView.class, new CreatePCollectionViewTranslator());
}
public ApexPipelineTranslator(TranslationContext translationContext) {
@@ -98,7 +100,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
PTransform transform = node.getTransform();
TransformTranslator translator = getTransformTranslator(transform.getClass());
if (null == translator) {
- throw new IllegalStateException(
+ throw new UnsupportedOperationException(
"no translator registered for " + transform);
}
translationContext.setCurrentTransform(node);
@@ -147,4 +149,17 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
}
+ private static class CreatePCollectionViewTranslator<ElemT, ViewT> implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>>
+ {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void translate(CreateApexPCollectionView<ElemT, ViewT> transform, TranslationContext context)
+ {
+ PCollectionView<ViewT> view = transform.getView();
+ context.addView(view);
+ LOG.debug("view {}", view.getName());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 5fa3f23..ae79a20 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -19,20 +19,36 @@ package org.apache.beam.runners.apex;
import static com.google.common.base.Preconditions.checkArgument;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
import org.apache.beam.runners.apex.translators.TranslationContext;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.runners.core.AssignWindows;
+import org.apache.beam.sdk.util.PCollectionViews;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.hadoop.conf.Configuration;
@@ -55,6 +71,13 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
private final ApexPipelineOptions options;
+ /**
+ * TODO: this isn't thread safe.
+ * Holds the most recent assertion error that was raised while processing elements.
+ * Used in the unit test driver in embedded mode to propagate the exception.
+ */
+ public static volatile AssertionError assertionError;
+
public ApexRunner(ApexPipelineOptions options) {
this.options = options;
}
@@ -77,6 +100,32 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
input.getPipeline(),
WindowingStrategy.globalDefault(),
PCollection.IsBounded.BOUNDED);
+// TODO: replace this with a mapping
+ } else if (Combine.GloballyAsSingletonView.class.equals(transform.getClass())) {
+ PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingCombineGloballyAsSingletonView<InputT, OutputT>(this,
+ (Combine.GloballyAsSingletonView)transform);
+ return Pipeline.applyTransform(input, customTransform);
+ } else if (View.AsSingleton.class.equals(transform.getClass())) {
+ // note this assumes presence of above Combine.GloballyAsSingletonView mapping
+ PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsSingleton<InputT>(this,
+ (View.AsSingleton)transform);
+ return Pipeline.applyTransform(input, customTransform);
+ } else if (View.AsIterable.class.equals(transform.getClass())) {
+ PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsIterable<InputT>(this,
+ (View.AsIterable)transform);
+ return Pipeline.applyTransform(input, customTransform);
+ } else if (View.AsList.class.equals(transform.getClass())) {
+ PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsList<InputT>(this,
+ (View.AsList)transform);
+ return Pipeline.applyTransform(input, customTransform);
+ } else if (View.AsMap.class.equals(transform.getClass())) {
+ PTransform<InputT, OutputT> customTransform = new StreamingViewAsMap(this,
+ (View.AsMap)transform);
+ return Pipeline.applyTransform(input, customTransform);
+ } else if (View.AsMultimap.class.equals(transform.getClass())) {
+ PTransform<InputT, OutputT> customTransform = new StreamingViewAsMultimap(this,
+ (View.AsMultimap)transform);
+ return Pipeline.applyTransform(input, customTransform);
} else {
return super.apply(transform, input);
}
@@ -109,10 +158,19 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
// turns off timeout checking for operator progress
lc.setHeartbeatMonitoringEnabled(false);
}
+ assertionError = null;
+ lc.runAsync();
if (options.getRunMillis() > 0) {
- lc.run(options.getRunMillis());
- } else {
- lc.runAsync();
+ try {
+ long timeout = System.currentTimeMillis() + options.getRunMillis();
+ while (System.currentTimeMillis() < timeout) {
+ if (assertionError != null) {
+ throw assertionError;
+ }
+ }
+ } finally {
+ lc.shutdown();
+ }
}
return new ApexRunnerResult(lma.getDAG(), lc);
} catch (Exception e) {
@@ -158,10 +216,343 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
}
private static class IdentityFn<T> extends DoFn<T, T> {
+ private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element());
}
}
+////////////////////////////////////////////
+// Adapted from FlinkRunner for View support
+
+ /**
+ * Records that the {@link PTransform} requires a deterministic key coder.
+ */
+ private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Creates a primitive {@link PCollectionView}.
+ *
+ * <p>For internal use only by runner implementors.
+ *
+ * @param <ElemT> The type of the elements of the input PCollection
+ * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+ */
+ public static class CreateApexPCollectionView<ElemT, ViewT>
+ extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+ private PCollectionView<ViewT> view;
+
+ private CreateApexPCollectionView(PCollectionView<ViewT> view) {
+ this.view = view;
+ }
+
+ public static <ElemT, ViewT> CreateApexPCollectionView<ElemT, ViewT> of(
+ PCollectionView<ViewT> view) {
+ return new CreateApexPCollectionView<>(view);
+ }
+
+ public PCollectionView<ViewT> getView() {
+ return view;
+ }
+
+ @Override
+ public PCollectionView<ViewT> apply(PCollection<List<ElemT>> input) {
+ return view;
+ }
+ }
+
+ private static class WrapAsList<T> extends OldDoFn<T, List<T>> {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(Arrays.asList(c.element()));
+ }
+ }
+
+ private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
+ extends PTransform<PCollection<InputT>, PCollectionView<OutputT>>
+ {
+ Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ public StreamingCombineGloballyAsSingletonView(ApexRunner runner,
+ Combine.GloballyAsSingletonView<InputT, OutputT> transform)
+ {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollectionView<OutputT> apply(PCollection<InputT> input)
+ {
+ PCollection<OutputT> combined = input
+ .apply(Combine.globally(transform.getCombineFn()).withoutDefaults().withFanout(transform.getFanout()));
+
+ PCollectionView<OutputT> view = PCollectionViews.singletonView(combined.getPipeline(),
+ combined.getWindowingStrategy(), transform.getInsertDefault(),
+ transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null, combined.getCoder());
+ return combined.apply(ParDo.of(new WrapAsList<OutputT>()))
+ .apply(CreateApexPCollectionView.<OutputT, OutputT> of(view));
+ }
+
+ @Override
+ protected String getKindString()
+ {
+ return "StreamingCombineGloballyAsSingletonView";
+ }
+ }
+
+ private static class StreamingViewAsSingleton<T> extends PTransform<PCollection<T>, PCollectionView<T>>
+ {
+ private static final long serialVersionUID = 1L;
+ private View.AsSingleton<T> transform;
+
+ public StreamingViewAsSingleton(ApexRunner runner, View.AsSingleton<T> transform)
+ {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollectionView<T> apply(PCollection<T> input)
+ {
+ Combine.Globally<T, T> combine = Combine
+ .globally(new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+ if (!transform.hasDefaultValue()) {
+ combine = combine.withoutDefaults();
+ }
+ return input.apply(combine.asSingletonView());
+ }
+
+ @Override
+ protected String getKindString()
+ {
+ return "StreamingViewAsSingleton";
+ }
+
+ private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T>
+ {
+ private boolean hasDefaultValue;
+ private T defaultValue;
+
+ SingletonCombine(boolean hasDefaultValue, T defaultValue)
+ {
+ this.hasDefaultValue = hasDefaultValue;
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public T apply(T left, T right)
+ {
+ throw new IllegalArgumentException("PCollection with more than one element "
+ + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+ + "combine the PCollection into a single value");
+ }
+
+ @Override
+ public T identity()
+ {
+ if (hasDefaultValue) {
+ return defaultValue;
+ } else {
+ throw new IllegalArgumentException("Empty PCollection accessed as a singleton view. "
+ + "Consider setting withDefault to provide a default value");
+ }
+ }
+ }
+ }
+
+ private static class StreamingViewAsMap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+ private final ApexRunner runner;
+
+ @SuppressWarnings("unused") // used via reflection in ApexRunner#apply()
+ public StreamingViewAsMap(ApexRunner runner, View.AsMap<K, V> transform) {
+ this.runner = runner;
+ }
+
+ @Override
+ public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, V>> view =
+ PCollectionViews.mapView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+ runner.recordViewUsesNonDeterministicKeyCoder(this);
+ }
+
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateApexPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMap";
+ }
+ }
+
+ /**
+ * Specialized expansion for {@link
+ * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
+ * Apex runner in streaming mode.
+ */
+ private static class StreamingViewAsMultimap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+ private final ApexRunner runner;
+
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in ApexRunner#apply()
+ public StreamingViewAsMultimap(ApexRunner runner, View.AsMultimap<K, V> transform) {
+ this.runner = runner;
+ }
+
+ @Override
+ public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, Iterable<V>>> view =
+ PCollectionViews.multimapView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+ runner.recordViewUsesNonDeterministicKeyCoder(this);
+ }
+
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateApexPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMultimap";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
+ * Apex runner in streaming mode.
+ */
+ private static class StreamingViewAsList<T>
+ extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in ApexRunner#apply()
+ public StreamingViewAsList(ApexRunner runner, View.AsList<T> transform) {}
+
+ @Override
+ public PCollectionView<List<T>> apply(PCollection<T> input) {
+ PCollectionView<List<T>> view =
+ PCollectionViews.listView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateApexPCollectionView.<T, List<T>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsList";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
+ * Apex runner in streaming mode.
+ */
+ private static class StreamingViewAsIterable<T>
+ extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+ /**
+ * Builds an instance of this class from the overridden transform.
+ */
+ @SuppressWarnings("unused") // used via reflection in ApexRunner#apply()
+ public StreamingViewAsIterable(ApexRunner runner, View.AsIterable<T> transform) { }
+
+ @Override
+ public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+ PCollectionView<Iterable<T>> view =
+ PCollectionViews.iterableView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateApexPCollectionView.<T, Iterable<T>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsIterable";
+ }
+ }
+
+ /**
+ * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+ *
+ * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
+ * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+ * They require that the input {@link PCollection} fits in memory.
+ * For a large {@link PCollection} this is expected to crash!
+ *
+ * @param <T> the type of elements to concatenate.
+ */
+ private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+ @Override
+ public List<T> createAccumulator() {
+ return new ArrayList<T>();
+ }
+
+ @Override
+ public List<T> addInput(List<T> accumulator, T input) {
+ accumulator.add(input);
+ return accumulator;
+ }
+
+ @Override
+ public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+ List<T> result = createAccumulator();
+ for (List<T> accumulator : accumulators) {
+ result.addAll(accumulator);
+ }
+ return result;
+ }
+
+ @Override
+ public List<T> extractOutput(List<T> accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+
+ @Override
+ public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
index e153867..712466a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
@@ -18,19 +18,23 @@
package org.apache.beam.runners.apex.translators;
+import java.util.Collections;
import java.util.List;
+import org.apache.beam.runners.apex.translators.functions.ApexFlattenOperator;
+import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.apex.translators.io.ValuesSource;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
-import com.datatorrent.lib.stream.StreamMerger;
import com.google.common.collect.Lists;
/**
- * Flatten.FlattenPCollectionList translation to Apex operator.
- * TODO: support more than two streams
+ * {@link Flatten.FlattenPCollectionList} translation to Apex operator.
*/
public class FlattenPCollectionTranslator<T> implements
TransformTranslator<Flatten.FlattenPCollectionList<T>> {
@@ -38,16 +42,28 @@ public class FlattenPCollectionTranslator<T> implements
@Override
public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
- PCollection<T> firstCollection = null;
PCollectionList<T> input = context.getInput();
List<PCollection<T>> collections = input.getAll();
+
+ if (collections.isEmpty()) {
+ // create a dummy source that never emits anything
+ @SuppressWarnings("unchecked")
+ UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(Collections.EMPTY_LIST,
+ (Coder<T>) VoidCoder.of());
+ ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
+ unboundedSource, context.getPipelineOptions());
+ context.addOperator(operator, operator.output);
+ return;
+ }
+
List<PCollection<T>> remainingCollections = Lists.newArrayList();
+ PCollection<T> firstCollection = null;
while (!collections.isEmpty()) {
for (PCollection<T> collection : collections) {
if (null == firstCollection) {
firstCollection = collection;
} else {
- StreamMerger<T> operator = new StreamMerger<>();
+ ApexFlattenOperator<T> operator = new ApexFlattenOperator<>();
context.addStream(firstCollection, operator.data1);
context.addStream(collection, operator.data2);
if (collections.size() > 2) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
index a958234..632829a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
@@ -18,11 +18,15 @@
package org.apache.beam.runners.apex.translators;
+import java.util.List;
+
import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
-import org.apache.beam.runners.apex.translators.utils.NoOpSideInputReader;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.datatorrent.api.Operator;
/**
* {@link ParDo.Bound} is translated to Apex operator that wraps the {@link DoFn}
@@ -35,9 +39,23 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
OldDoFn<InputT, OutputT> doFn = transform.getFn();
PCollection<OutputT> output = context.getOutput();
+ List<PCollectionView<?>> sideInputs = transform.getSideInputs();
ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(),
- doFn, output.getWindowingStrategy(), new NoOpSideInputReader());
+ doFn, output.getWindowingStrategy(), sideInputs);
context.addOperator(operator, operator.output);
context.addStream(context.getInput(), operator.input);
+ if (!sideInputs.isEmpty()) {
+ Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
+ for (int i=0; i<sideInputs.size(); i++) {
+ // the number of input ports for side inputs is fixed and each port can only take one input.
+ // more (optional) ports can be added to give reasonable capacity or an explicit union operation introduced.
+ if (i == sideInputPorts.length) {
+ String msg = String.format("Too many side inputs in %s (currently only supporting %s).",
+ transform.toString(), sideInputPorts.length);
+ throw new UnsupportedOperationException(msg);
+ }
+ context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
index ab7cd0a..163cfd4 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -54,6 +55,17 @@ public class TranslationContext {
private AppliedPTransform<?, ?, ?> currentTransform;
private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>();
private final Map<String, Operator> operators = new HashMap<>();
+ private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>();
+
+ public void addView(PCollectionView<?> view) {
+ this.viewInputs.put(view, this.getInput());
+ }
+
+ public <InputT extends PInput> InputT getViewInput(PCollectionView<?> view) {
+ PInput input = this.viewInputs.get(view);
+ checkArgument(input != null, "unknown view " + view.getName());
+ return (InputT)input;
+ }
public TranslationContext(ApexPipelineOptions pipelineOptions) {
this.pipelineOptions = pipelineOptions;
@@ -102,7 +114,7 @@ public class TranslationContext {
public void addStream(PInput input, InputPort inputPort) {
Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input);
- checkArgument(stream != null, "no upstream operator defined");
+ checkArgument(stream != null, "no upstream operator defined for %s", input);
stream.getRight().add(inputPort);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
new file mode 100644
index 0000000..ce27abb
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators.functions;
+
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.WatermarkTuple;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Apex operator for Beam {@link Flatten.FlattenPCollectionList}.
+ */
+public class ApexFlattenOperator<InputT> extends BaseOperator
+{
+ private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class);
+ private boolean traceTuples = true;
+
+ private long inputWM1;
+ private long inputWM2;
+ private long outputWM;
+
+ /**
+ * Data input port 1.
+ */
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
+ {
+ /**
+ * Emits to port "out"
+ */
+ @Override
+ public void process(ApexStreamTuple<WindowedValue<InputT>> tuple)
+ {
+ if (tuple instanceof WatermarkTuple) {
+ WatermarkTuple<?> wmTuple = (WatermarkTuple<?>)tuple;
+ if (wmTuple.getTimestamp() > inputWM1) {
+ inputWM1 = wmTuple.getTimestamp();
+ if (inputWM1 <= inputWM2) {
+ // move output watermark and emit it
+ outputWM = inputWM1;
+ if (traceTuples) {
+ LOG.debug("\nemitting watermark {}\n", outputWM);
+ }
+ out.emit(tuple);
+ }
+ }
+ return;
+ }
+ if (traceTuples) {
+ LOG.debug("\nemitting {}\n", tuple);
+ }
+ out.emit(tuple);
+ }
+ };
+
+ /**
+ * Data input port 2.
+ */
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
+ {
+ /**
+ * Emits to port "out"
+ */
+ @Override
+ public void process(ApexStreamTuple<WindowedValue<InputT>> tuple)
+ {
+ if (tuple instanceof WatermarkTuple) {
+ WatermarkTuple<?> wmTuple = (WatermarkTuple<?>)tuple;
+ if (wmTuple.getTimestamp() > inputWM2) {
+ inputWM2 = wmTuple.getTimestamp();
+ if (inputWM2 <= inputWM1) {
+ // move output watermark and emit it
+ outputWM = inputWM2;
+ if (traceTuples) {
+ LOG.debug("\nemitting watermark {}\n", outputWM);
+ }
+ out.emit(tuple);
+ }
+ }
+ return;
+ }
+ if (traceTuples) {
+ LOG.debug("\nemitting {}\n", tuple);
+ }
+ out.emit(tuple);
+ }
+ };
+
+ /**
+ * Output port.
+ */
+ public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out = new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>();
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
index 29e1b32..5970f36 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.apex.translators.functions;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -33,6 +34,7 @@ import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOption
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
@@ -41,6 +43,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.TimerInternals;
@@ -55,6 +58,8 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
@@ -79,16 +84,22 @@ import com.google.common.collect.Multimap;
*/
public class ApexGroupByKeyOperator<K, V> implements Operator
{
+ private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class);
+ private boolean traceTuples = true;
+
@Bind(JavaSerializer.class)
private WindowingStrategy<V, BoundedWindow> windowingStrategy;
@Bind(JavaSerializer.class)
+ private Coder<K> keyCoder;
+ @Bind(JavaSerializer.class)
private Coder<V> valueCoder;
@Bind(JavaSerializer.class)
private final SerializablePipelineOptions serializedOptions;
@Bind(JavaSerializer.class)
- private Map<K, StateInternals<K>> perKeyStateInternals = new HashMap<>();
- private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+// TODO: InMemoryStateInternals not serializable
+transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>();
+ private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
private transient ProcessContext context;
private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
@@ -100,14 +111,19 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
@Override
public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t)
{
- //System.out.println("\n***RECEIVED: " +t);
try {
if (t instanceof ApexStreamTuple.WatermarkTuple) {
ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>)t;
processWatermark(mark);
+ if (traceTuples) {
+ LOG.debug("\nemitting watermark {}\n", mark.getTimestamp());
+ }
output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(mark.getTimestamp()));
return;
}
+ if (traceTuples) {
+ LOG.debug("\ninput {}\n", t.getValue());
+ }
processElement(t.getValue());
} catch (Exception e) {
Throwables.propagate(e);
@@ -124,6 +140,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
Preconditions.checkNotNull(pipelineOptions);
this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>)input.getWindowingStrategy();
+ this.keyCoder = ((KvCoder<K, V>)input.getCoder()).getKeyCoder();
this.valueCoder = ((KvCoder<K, V>)input.getCoder()).getValueCoder();
}
@@ -146,6 +163,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
@Override
public void setup(OperatorContext context)
{
+ this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory();
this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy, stateInternalsFactory,
SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
@@ -163,16 +181,16 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
* We keep these timers in a Set, so that they are deduplicated, as the same
* timer can be registered multiple times.
*/
- private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
+ private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
// we keep the timers to return in a different list and launch them later
// because we cannot prevent a trigger from registering another trigger,
// which would lead to concurrent modification exception.
- Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create();
+ Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create();
- Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
+ Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
while (it.hasNext()) {
- Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
+ Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
while (timerIt.hasNext()) {
@@ -205,44 +223,64 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
fn.processElement(context);
}
- private StateInternals<K> getStateInternalsForKey(K key) {
- StateInternals<K> stateInternals = perKeyStateInternals.get(key);
+ private StateInternals<K> getStateInternalsForKey(K key)
+ {
+ final ByteBuffer keyBytes;
+ try {
+ keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+ } catch (CoderException e) {
+ throw Throwables.propagate(e);
+ }
+ StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes);
if (stateInternals == null) {
//Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
//OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getOutputTimeFn();
stateInternals = InMemoryStateInternals.forKey(key);
- perKeyStateInternals.put(key, stateInternals);
+ perKeyStateInternals.put(keyBytes, stateInternals);
}
return stateInternals;
}
private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
- Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+ final ByteBuffer keyBytes;
+ try {
+ keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+ } catch (CoderException e) {
+ throw Throwables.propagate(e);
+ }
+ Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
if (timersForKey == null) {
timersForKey = new HashSet<>();
}
timersForKey.add(timer);
- activeTimers.put(key, timersForKey);
+ activeTimers.put(keyBytes, timersForKey);
}
private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
- Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+ final ByteBuffer keyBytes;
+ try {
+ keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+ } catch (CoderException e) {
+ throw Throwables.propagate(e);
+ }
+ Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
if (timersForKey != null) {
timersForKey.remove(timer);
if (timersForKey.isEmpty()) {
- activeTimers.remove(key);
+ activeTimers.remove(keyBytes);
} else {
- activeTimers.put(key, timersForKey);
+ activeTimers.put(keyBytes, timersForKey);
}
}
}
private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
this.inputWatermark = new Instant(mark.getTimestamp());
- Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
+ Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
if (!timers.isEmpty()) {
- for (K key : timers.keySet()) {
- KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(key));
+ for (ByteBuffer keyBytes : timers.keySet()) {
+ K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
+ KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(keyBytes));
context.setElement(kwi, getStateInternalsForKey(kwi.key()));
fn.processElement(context);
}
@@ -315,7 +353,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
@Override
public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
- System.out.println("\n***EMITTING: " + output + ", timestamp=" + timestamp);
+ if (traceTuples) {
+ LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
+ }
ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane)));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
index d358d14..13a8fc9 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
@@ -18,40 +18,58 @@
package org.apache.beam.runners.apex.translators.functions;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translators.utils.NoOpStepContext;
import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.repackaged.com.google.common.base.Throwables;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.NullSideInputReader;
import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.google.common.collect.Iterables;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
/**
* Apex operator for Beam {@link DoFn}.
*/
public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager {
+ private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class);
+ private boolean traceTuples = true;
private transient final TupleTag<OutputT> mainTag = new TupleTag<OutputT>();
- private transient DoFnRunner<InputT, OutputT> doFnRunner;
+ private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
@Bind(JavaSerializer.class)
private final SerializablePipelineOptions pipelineOptions;
@@ -60,17 +78,28 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
@Bind(JavaSerializer.class)
private final WindowingStrategy<?, ?> windowingStrategy;
@Bind(JavaSerializer.class)
- private final SideInputReader sideInputReader;
+ List<PCollectionView<?>> sideInputs;
+// TODO: not Kryo serializable, integrate codec
+//@Bind(JavaSerializer.class)
+private transient StateInternals<Void> sideInputStateInternals = InMemoryStateInternals.forKey(null);
+ private transient SideInputHandler sideInputHandler;
+ // TODO: not Kryo serializable, integrate codec
+ private List<WindowedValue<InputT>> pushedBack = new ArrayList<>();
+ private LongMin pushedBackWatermark = new LongMin();
+ private long currentInputWatermark = Long.MIN_VALUE;
+ private long currentOutputWatermark = currentInputWatermark;
public ApexParDoOperator(
ApexPipelineOptions pipelineOptions,
OldDoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
- SideInputReader sideInputReader) {
+ List<PCollectionView<?>> sideInputs
+ )
+ {
this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
this.doFn = doFn;
this.windowingStrategy = windowingStrategy;
- this.sideInputReader = sideInputReader;
+ this.sideInputs = sideInputs;
}
@SuppressWarnings("unused") // for Kryo
@@ -78,17 +107,60 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
this(null, null, null, null);
}
+
public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
{
@Override
public void process(ApexStreamTuple<WindowedValue<InputT>> t)
{
if (t instanceof ApexStreamTuple.WatermarkTuple) {
- output.emit(t);
+ processWatermark((ApexStreamTuple.WatermarkTuple<?>)t);
} else {
- System.out.println("\n" + Thread.currentThread().getName() + "\n" + t.getValue() + "\n");
- doFnRunner.processElement(t.getValue());
+ if (traceTuples) {
+ LOG.debug("\ninput {}\n", t.getValue());
+ }
+ Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(t.getValue());
+ for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+ pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
+ pushedBack.add(pushedBackValue);
+ }
+ }
+ }
+ };
+
+ @InputPortFieldAnnotation(optional=true)
+ public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 = new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>()
+ {
+ private final int sideInputIndex = 0;
+
+ @Override
+ public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t)
+ {
+ if (t instanceof ApexStreamTuple.WatermarkTuple) {
+ // ignore side input watermarks
+ return;
+ }
+ if (traceTuples) {
+ LOG.debug("\nsideInput {}\n", t.getValue());
}
+ PCollectionView<?> sideInput = sideInputs.get(sideInputIndex);
+ sideInputHandler.addSideInputValue(sideInput, t.getValue());
+
+ List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
+ for (WindowedValue<InputT> elem : pushedBack) {
+ Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(elem);
+ Iterables.addAll(newPushedBack, justPushedBack);
+ }
+
+ pushedBack.clear();
+ pushedBackWatermark.clear();
+ for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
+ pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
+ pushedBack.add(pushedBackValue);
+ }
+
+ // potentially emit watermark
+ processWatermark(ApexStreamTuple.WatermarkTuple.of(currentInputWatermark));
}
};
@@ -99,27 +171,82 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple)
{
output.emit(ApexStreamTuple.DataTuple.of(tuple));
+ if (traceTuples) {
+ LOG.debug("\nemitting {}\n", tuple);
+ }
+ }
+
+ private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
+ try {
+ return pushbackDoFnRunner.processElementInReadyWindows(elem);
+ } catch (UserCodeException ue) {
+ if (ue.getCause() instanceof AssertionError) {
+ ApexRunner.assertionError = (AssertionError)ue.getCause();
+ }
+ throw ue;
+ }
+ }
+
+ private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark)
+ {
+ this.currentInputWatermark = mark.getTimestamp();
+
+ if (sideInputs.isEmpty()) {
+ if (traceTuples) {
+ LOG.debug("\nemitting watermark {}\n", mark);
+ }
+ output.emit(mark);
+ return;
+ }
+
+ long potentialOutputWatermark =
+ Math.min(pushedBackWatermark.get(), currentInputWatermark);
+ if (potentialOutputWatermark > currentOutputWatermark) {
+ currentOutputWatermark = potentialOutputWatermark;
+ if (traceTuples) {
+ LOG.debug("\nemitting watermark {}\n", currentOutputWatermark);
+ }
+ output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
+ }
}
@Override
public void setup(OperatorContext context)
{
- this.doFnRunner = DoFnRunners.simpleRunner(pipelineOptions.get(),
+ this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
+ SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
+ if (!sideInputs.isEmpty()) {
+ sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
+ sideInputReader = sideInputHandler;
+ }
+
+ DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault(
+ pipelineOptions.get(),
doFn,
sideInputReader,
this,
mainTag,
- TupleTagList.empty().getAll(),
+ TupleTagList.empty().getAll() /*sideOutputTags*/,
new NoOpStepContext(),
new NoOpAggregatorFactory(),
windowingStrategy
);
+
+ pushbackDoFnRunner =
+ PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+
+ try {
+ doFn.setup();
+ } catch (Exception e) {
+ Throwables.propagate(e);
+ }
+
}
@Override
public void beginWindow(long windowId)
{
- doFnRunner.startBundle();
+ pushbackDoFnRunner.startBundle();
/*
Collection<Aggregator<?, ?>> aggregators = AggregatorRetriever.getAggregators(doFn);
if (!aggregators.isEmpty()) {
@@ -131,14 +258,14 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
@Override
public void endWindow()
{
- doFnRunner.finishBundle();
+ pushbackDoFnRunner.finishBundle();
}
/**
* TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode.
* It is called from {@link org.apache.beam.sdk.util.SimpleDoFnRunner}.
*/
- public class NoOpAggregatorFactory implements AggregatorFactory {
+ public static class NoOpAggregatorFactory implements AggregatorFactory {
private NoOpAggregatorFactory() {
}
@@ -147,31 +274,52 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
Class<?> fnClass, ExecutionContext.StepContext step,
String name, CombineFn<InputT, AccumT, OutputT> combine) {
- return new Aggregator<InputT, OutputT>() {
+ return new NoOpAggregator<InputT, OutputT>();
+ }
- @Override
- public void addValue(InputT value)
- {
- }
+ private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>, java.io.Serializable
+ {
+ private static final long serialVersionUID = 1L;
- @Override
- public String getName()
- {
- // TODO Auto-generated method stub
- return null;
- }
+ @Override
+ public void addValue(InputT value)
+ {
+ }
+
+ @Override
+ public String getName()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public CombineFn<InputT, ?, OutputT> getCombineFn()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ };
- @Override
- public CombineFn<InputT, ?, OutputT> getCombineFn()
- {
- // TODO Auto-generated method stub
- return null;
- }
- };
- }
}
+ private static class LongMin {
+ long state = Long.MAX_VALUE;
+ public void add(long l) {
+ state = Math.min(state, l);
+ }
+
+ public long get() {
+ return state;
+ }
+
+ public void clear() {
+ state = Long.MAX_VALUE;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
index 39114fe..6ee82ea 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
@@ -28,8 +28,11 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.google.common.base.Throwables;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
@@ -40,10 +43,14 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
import java.io.IOException;
/**
- * Apex input operator that wraps Beam UnboundedSource.
+ * Apex input operator that wraps Beam {@link UnboundedSource}.
*/
public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
implements InputOperator {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ApexReadUnboundedInputOperator.class);
+ private boolean traceTuples = false;
+ private long outputWatermark = 0;
@Bind(JavaSerializer.class)
private final SerializablePipelineOptions pipelineOptions;
@@ -51,6 +58,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
private final UnboundedSource<OutputT, CheckpointMarkT> source;
private transient UnboundedSource.UnboundedReader<OutputT> reader;
private transient boolean available = false;
+ @OutputPortFieldAnnotation(optional=true)
public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output = new DefaultOutputPort<>();
public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, ApexPipelineOptions options) {
@@ -66,12 +74,23 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
@Override
public void beginWindow(long windowId)
{
- Instant mark = reader.getWatermark();
- output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark.getMillis()));
if (!available && source instanceof ValuesSource) {
- // if it's a Create transformation and the input was consumed,
+ // if it's a Create and the input was consumed, emit final watermark
+ emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
// terminate the stream (allows tests to finish faster)
BaseOperator.shutdown();
+ } else {
+ emitWatermarkIfNecessary(reader.getWatermark().getMillis());
+ }
+ }
+
+ private void emitWatermarkIfNecessary(long mark) {
+ if (mark > outputWatermark) {
+ outputWatermark = mark;
+ if (traceTuples) {
+ LOG.debug("\nemitting watermark {}\n", mark);
+ }
+ output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark));
}
}
@@ -83,6 +102,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
@Override
public void setup(OperatorContext context)
{
+ this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
try {
reader = source.createReader(this.pipelineOptions.get(), null);
available = reader.start();
@@ -114,6 +134,9 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
OutputT data = reader.getCurrent();
Instant timestamp = reader.getCurrentTimestamp();
available = reader.advance();
+ if (traceTuples) {
+ LOG.debug("\nemitting {}\n", data);
+ }
output.emit(DataTuple.of(WindowedValue.of(
data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
index efb69ee..06940aa 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
@@ -27,10 +27,13 @@ import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
+import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StandardCoder;
+import com.datatorrent.api.Operator;
+
public interface ApexStreamTuple<T>
{
/**
@@ -188,4 +191,12 @@ public interface ApexStreamTuple<T>
}
+ final class Logging
+ {
+ public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator)
+ {
+ return options.isTupleTracingEnabled();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java
deleted file mode 100644
index ffe1a29..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import java.io.Serializable;
-
-import javax.annotation.Nullable;
-
-/**
- * no-op side input reader.
- */
-public class NoOpSideInputReader implements SideInputReader, Serializable {
- @Nullable
- @Override
- public <T> T get(PCollectionView<T> view, BoundedWindow window) {
- return null;
- }
-
- @Override
- public <T> boolean contains(PCollectionView<T> view) {
- return false;
- }
-
- @Override
- public boolean isEmpty() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java
new file mode 100644
index 0000000..3573d31
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.examples;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.TestApexRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * For debugging only.
+ */
+@Ignore
+@RunWith(JUnit4.class)
+public class IntTest implements java.io.Serializable
+{
+
+ @Test
+ public void test()
+ {
+ ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
+ options.setTupleTracingEnabled(true);
+ options.setRunner(TestApexRunner.class);
+ Pipeline p = Pipeline.create(options);
+boolean timeBound = false;
+
+
+ TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting();
+//List<KV<Integer,Integer>> values = Lists.newArrayList(
+// KV.of(0, 99),KV.of(0, 99),KV.of(0, 98));
+
+//UnboundedSource<KV<Integer,Integer>, ?> source = new ValuesSource<>(values,
+// KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
+
+ if (true) {
+ source = source.withDedup();
+ }
+
+ PCollection<KV<Integer, Integer>> output =
+ timeBound
+ ? p.apply(Read.from(source).withMaxReadTime(Duration.millis(200)))
+ : p.apply(Read.from(source).withMaxNumRecords(NUM_RECORDS));
+
+ List<KV<Integer, Integer>> expectedOutput = new ArrayList<>();
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ expectedOutput.add(KV.of(0, i));
+ }
+
+ // Because some of the NUM_RECORDS elements read are dupes, the final output
+ // will only have output from 0 to n where n < NUM_RECORDS.
+ PAssert.that(output).satisfies(new Checker(true, timeBound));
+
+
+ p.run();
+ return;
+ }
+
+ private static final int NUM_RECORDS = 10;
+ private static class Checker implements SerializableFunction<Iterable<KV<Integer, Integer>>, Void>
+ {
+ private final boolean dedup;
+ private final boolean timeBound;
+
+ Checker(boolean dedup, boolean timeBound)
+ {
+ this.dedup = dedup;
+ this.timeBound = timeBound;
+ }
+
+ @Override
+ public Void apply(Iterable<KV<Integer, Integer>> input)
+ {
+ List<Integer> values = new ArrayList<>();
+ for (KV<Integer, Integer> kv : input) {
+ assertEquals(0, (int)kv.getKey());
+ values.add(kv.getValue());
+ }
+ if (timeBound) {
+ assertTrue(values.size() >= 1);
+ } else if (dedup) {
+ // Verify that at least some data came through. The chance of 90% of the input
+ // being duplicates is essentially zero.
+ assertTrue(values.size() > NUM_RECORDS / 10 && values.size() <= NUM_RECORDS);
+ } else {
+ assertEquals(NUM_RECORDS, values.size());
+ }
+ Collections.sort(values);
+ for (int i = 0; i < values.size(); i++) {
+ assertEquals(i, (int)values.get(i));
+ }
+ //if (finalizeTracker != null) {
+ // assertThat(finalizeTracker, containsInAnyOrder(values.size() - 1));
+ //}
+ return null;
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java
deleted file mode 100644
index 0ee3442..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java
+++ /dev/null
@@ -1,207 +0,0 @@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.examples;
-
-
- import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
- import static org.hamcrest.Matchers.is;
- import static org.junit.Assert.assertThat;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.TestApexRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.NeedsRunner;
- import org.apache.beam.sdk.testing.PAssert;
- import org.apache.beam.sdk.testing.RunnableOnService;
- import org.apache.beam.sdk.testing.TestPipeline;
- import org.apache.beam.sdk.transforms.Count;
- import org.apache.beam.sdk.transforms.DoFn;
- import org.apache.beam.sdk.transforms.Max;
- import org.apache.beam.sdk.transforms.Min;
- import org.apache.beam.sdk.transforms.PTransform;
- import org.apache.beam.sdk.transforms.ParDo;
- import org.apache.beam.sdk.transforms.RemoveDuplicates;
- import org.apache.beam.sdk.transforms.SerializableFunction;
- import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
- import org.joda.time.Duration;
- import org.joda.time.Instant;
- import org.junit.Test;
- import org.junit.experimental.categories.Category;
- import org.junit.runner.RunWith;
- import org.junit.runners.JUnit4;
-
- /**
- * Tests for {@link CountingInput}.
- */
- @RunWith(JUnit4.class)
- public class IntTests {
- public static void addCountingAsserts(PCollection<Long> input, long numElements) {
- // Count == numElements
- PAssert.thatSingleton(input.apply("Count", Count.<Long>globally()))
- .isEqualTo(numElements);
- // Unique count == numElements
- PAssert.thatSingleton(
- input
- .apply(RemoveDuplicates.<Long>create())
- .apply("UniqueCount", Count.<Long>globally()))
- .isEqualTo(numElements);
- // Min == 0
- PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(0L);
- // Max == numElements-1
- PAssert.thatSingleton(input.apply("Max", Max.<Long>globally()))
- .isEqualTo(numElements - 1);
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testBoundedInput() {
- //Pipeline p = TestPipeline.create();
- ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
- options.setRunner(TestApexRunner.class);
- Pipeline p = Pipeline.create(options);
-
- long numElements = 1000;
- PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
-
- addCountingAsserts(input, numElements);
- p.run();
- }
-
- @Test
- public void testBoundedDisplayData() {
- PTransform<?, ?> input = CountingInput.upTo(1234);
- DisplayData displayData = DisplayData.from(input);
- assertThat(displayData, hasDisplayItem("upTo", 1234));
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testUnboundedInput() {
- //Pipeline p = TestPipeline.create();
- ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
- options.setRunner(TestApexRunner.class);
- Pipeline p = Pipeline.create(options);
-
-
- long numElements = 1000;
-
- PCollection<Long> input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements));
-
-// input = input.apply(Window.<Long>into(FixedWindows.of(Duration.standardSeconds(10))));
-
- addCountingAsserts(input, numElements);
- p.run();
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testUnboundedInputRate() {
- Pipeline p = TestPipeline.create();
- long numElements = 5000;
-
- long elemsPerPeriod = 10L;
- Duration periodLength = Duration.millis(8);
- PCollection<Long> input =
- p.apply(
- CountingInput.unbounded()
- .withRate(elemsPerPeriod, periodLength)
- .withMaxNumRecords(numElements));
-
- addCountingAsserts(input, numElements);
- long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod;
- Instant startTime = Instant.now();
- p.run();
- Instant endTime = Instant.now();
- assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true));
- }
-
- private static class ElementValueDiff extends DoFn<Long, Long> {
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- c.output(c.element() - c.timestamp().getMillis());
- }
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testUnboundedInputTimestamps() {
- Pipeline p = TestPipeline.create();
- long numElements = 1000;
-
- PCollection<Long> input =
- p.apply(
- CountingInput.unbounded()
- .withTimestampFn(new ValueAsTimestampFn())
- .withMaxNumRecords(numElements));
- addCountingAsserts(input, numElements);
-
- PCollection<Long> diffs =
- input
- .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
- .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
- // This assert also confirms that diffs only has one unique value.
- PAssert.thatSingleton(diffs).isEqualTo(0L);
-
- p.run();
- }
-
- @Test
- public void testUnboundedDisplayData() {
- Duration maxReadTime = Duration.standardHours(5);
- SerializableFunction<Long, Instant> timestampFn = new SerializableFunction<Long, Instant>() {
- @Override
- public Instant apply(Long input) {
- return Instant.now();
- }
- };
-
- PTransform<?, ?> input = CountingInput.unbounded()
- .withMaxNumRecords(1234)
- .withMaxReadTime(maxReadTime)
- .withTimestampFn(timestampFn);
-
- DisplayData displayData = DisplayData.from(input);
-
- assertThat(displayData, hasDisplayItem("maxRecords", 1234));
- assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
- assertThat(displayData, hasDisplayItem("timestampFn", timestampFn.getClass()));
- }
-
- /**
- * A timestamp function that uses the given value as the timestamp. Because the input values will
- * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out
- * in {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)}.
- */
- private static class ValueAsTimestampFn implements SerializableFunction<Long, Instant> {
- @Override
- public Instant apply(Long input) {
- return new Instant(input);
- }
- }
-
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index 06aaf55..6239021 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@ -20,20 +20,25 @@ package org.apache.beam.runners.apex.translators;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.runners.apex.TestApexRunner;
import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import com.datatorrent.api.DAG;
+import com.datatorrent.lib.util.KryoCloneUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -48,6 +53,7 @@ import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.regex.Pattern;
@@ -123,13 +129,11 @@ public class ParDoBoundTranslatorTest {
}
}
-
- @Ignore
@Test
public void testAssertionFailure() throws Exception {
ApexPipelineOptions options = PipelineOptionsFactory.create()
.as(ApexPipelineOptions.class);
- options.setRunner(ApexRunner.class);
+ options.setRunner(TestApexRunner.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<Integer> pcollection = pipeline
@@ -149,6 +153,16 @@ public class ParDoBoundTranslatorTest {
expectedPattern.matcher(exc.getMessage()).find());
}
+ @Test
+ public void testContainsInAnyOrder() throws Exception {
+ ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
+ options.setRunner(TestApexRunner.class);
+ Pipeline pipeline = Pipeline.create(options);
+ PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
+ PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3);
+ pipeline.run();
+ }
+
private static Throwable runExpectingAssertionFailure(Pipeline pipeline) {
// We cannot use thrown.expect(AssertionError.class) because the AssertionError
// is first caught by JUnit and causes a test failure.
@@ -161,4 +175,19 @@ public class ParDoBoundTranslatorTest {
throw new RuntimeException("unreachable");
}
+ @Test
+ public void testSerialization() throws Exception {
+ ApexPipelineOptions options = PipelineOptionsFactory.create()
+ .as(ApexPipelineOptions.class);
+ ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options,
+ new Add(0), WindowingStrategy.globalDefault(), Collections.<PCollectionView<?>> emptyList());
+ operator.setup(null);
+ operator.beginWindow(0);
+ WindowedValue<Integer> wv = WindowedValue.valueInGlobalWindow(0);
+ operator.input.process(ApexStreamTuple.DataTuple.of(wv));
+ operator.input.process(ApexStreamTuple.WatermarkTuple.<WindowedValue<Integer>>of(0));
+ operator.endWindow();
+ Assert.assertNotNull("Serialization", KryoCloneUtils.cloneObject(operator));
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/resources/log4j.properties b/runners/apex/src/test/resources/log4j.properties
index 84a6f68..c0efc5d 100644
--- a/runners/apex/src/test/resources/log4j.properties
+++ b/runners/apex/src/test/resources/log4j.properties
@@ -26,8 +26,8 @@ log4j.appender.testlogger.target = System.err
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-log4j.logger.org=info
-#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.org=debug
+log4j.logger.org.apache.commons.beanutils=warn
log4j.logger.com.datatorrent=info
log4j.logger.org.apache.apex=debug
log4j.logger.org.apache.beam.runners.apex=debug