You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:29 UTC
[04/50] [abbrv] beam git commit: [BEAM-79] Add SideInput support for GearpumpRunner
[BEAM-79] Add SideInput support for GearpumpRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4eb50d15
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4eb50d15
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4eb50d15
Branch: refs/heads/master
Commit: 4eb50d152b91df46bd7f0478650cb4abac3808c6
Parents: 2d0aed9
Author: manuzhang <ow...@gmail.com>
Authored: Tue Feb 14 12:33:31 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Mar 7 22:15:26 2017 +0800
----------------------------------------------------------------------
runners/gearpump/pom.xml | 16 +-
.../gearpump/GearpumpPipelineResult.java | 47 +--
.../gearpump/GearpumpPipelineTranslator.java | 8 +
.../beam/runners/gearpump/GearpumpRunner.java | 373 ++++++++++++++++++-
.../runners/gearpump/TestGearpumpRunner.java | 39 +-
...CreateGearpumpPCollectionViewTranslator.java | 44 +++
.../CreatePCollectionViewTranslator.java | 43 +++
.../translators/CreateValuesTranslator.java | 2 +
.../FlattenPCollectionTranslator.java | 38 ++
.../translators/GroupByKeyTranslator.java | 82 ++--
.../translators/ParDoBoundMultiTranslator.java | 165 +++-----
.../translators/ParDoBoundTranslator.java | 32 +-
.../translators/WindowBoundTranslator.java | 21 +-
.../translators/functions/DoFnFunction.java | 158 ++++++--
.../translators/io/BoundedSourceWrapper.java | 1 +
.../gearpump/translators/io/GearpumpSource.java | 23 +-
.../gearpump/translators/io/ValuesSource.java | 14 +-
.../translators/utils/DoFnRunnerFactory.java | 20 +-
.../translators/utils/NoOpSideInputReader.java | 48 ---
.../translators/utils/TranslatorUtils.java | 147 ++++++++
20 files changed, 1002 insertions(+), 319 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 6f91c50..6a41dc0 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -75,23 +75,9 @@
<dependenciesToScan>
<dependency>org.apache.beam:beam-sdks-java-core</dependency>
</dependenciesToScan>
- <argLine>-noverify</argLine>
<excludes>
- <!-- side input is not supported in Gearpump -->
<exclude>
- org.apache.beam.sdk.io.gcp.bigquery.BigQueryIOTest,
- org.apache.beam.sdk.io.CountingInputTest,
- org.apache.beam.sdk.io.CountingSourceTest,
- org.apache.beam.sdk.testing.PAssertTest,
- org.apache.beam.sdk.transforms.ApproximateUniqueTest,
- org.apache.beam.sdk.transforms.CombineTest,
- org.apache.beam.sdk.transforms.CombineFnsTest,
- org.apache.beam.sdk.transforms.CountTest,
- org.apache.beam.sdk.transforms.FlattenTest,
- org.apache.beam.sdk.transforms.ParDoTest,
- org.apache.beam.sdk.transforms.SampleTest,
- org.apache.beam.sdk.transforms.ViewTest,
- org.apache.beam.sdk.transforms.join.CoGroupByKeyTest
+ org.apache.beam.sdk.transforms.ParDoTest
</exclude>
</excludes>
<systemPropertyVariables>
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index a3740b7..8f90898 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.gearpump.cluster.ApplicationStatus;
import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData;
import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.client.RunningApplication;
import org.joda.time.Duration;
import scala.collection.JavaConverters;
@@ -41,13 +42,11 @@ import scala.collection.Seq;
public class GearpumpPipelineResult implements PipelineResult {
private final ClientContext client;
- private final int appId;
- private final Duration defaultWaitDuration = Duration.standardSeconds(30);
- private final Duration defaultWaitInterval = Duration.standardSeconds(5);
+ private final RunningApplication app;
- public GearpumpPipelineResult(ClientContext client, int appId) {
+ public GearpumpPipelineResult(ClientContext client, RunningApplication app) {
this.client = client;
- this.appId = appId;
+ this.app = app;
}
@Override
@@ -57,38 +56,19 @@ public class GearpumpPipelineResult implements PipelineResult {
@Override
public State cancel() throws IOException {
- client.shutdown(appId);
+ app.shutDown();
return State.CANCELLED;
}
@Override
public State waitUntilFinish(Duration duration) {
- long start = System.currentTimeMillis();
- do {
- try {
- Thread.sleep(defaultWaitInterval.getMillis());
- } catch (Exception e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- if (e instanceof RuntimeException) {
- throw (RuntimeException) e;
- }
- throw new RuntimeException(e);
- }
- } while (State.RUNNING == getGearpumpState()
- && (System.currentTimeMillis() - start) < duration.getMillis());
-
- if (State.RUNNING == getGearpumpState()) {
- return State.DONE;
- } else {
- return State.FAILED;
- }
+ return waitUntilFinish();
}
@Override
public State waitUntilFinish() {
- return waitUntilFinish(defaultWaitDuration);
+ app.waitUntilFinish();
+ return State.DONE;
}
@Override
@@ -109,18 +89,19 @@ public class GearpumpPipelineResult implements PipelineResult {
List<AppMasterData> apps =
JavaConverters.<AppMasterData>seqAsJavaListConverter(
(Seq<AppMasterData>) client.listApps().appMasters()).asJava();
- for (AppMasterData app: apps) {
- if (app.appId() == appId) {
- status = app.status();
+ for (AppMasterData appData: apps) {
+ if (appData.appId() == app.appId()) {
+ status = appData.status();
}
}
if (null == status || status instanceof ApplicationStatus.NONEXIST$) {
return State.UNKNOWN;
} else if (status instanceof ApplicationStatus.ACTIVE$) {
return State.RUNNING;
+ } else if (status instanceof ApplicationStatus.SUCCEEDED$) {
+ return State.DONE;
} else {
- return State.STOPPED;
+ return State.FAILED;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 20624ed..4cc060c 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -20,6 +20,9 @@ package org.apache.beam.runners.gearpump;
import java.util.HashMap;
import java.util.Map;
+
+import org.apache.beam.runners.gearpump.translators.CreateGearpumpPCollectionViewTranslator;
+import org.apache.beam.runners.gearpump.translators.CreatePCollectionViewTranslator;
import org.apache.beam.runners.gearpump.translators.CreateValuesTranslator;
import org.apache.beam.runners.gearpump.translators.FlattenPCollectionTranslator;
import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
@@ -38,6 +41,7 @@ import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PValue;
@@ -75,6 +79,10 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator());
registerTransformTranslator(Window.Bound.class, new WindowBoundTranslator());
registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
+ registerTransformTranslator(View.CreatePCollectionView.class,
+ new CreatePCollectionViewTranslator());
+ registerTransformTranslator(GearpumpRunner.CreateGearpumpPCollectionView.class,
+ new CreateGearpumpPCollectionViewTranslator<>());
}
public GearpumpPipelineTranslator(TranslationContext translationContext) {
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 9ca1eb2..72f2126 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -17,29 +17,45 @@
*/
package org.apache.beam.runners.gearpump;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.beam.runners.gearpump.translators.TranslationContext;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.gearpump.cluster.ClusterConfig;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.client.RunningApplication;
import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
@@ -56,8 +72,21 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers";
private static final String DEFAULT_APPNAME = "beam_gearpump_app";
+ /** Custom transforms implementations. */
+ private final Map<Class<?>, Class<?>> overrides;
+
public GearpumpRunner(GearpumpPipelineOptions options) {
this.options = options;
+
+ ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.builder();
+ builder.put(Combine.GloballyAsSingletonView.class,
+ StreamingCombineGloballyAsSingletonView.class);
+ builder.put(View.AsMap.class, StreamingViewAsMap.class);
+ builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class);
+ builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
+ builder.put(View.AsList.class, StreamingViewAsList.class);
+ builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
+ overrides = builder.build();
}
public static GearpumpRunner fromOptions(PipelineOptions options) {
@@ -69,15 +98,23 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
public <OutputT extends POutput, InputT extends PInput> OutputT apply(
PTransform<InputT, OutputT> transform, InputT input) {
- if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
+ if (overrides.containsKey(transform.getClass())) {
+
+ Class<PTransform<InputT, OutputT>> transformClass =
+ (Class<PTransform<InputT, OutputT>>) transform.getClass();
+
+ Class<PTransform<InputT, OutputT>> customTransformClass =
+ (Class<PTransform<InputT, OutputT>>) overrides.get(transform.getClass());
+
+ PTransform<InputT, OutputT> customTransform =
+ InstanceBuilder.ofType(customTransformClass)
+ .withArg(transformClass, transform)
+ .build();
+
+ return Pipeline.applyTransform(input, customTransform);
+ } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
&& ((PCollectionList<?>) input).size() == 0) {
return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
- } else if (Create.Values.class.equals(transform.getClass())) {
- return (OutputT) PCollection
- .<OutputT>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.BOUNDED);
} else {
return super.apply(transform, input);
}
@@ -99,9 +136,9 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
TranslationContext translationContext = new TranslationContext(streamApp, options);
GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
translator.translate(pipeline);
- int appId = streamApp.submit().appId();
+ RunningApplication app = streamApp.submit();
- return new GearpumpPipelineResult(clientContext, appId);
+ return new GearpumpPipelineResult(clientContext, app);
}
private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {
@@ -131,4 +168,320 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
}
+
+
+ // The following code is forked from DataflowRunner for the View translators
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+ * for the Gearpump runner.
+ */
+ private static class StreamingViewAsMap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+ private static final long serialVersionUID = 4791080760092950304L;
+
+ public StreamingViewAsMap(View.AsMap<K, V> transform) {}
+
+ @Override
+ public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, V>> view =
+ PCollectionViews.mapView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+ // throw new RuntimeException(e);
+ }
+
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMap";
+ }
+ }
+
+ /**
+ * Specialized expansion for {@link
+ * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
+ * Gearpump runner.
+ */
+ private static class StreamingViewAsMultimap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+ private static final long serialVersionUID = 5854899081751333352L;
+
+ public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {}
+
+ @Override
+ public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, Iterable<V>>> view =
+ PCollectionViews.multimapView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+ // throw new RuntimeException(e);
+ }
+
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMultimap";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
+ * Gearpump runner.
+ */
+ private static class StreamingViewAsIterable<T>
+ extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+
+ private static final long serialVersionUID = -3399860618995613421L;
+
+ public StreamingViewAsIterable(View.AsIterable<T> transform) {}
+
+ @Override
+ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+ PCollectionView<Iterable<T>> view =
+ PCollectionViews.iterableView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateGearpumpPCollectionView.<T, Iterable<T>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsIterable";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
+ * Gearpump runner.
+ */
+ private static class StreamingViewAsList<T>
+ extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+
+ private static final long serialVersionUID = -5018631473886330629L;
+
+ public StreamingViewAsList(View.AsList<T> transform) {}
+
+ @Override
+ public PCollectionView<List<T>> expand(PCollection<T> input) {
+ PCollectionView<List<T>> view =
+ PCollectionViews.listView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateGearpumpPCollectionView.<T, List<T>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsList";
+ }
+ }
+ private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
+ extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+
+ private static final long serialVersionUID = 9064900748869035738L;
+ private final Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+ public StreamingCombineGloballyAsSingletonView(
+ Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+ PCollection<OutputT> combined =
+ input.apply(Combine.globally(transform.getCombineFn())
+ .withoutDefaults()
+ .withFanout(transform.getFanout()));
+
+ PCollectionView<OutputT> view = PCollectionViews.singletonView(
+ combined.getPipeline(),
+ combined.getWindowingStrategy(),
+ transform.getInsertDefault(),
+ transform.getInsertDefault()
+ ? transform.getCombineFn().defaultValue() : null,
+ combined.getCoder());
+ return combined
+ .apply(ParDo.of(new WrapAsList<OutputT>()))
+ .apply(CreateGearpumpPCollectionView.<OutputT, OutputT>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingCombineGloballyAsSingletonView";
+ }
+ }
+
+ private static class StreamingViewAsSingleton<T>
+ extends PTransform<PCollection<T>, PCollectionView<T>> {
+
+ private static final long serialVersionUID = 5870455965625071546L;
+ private final View.AsSingleton<T> transform;
+
+ public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollectionView<T> expand(PCollection<T> input) {
+ Combine.Globally<T, T> combine = Combine.globally(
+ new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+ if (!transform.hasDefaultValue()) {
+ combine = combine.withoutDefaults();
+ }
+ return input.apply(combine.asSingletonView());
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsSingleton";
+ }
+
+ private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+ private boolean hasDefaultValue;
+ private T defaultValue;
+
+ SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+ this.hasDefaultValue = hasDefaultValue;
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public T apply(T left, T right) {
+ throw new IllegalArgumentException("PCollection with more than one element "
+ + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+ + "combine the PCollection into a single value");
+ }
+
+ @Override
+ public T identity() {
+ if (hasDefaultValue) {
+ return defaultValue;
+ } else {
+ throw new IllegalArgumentException(
+ "Empty PCollection accessed as a singleton view. "
+ + "Consider setting withDefault to provide a default value");
+ }
+ }
+ }
+ }
+
+ private static class WrapAsList<T> extends DoFn<T, List<T>> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(Collections.singletonList(c.element()));
+ }
+ }
+
+ /**
+ * Creates a primitive {@link PCollectionView}.
+ *
+ * <p>For internal use only by runner implementors.
+ *
+ * @param <ElemT> The type of the elements of the input PCollection
+ * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+ */
+ public static class CreateGearpumpPCollectionView<ElemT, ViewT>
+ extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+ private static final long serialVersionUID = -2637073020800540542L;
+ private PCollectionView<ViewT> view;
+
+ private CreateGearpumpPCollectionView(PCollectionView<ViewT> view) {
+ this.view = view;
+ }
+
+ public static <ElemT, ViewT> CreateGearpumpPCollectionView<ElemT, ViewT> of(
+ PCollectionView<ViewT> view) {
+ return new CreateGearpumpPCollectionView<>(view);
+ }
+
+ public PCollectionView<ViewT> getView() {
+ return view;
+ }
+
+ @Override
+ public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+ return view;
+ }
+ }
+
+ /**
+ * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+ *
+ * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
+ * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+ * They require that the input {@link PCollection} fits in memory.
+ * For a large {@link PCollection} this is expected to crash!
+ *
+ * @param <T> the type of elements to concatenate.
+ */
+ private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+ @Override
+ public List<T> createAccumulator() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<T> addInput(List<T> accumulator, T input) {
+ accumulator.add(input);
+ return accumulator;
+ }
+
+ @Override
+ public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+ List<T> result = createAccumulator();
+ for (List<T> accumulator : accumulators) {
+ result.addAll(accumulator);
+ }
+ return result;
+ }
+
+ @Override
+ public List<T> extractOutput(List<T> accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+
+ @Override
+ public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
index ee31fb5..c96bcb1 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -18,8 +18,9 @@
package org.apache.beam.runners.gearpump;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
@@ -27,7 +28,9 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
+import org.apache.gearpump.cluster.ClusterConfig;
import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.apache.gearpump.util.Constants;
/**
* Gearpump {@link PipelineRunner} for tests, which uses {@link EmbeddedCluster}.
@@ -38,7 +41,10 @@ public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
private final EmbeddedCluster cluster;
private TestGearpumpRunner(GearpumpPipelineOptions options) {
- cluster = EmbeddedCluster.apply();
+ Config config = ClusterConfig.master(null);
+ config = config.withValue(Constants.APPLICATION_TOTAL_RETRIES(),
+ ConfigValueFactory.fromAnyRef(0));
+ cluster = new EmbeddedCluster(config);
cluster.start();
options.setEmbeddedCluster(cluster);
delegate = GearpumpRunner.fromOptions(options);
@@ -52,12 +58,31 @@ public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
@Override
public GearpumpPipelineResult run(Pipeline pipeline) {
- GearpumpPipelineResult result = delegate.run(pipeline);
- PipelineResult.State state = result.waitUntilFinish();
- cluster.stop();
- assert(state == PipelineResult.State.DONE);
+ try {
+ GearpumpPipelineResult result = delegate.run(pipeline);
+ result.waitUntilFinish();
+ cluster.stop();
+ return result;
+ } catch (Throwable e) {
+ // copied from TestFlinkRunner to pull out AssertionError
+ // which is wrapped in UserCodeException
+ Throwable cause = e;
+ Throwable oldCause;
+ do {
+ if (cause.getCause() == null) {
+ break;
+ }
- return result;
+ oldCause = cause;
+ cause = cause.getCause();
+
+ } while (!oldCause.equals(cause));
+ if (cause instanceof AssertionError) {
+ throw (AssertionError) cause;
+ } else {
+ throw e;
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
new file mode 100644
index 0000000..d05c89d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import java.util.List;
+
+import org.apache.beam.runners.gearpump.GearpumpRunner;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+
+/**
+ * CreateGearpumpPCollectionView bridges the input stream to downstream
+ * transforms.
+ */
+public class CreateGearpumpPCollectionViewTranslator<ElemT, ViewT> implements
+ TransformTranslator<GearpumpRunner.CreateGearpumpPCollectionView<ElemT, ViewT>> {
+
+ @Override
+ public void translate(GearpumpRunner.CreateGearpumpPCollectionView<ElemT, ViewT> transform,
+ TranslationContext context) {
+ JavaStream<WindowedValue<List<ElemT>>> inputStream =
+ context.getInputStream(context.getInput(transform));
+ PCollectionView<ViewT> view = transform.getView();
+ context.setOutputStream(view, inputStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
new file mode 100644
index 0000000..e9e2e5d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import java.util.List;
+
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+/**
+ * View.CreatePCollectionView bridges the input stream to downstream
+ * transforms.
+ */
+public class CreatePCollectionViewTranslator<ElemT, ViewT> implements
+ TransformTranslator<View.CreatePCollectionView<ElemT, ViewT>> {
+
+ @Override
+ public void translate(View.CreatePCollectionView<ElemT, ViewT> transform,
+ TranslationContext context) {
+ JavaStream<WindowedValue<List<ElemT>>> inputStream =
+ context.getInputStream(context.getInput(transform));
+ PCollectionView<ViewT> view = transform.getView();
+ context.setOutputStream(view, inputStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
index 452127a..e5dc6dd 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
@@ -33,6 +33,8 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
*/
public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
+ private static final long serialVersionUID = 5411841848199229738L;
+
@Override
public void translate(Create.Values<T> transform, TranslationContext context) {
try {
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
index b740ab5..27e54b8 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
@@ -18,11 +18,22 @@
package org.apache.beam.runners.gearpump.translators;
+import com.google.common.collect.Lists;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.runners.gearpump.translators.io.ValuesSource;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+
/**
* Flatten.FlattenPCollectionList is translated to Gearpump merge function.
* Note only two-way merge is working now
@@ -30,17 +41,44 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
public class FlattenPCollectionTranslator<T> implements
TransformTranslator<Flatten.FlattenPCollectionList<T>> {
+ private static final long serialVersionUID = -5552148802472944759L;
+
+ /**
+ * Merges the streams of all input collections into a single stream and registers it as
+ * the output stream of the transform.
+ *
+ * <p>An input collection that has already been merged is routed through an identity
+ * {@link DummyFunction} first. When the transform has no inputs at all, the output stream
+ * is created from a {@link ValuesSource} holding the single string {@code "dummy"}.
+ *
+ * @param transform the flatten transform being translated
+ * @param context the translation context providing input/output collections and streams
+ */
@Override
public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
JavaStream<T> merged = null;
+ Set<PCollection<T>> unique = new HashSet<>();
for (PCollection<T> collection : context.getInput(transform).getAll()) {
JavaStream<T> inputStream = context.getInputStream(collection);
if (null == merged) {
merged = inputStream;
} else {
+ // duplicate edges are not allowed in Gearpump graph
+ // so we route through a dummy node
+ if (unique.contains(collection)) {
+ inputStream = inputStream.map(new DummyFunction<T>(), "dummy");
+ }
+
merged = merged.merge(inputStream, transform.getName());
}
+ // Record the collection only after the duplicate check above; adding it beforehand
+ // makes the check always true and routes every input through a dummy node.
+ unique.add(collection);
}
+
+ if (null == merged) {
+ // No inputs to flatten: fall back to a placeholder source stream.
+ // NOTE(review): the source carries one "dummy" string element; confirm that it is
+ // not observed by downstream transforms of the (logically empty) output.
+ UnboundedSourceWrapper<String, ?> unboundedSourceWrapper = new UnboundedSourceWrapper<>(
+ new ValuesSource<>(Lists.newArrayList("dummy"),
+ StringUtf8Coder.of()), context.getPipelineOptions());
+ merged = context.getSourceStream(unboundedSourceWrapper);
+ }
context.setOutputStream(context.getOutput(transform), merged);
}
+
+ /**
+ * Identity {@link MapFunction} that returns its input unchanged; used to insert an extra
+ * node between a repeated input stream and the merge.
+ */
+ private static class DummyFunction<T> extends MapFunction<T, T> {
+
+ private static final long serialVersionUID = 5454396869997290471L;
+
+ @Override
+ public T map(T t) {
+ return t;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 69a1d11..df8bfe9 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.streaming.dsl.api.functions.FoldFunction;
import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
-import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
@@ -49,12 +49,16 @@ import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
import org.apache.gearpump.streaming.dsl.window.api.WindowFunction;
import org.apache.gearpump.streaming.dsl.window.api.Windows;
import org.apache.gearpump.streaming.dsl.window.impl.Window;
+import org.joda.time.Instant;
/**
* {@link GroupByKey} is translated to Gearpump groupBy function.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> {
+
+ private static final long serialVersionUID = -8742202583992787659L;
+
@Override
public void translate(GroupByKey<K, V> transform, TranslationContext context) {
PCollection<KV<K, V>> input = context.getInput(transform);
@@ -66,15 +70,14 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
input.getWindowingStrategy().getOutputTimeFn();
WindowFn<KV<K, V>, BoundedWindow> windowFn = (WindowFn<KV<K, V>, BoundedWindow>)
input.getWindowingStrategy().getWindowFn();
- JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
+ JavaStream<WindowedValue<KV<K, List<V>>>> outputStream = inputStream
.window(Windows.apply(
new GearpumpWindowFn(windowFn.isNonMerging()),
EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
.groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window")
- .map(new ValueToIterable<K, V>(), "map_value_to_iterable")
.map(new KeyedByTimestamp<K, V>((OutputTimeFn<? super BoundedWindow>)
input.getWindowingStrategy().getOutputTimeFn()), "keyed_by_timestamp")
- .reduce(new Merge<>(windowFn, outputTimeFn), "merge")
+ .fold(new Merge<>(windowFn, outputTimeFn), "merge")
.map(new Values<K, V>(), "values");
context.setOutputStream(context.getOutput(transform), outputStream);
@@ -115,6 +118,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
private static class GroupByFn<K, V> extends
GroupByFunction<WindowedValue<KV<K, V>>, ByteBuffer> {
+ private static final long serialVersionUID = -807905402490735530L;
private final Coder<K> keyCoder;
GroupByFn(Coder<K> keyCoder) {
@@ -122,7 +126,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
@Override
- public ByteBuffer apply(WindowedValue<KV<K, V>> wv) {
+ public ByteBuffer groupBy(WindowedValue<KV<K, V>> wv) {
try {
return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, wv.getValue().getKey()));
} catch (CoderException e) {
@@ -131,19 +135,9 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
}
- private static class ValueToIterable<K, V>
- extends MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {
-
- @Override
- public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) {
- Iterable<V> values = Lists.newArrayList(wv.getValue().getValue());
- return wv.withValue(KV.of(wv.getValue().getKey(), values));
- }
- }
-
private static class KeyedByTimestamp<K, V>
- extends MapFunction<WindowedValue<KV<K, Iterable<V>>>,
- KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {
+ extends MapFunction<WindowedValue<KV<K, V>>,
+ KV<Instant, WindowedValue<KV<K, V>>>> {
private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
@@ -152,16 +146,17 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
@Override
- public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
- WindowedValue<KV<K, Iterable<V>>> wv) {
- org.joda.time.Instant timestamp = outputTimeFn.assignOutputTime(wv.getTimestamp(),
+ public KV<org.joda.time.Instant, WindowedValue<KV<K, V>>> map(
+ WindowedValue<KV<K, V>> wv) {
+ Instant timestamp = outputTimeFn.assignOutputTime(wv.getTimestamp(),
Iterables.getOnlyElement(wv.getWindows()));
return KV.of(timestamp, wv);
}
}
private static class Merge<K, V> extends
- ReduceFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {
+ FoldFunction<KV<Instant, WindowedValue<KV<K, V>>>,
+ KV<Instant, WindowedValue<KV<K, List<V>>>>> {
private final WindowFn<KV<K, V>, BoundedWindow> windowFn;
private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
@@ -173,14 +168,28 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
@Override
- public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
- KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv1,
- KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv2) {
- org.joda.time.Instant t1 = kv1.getKey();
- org.joda.time.Instant t2 = kv2.getKey();
+ public KV<Instant, WindowedValue<KV<K, List<V>>>> init() {
+ return KV.of(null, null);
+ }
+
+ @Override
+ public KV<Instant, WindowedValue<KV<K, List<V>>>> fold(
+ KV<Instant, WindowedValue<KV<K, List<V>>>> accum,
+ KV<Instant, WindowedValue<KV<K, V>>> iter) {
+ if (accum.getKey() == null) {
+ WindowedValue<KV<K, V>> wv = iter.getValue();
+ KV<K, V> kv = wv.getValue();
+ V v = kv.getValue();
+ List<V> nv = Lists.newArrayList(v);
+ return KV.of(iter.getKey(), wv.withValue(KV.of(kv.getKey(), nv)));
+ }
+
+ Instant t1 = accum.getKey();
+ Instant t2 = iter.getKey();
- final WindowedValue<KV<K, Iterable<V>>> wv1 = kv1.getValue();
- final WindowedValue<KV<K, Iterable<V>>> wv2 = kv2.getValue();
+ final WindowedValue<KV<K, List<V>>> wv1 = accum.getValue();
+ final WindowedValue<KV<K, V>> wv2 = iter.getValue();
+ wv1.getValue().getValue().add(wv2.getValue().getValue());
final List<BoundedWindow> mergedWindows = new ArrayList<>();
if (!windowFn.isNonMerging()) {
@@ -208,23 +217,22 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
mergedWindows.addAll(wv1.getWindows());
}
- org.joda.time.Instant timestamp = outputTimeFn.combine(t1, t2);
+ Instant timestamp = outputTimeFn.combine(t1, t2);
return KV.of(timestamp,
- WindowedValue.of(KV.of(wv1.getValue().getKey(),
- Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())), timestamp,
+ WindowedValue.of(wv1.getValue(), timestamp,
mergedWindows, wv1.getPane()));
}
}
private static class Values<K, V> extends
- MapFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>,
- WindowedValue<KV<K, Iterable<V>>>> {
+ MapFunction<KV<Instant, WindowedValue<KV<K, List<V>>>>,
+ WindowedValue<KV<K, List<V>>>> {
@Override
- public WindowedValue<KV<K, Iterable<V>>> apply(KV<org.joda.time.Instant,
- WindowedValue<KV<K, Iterable<V>>>> kv) {
- org.joda.time.Instant timestamp = kv.getKey();
- WindowedValue<KV<K, Iterable<V>>> wv = kv.getValue();
+ public WindowedValue<KV<K, List<V>>> map(KV<org.joda.time.Instant,
+ WindowedValue<KV<K, List<V>>>> kv) {
+ Instant timestamp = kv.getKey();
+ WindowedValue<KV<K, List<V>>> wv = kv.getValue();
return WindowedValue.of(wv.getValue(), timestamp, wv.getWindows(), wv.getPane());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index bf7073b..8c57019 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -18,158 +18,93 @@
package org.apache.beam.runners.gearpump.translators;
+
+import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
-import java.util.Iterator;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
-import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
-import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
-import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
+import javax.annotation.Nullable;
+
+import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction;
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
/**
* {@link ParDo.BoundMulti} is translated to Gearpump flatMap function
- * with {@link DoFn} wrapped in {@link DoFnMultiFunction}. The outputs are
+ * with {@link DoFn} wrapped in {@link DoFnFunction}. The outputs are
* further filtered with Gearpump filter function by output tag
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class ParDoBoundMultiTranslator<InputT, OutputT> implements
TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> {
+ private static final long serialVersionUID = -6023461558200028849L;
+
@Override
public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
PCollection<InputT> inputT = (PCollection<InputT>) context.getInput(transform);
JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream(inputT);
- Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+ Collection<PCollectionView<?>> sideInputs = transform.getSideInputs();
+ Map<String, PCollectionView<?>> tagsToSideInputs =
+ TranslatorUtils.getTagsToSideInputs(sideInputs);
- JavaStream<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputStream = inputStream.flatMap(
- new DoFnMultiFunction<>(
- context.getPipelineOptions(),
- transform.getFn(),
- transform.getMainOutputTag(),
- transform.getSideOutputTags(),
- inputT.getWindowingStrategy(),
- new NoOpSideInputReader()
- ), transform.getName());
- for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
+ Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+ final TupleTag<OutputT> mainOutput = transform.getMainOutputTag();
+ List<TupleTag<?>> sideOutputs = Lists.newLinkedList(Sets.filter(outputs.keySet(),
+ new Predicate<TupleTag<?>>() {
+ @Override
+ public boolean apply(@Nullable TupleTag<?> tupleTag) {
+ return tupleTag != null && !tupleTag.getId().equals(mainOutput.getId());
+ }
+ }));
+
+ JavaStream<TranslatorUtils.RawUnionValue> unionStream = TranslatorUtils.withSideInputStream(
+ context, inputStream, tagsToSideInputs);
+
+ JavaStream<TranslatorUtils.RawUnionValue> outputStream =
+ TranslatorUtils.toList(unionStream).flatMap(
+ new DoFnFunction<>(
+ context.getPipelineOptions(),
+ transform.getFn(),
+ inputT.getWindowingStrategy(),
+ sideInputs,
+ tagsToSideInputs,
+ mainOutput,
+ sideOutputs), transform.getName());
+ for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
+ output.getValue().getCoder();
JavaStream<WindowedValue<OutputT>> taggedStream = outputStream
- .filter(new FilterByOutputTag<>((TupleTag<OutputT>) output.getKey())
- , "filter_by_output_tag")
- .map(new ExtractOutput<OutputT>(), "extract output");
-
+ .filter(new FilterByOutputTag(output.getKey().getId()),
+ "filter_by_output_tag")
+ .map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue");
context.setOutputStream(output.getValue(), taggedStream);
}
}
- /**
- * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFnMultiFunction}.
- */
- private static class DoFnMultiFunction<InputT, OutputT>
- extends FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>
- implements DoFnRunners.OutputManager {
-
- private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
- private DoFnRunner<InputT, OutputT> doFnRunner;
- private final DoFn<InputT, OutputT> doFn;
- private List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs;
-
- public DoFnMultiFunction(
- GearpumpPipelineOptions pipelineOptions,
- DoFn<InputT, OutputT> doFn,
- TupleTag<OutputT> mainOutputTag,
- TupleTagList sideOutputTags,
- WindowingStrategy<?, ?> windowingStrategy,
- SideInputReader sideInputReader) {
- this.doFn = doFn;
- this.doFnRunnerFactory = new DoFnRunnerFactory<>(
- pipelineOptions,
- doFn,
- sideInputReader,
- this,
- mainOutputTag,
- sideOutputTags.getAll(),
- new NoOpStepContext(),
- new NoOpAggregatorFactory(),
- windowingStrategy
- );
- }
-
- @Override
- public void setup() {
- DoFnInvokers.invokerFor(doFn).invokeSetup();
- }
-
- @Override
- public void teardown() {
- DoFnInvokers.invokerFor(doFn).invokeTeardown();
- }
-
- @Override
- public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) {
- outputs = Lists.newArrayList();
-
- if (null == doFnRunner) {
- doFnRunner = doFnRunnerFactory.createRunner();
- }
- doFnRunner.startBundle();
- doFnRunner.processElement(wv);
- doFnRunner.finishBundle();
-
- return outputs.iterator();
- }
-
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- KV<TupleTag<OutputT>, OutputT> kv = KV.of((TupleTag<OutputT>) tag,
- (OutputT) output.getValue());
- outputs.add(WindowedValue.of(kv, output.getTimestamp(),
- output.getWindows(), output.getPane()));
- }
- }
-
- private static class FilterByOutputTag<OutputT> extends
- FilterFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> {
+ private static class FilterByOutputTag extends FilterFunction<TranslatorUtils.RawUnionValue> {
- private final TupleTag<OutputT> tupleTag;
+ private static final long serialVersionUID = 7276155265895637526L;
+ private final String tag;
- public FilterByOutputTag(TupleTag<OutputT> tupleTag) {
- this.tupleTag = tupleTag;
+ FilterByOutputTag(String tag) {
+ this.tag = tag;
}
@Override
- public boolean apply(WindowedValue<KV<TupleTag<OutputT>, OutputT>> wv) {
- return wv.getValue().getKey().equals(tupleTag);
- }
- }
-
- private static class ExtractOutput<OutputT> extends
- MapFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>, WindowedValue<OutputT>> {
-
- @Override
- public WindowedValue<OutputT> apply(WindowedValue<KV<TupleTag<OutputT>, OutputT>> wv) {
- // System.out.println(wv.getValue().getKey() + ":" + wv.getValue().getValue());
- return WindowedValue.of(wv.getValue().getValue(), wv.getTimestamp(),
- wv.getWindows(), wv.getPane());
+ public boolean filter(TranslatorUtils.RawUnionValue value) {
+ return value.getUnionTag().equals(tag);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
index 689bc08..efae938 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
@@ -18,14 +18,21 @@
package org.apache.beam.runners.gearpump.translators;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
-import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
@@ -36,18 +43,33 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
public class ParDoBoundTranslator<InputT, OutputT> implements
TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+ private static final long serialVersionUID = -3413205558160983784L;
+ private final TupleTag<OutputT> mainOutput = new TupleTag<>();
+ private final List<TupleTag<?>> sideOutputs = TupleTagList.empty().getAll();
+
@Override
public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
DoFn<InputT, OutputT> doFn = transform.getFn();
PCollection<OutputT> output = context.getOutput(transform);
WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
+ Collection<PCollectionView<?>> sideInputs = transform.getSideInputs();
+ Map<String, PCollectionView<?>> tagsToSideInputs =
+ TranslatorUtils.getTagsToSideInputs(sideInputs);
+ JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream(
+ context.getInput(transform));
+ JavaStream<TranslatorUtils.RawUnionValue> unionStream =
+ TranslatorUtils.withSideInputStream(context,
+ inputStream, tagsToSideInputs);
+
DoFnFunction<InputT, OutputT> doFnFunction = new DoFnFunction<>(context.getPipelineOptions(),
- doFn, windowingStrategy, new NoOpSideInputReader());
- JavaStream<WindowedValue<InputT>> inputStream =
- context.getInputStream(context.getInput(transform));
+ doFn, windowingStrategy, sideInputs, tagsToSideInputs,
+ mainOutput, sideOutputs);
+
JavaStream<WindowedValue<OutputT>> outputStream =
- inputStream.flatMap(doFnFunction, transform.getName());
+ TranslatorUtils.toList(unionStream)
+ .flatMap(doFnFunction, transform.getName())
+ .map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue");
context.setOutputStream(context.getOutput(transform), outputStream);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index c0de2df..81970e2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -20,7 +20,10 @@ package org.apache.beam.runners.gearpump.translators;
import com.google.common.collect.Iterables;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -28,8 +31,8 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
import org.joda.time.Instant;
/**
@@ -38,6 +41,8 @@ import org.joda.time.Instant;
@SuppressWarnings("unchecked")
public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> {
+ private static final long serialVersionUID = -964887482120489061L;
+
@Override
public void translate(Window.Bound<T> transform, TranslationContext context) {
PCollection<T> input = context.getInput(transform);
@@ -47,14 +52,15 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
JavaStream<WindowedValue<T>> outputStream =
inputStream
- .map(new AssignWindows(windowFn), "assign_windows");
+ .flatMap(new AssignWindows(windowFn), "assign_windows");
context.setOutputStream(context.getOutput(transform), outputStream);
}
private static class AssignWindows<T> extends
- MapFunction<WindowedValue<T>, WindowedValue<T>> {
+ FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
+ private static final long serialVersionUID = 7284565861938681360L;
private final WindowFn<T, BoundedWindow> windowFn;
AssignWindows(WindowFn<T, BoundedWindow> windowFn) {
@@ -62,7 +68,7 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
}
@Override
- public WindowedValue<T> apply(final WindowedValue<T> value) {
+ public Iterator<WindowedValue<T>> flatMap(final WindowedValue<T> value) {
try {
Collection<BoundedWindow> windows = windowFn.assignWindows(windowFn.new AssignContext() {
@Override
@@ -80,7 +86,12 @@ public class WindowBoundTranslator<T> implements TransformTranslator<Window.Bou
return Iterables.getOnlyElement(value.getWindows());
}
});
- return WindowedValue.of(value.getValue(), value.getTimestamp(), windows, value.getPane());
+ List<WindowedValue<T>> values = new ArrayList<>(windows.size());
+ for (BoundedWindow win: windows) {
+ values.add(
+ WindowedValue.of(value.getValue(), value.getTimestamp(), win, value.getPane()));
+ }
+ return values.iterator();
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index a66d3a4..b2c68d6 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -18,90 +18,190 @@
package org.apache.beam.runners.gearpump.translators.functions;
+import com.google.common.collect.Iterables;
+
import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
-import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils.RawUnionValue;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
+
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
/**
* Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}.
*/
+@SuppressWarnings("unchecked")
public class DoFnFunction<InputT, OutputT> extends
- FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>> implements
- DoFnRunners.OutputManager {
+ FlatMapFunction<List<RawUnionValue>, RawUnionValue> {
- private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {};
- private List<WindowedValue<OutputT>> outputs = Lists.newArrayList();
+ private static final long serialVersionUID = -5701440128544343353L;
private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
- private DoFnRunner<InputT, OutputT> doFnRunner;
private final DoFn<InputT, OutputT> doFn;
+ private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
+ private transient PushbackSideInputDoFnRunner<InputT, OutputT> doFnRunner;
+ private transient SideInputHandler sideInputReader;
+ private transient List<WindowedValue<InputT>> pushedBackValues;
+ private transient Map<PCollectionView<?>, List<WindowedValue<Iterable<?>>>> sideInputValues;
+ private final Collection<PCollectionView<?>> sideInputs;
+ private final Map<String, PCollectionView<?>> tagsToSideInputs;
+ private final TupleTag<OutputT> mainOutput;
+ private final List<TupleTag<?>> sideOutputs;
+ private final DoFnOutputManager outputManager;
public DoFnFunction(
GearpumpPipelineOptions pipelineOptions,
DoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
- SideInputReader sideInputReader) {
+ Collection<PCollectionView<?>> sideInputs,
+ Map<String, PCollectionView<?>> sideInputTagMapping,
+ TupleTag<OutputT> mainOutput,
+ List<TupleTag<?>> sideOutputs) {
this.doFn = doFn;
+ this.outputManager = new DoFnOutputManager();
this.doFnRunnerFactory = new DoFnRunnerFactory<>(
pipelineOptions,
doFn,
- sideInputReader,
- this,
- mainTag,
- TupleTagList.empty().getAll(),
+ sideInputs,
+ outputManager,
+ mainOutput,
+ sideOutputs,
new NoOpStepContext(),
new NoOpAggregatorFactory(),
windowingStrategy
);
+ this.sideInputs = sideInputs;
+ this.tagsToSideInputs = sideInputTagMapping;
+ this.mainOutput = mainOutput;
+ this.sideOutputs = sideOutputs;
}
@Override
public void setup() {
- DoFnInvokers.invokerFor(doFn).invokeSetup();
+ sideInputReader = new SideInputHandler(sideInputs,
+ InMemoryStateInternals.<Void>forKey(null));
+ doFnInvoker = DoFnInvokers.invokerFor(doFn);
+ doFnInvoker.invokeSetup();
+
+ doFnRunner = doFnRunnerFactory.createRunner(sideInputReader);
+
+ pushedBackValues = new LinkedList<>();
+ sideInputValues = new HashMap<>();
+ outputManager.setup(mainOutput, sideOutputs);
}
@Override
public void teardown() {
- DoFnInvokers.invokerFor(doFn).invokeTeardown();
+ doFnInvoker.invokeTeardown();
}
@Override
- public Iterator<WindowedValue<OutputT>> apply(WindowedValue<InputT> value) {
- outputs = Lists.newArrayList();
+ public Iterator<TranslatorUtils.RawUnionValue> flatMap(List<RawUnionValue> inputs) {
+ outputManager.clear();
+
+ doFnRunner.startBundle();
- if (null == doFnRunner) {
- doFnRunner = doFnRunnerFactory.createRunner();
+ for (RawUnionValue unionValue: inputs) {
+ final String tag = unionValue.getUnionTag();
+ if (tag.equals("0")) {
+ // main input
+ pushedBackValues.add((WindowedValue<InputT>) unionValue.getValue());
+ } else {
+ // side input
+ PCollectionView<?> sideInput = tagsToSideInputs.get(unionValue.getUnionTag());
+ WindowedValue<Iterable<?>> sideInputValue =
+ (WindowedValue<Iterable<?>>) unionValue.getValue();
+ if (!sideInputValues.containsKey(sideInput)) {
+ sideInputValues.put(sideInput, new LinkedList<WindowedValue<Iterable<?>>>());
+ }
+ sideInputValues.get(sideInput).add(sideInputValue);
+ }
}
- doFnRunner.startBundle();
- doFnRunner.processElement(value);
+ for (PCollectionView<?> sideInput: sideInputs) {
+ if (sideInputValues.containsKey(sideInput)) {
+ for (WindowedValue<Iterable<?>> value: sideInputValues.get(sideInput)) {
+ sideInputReader.addSideInputValue(sideInput, value);
+ }
+ }
+ for (WindowedValue<InputT> value : pushedBackValues) {
+ for (BoundedWindow win: value.getWindows()) {
+ BoundedWindow sideInputWindow =
+ sideInput.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(win);
+ if (!sideInputReader.isReady(sideInput, sideInputWindow)) {
+ Object emptyValue = WindowedValue.of(
+ Lists.newArrayList(), value.getTimestamp(), sideInputWindow, value.getPane());
+ sideInputReader.addSideInputValue(sideInput, (WindowedValue<Iterable<?>>) emptyValue);
+ }
+ }
+ }
+ }
+
+ List<WindowedValue<InputT>> nextPushedBackValues = new LinkedList<>();
+ for (WindowedValue<InputT> value : pushedBackValues) {
+ Iterable<WindowedValue<InputT>> values = doFnRunner.processElementInReadyWindows(value);
+ Iterables.addAll(nextPushedBackValues, values);
+ }
+ pushedBackValues.clear();
+ Iterables.addAll(pushedBackValues, nextPushedBackValues);
+ sideInputValues.clear();
+
doFnRunner.finishBundle();
- return outputs.iterator();
+ return outputManager.getOutputs();
}
- @SuppressWarnings({"rawtypes", "unchecked"})
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- if (mainTag.equals(tag)) {
- outputs.add((WindowedValue<OutputT>) output);
- } else {
- throw new RuntimeException("output is not of main tag");
+ private static class DoFnOutputManager implements DoFnRunners.OutputManager, Serializable {
+
+ private static final long serialVersionUID = 4967375172737408160L;
+ private transient List<RawUnionValue> outputs;
+ private transient Set<TupleTag<?>> outputTags;
+
+ @Override
+ public <T> void output(TupleTag<T> outputTag, WindowedValue<T> output) {
+ if (outputTags.contains(outputTag)) {
+ outputs.add(new RawUnionValue(outputTag.getId(), output));
+ }
+ }
+
+ void setup(TupleTag<?> mainOutput, List<TupleTag<?>> sideOutputs) {
+ outputs = new LinkedList<>();
+ outputTags = new HashSet<>();
+ outputTags.add(mainOutput);
+ outputTags.addAll(sideOutputs);
+ }
+
+ void clear() {
+ outputs.clear();
+ }
+
+ Iterator<RawUnionValue> getOutputs() {
+ return outputs.iterator();
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
index f889101..2c18735 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
*/
public class BoundedSourceWrapper<T> extends GearpumpSource<T> {
+ private static final long serialVersionUID = 8199570485738786123L;
private final BoundedSource<T> source;
public BoundedSourceWrapper(BoundedSource<T> source, PipelineOptions options) {
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 3d0d7c8..c079603 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -28,10 +28,13 @@ import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
+// import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.gearpump.Message;
import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.source.Watermark;
import org.apache.gearpump.streaming.task.TaskContext;
/**
@@ -74,11 +77,11 @@ public abstract class GearpumpSource<T> implements DataSource {
if (available) {
T data = reader.getCurrent();
org.joda.time.Instant timestamp = reader.getCurrentTimestamp();
- available = reader.advance();
message = Message.apply(
- WindowedValue.valueInGlobalWindow(data),
+ WindowedValue.timestampedValueInGlobalWindow(data, timestamp),
timestamp.getMillis());
}
+ available = reader.advance();
} catch (Exception e) {
close();
throw new RuntimeException(e);
@@ -100,11 +103,19 @@ public abstract class GearpumpSource<T> implements DataSource {
@Override
public Instant getWatermark() {
if (reader instanceof UnboundedSource.UnboundedReader) {
- return TranslatorUtils.jodaTimeToJava8Time(
- ((UnboundedSource.UnboundedReader) reader).getWatermark());
+ org.joda.time.Instant watermark =
+ ((UnboundedSource.UnboundedReader) reader).getWatermark();
+ if (watermark == BoundedWindow.TIMESTAMP_MAX_VALUE) {
+ return Watermark.MAX();
+ } else {
+ return TranslatorUtils.jodaTimeToJava8Time(watermark);
+ }
} else {
- return Instant.now();
+ if (available) {
+ return TranslatorUtils.jodaTimeToJava8Time(reader.getCurrentTimestamp());
+ } else {
+ return Watermark.MAX();
+ }
}
}
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
index f5a5eb4..e0488cd 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;
/**
@@ -40,6 +41,7 @@ import org.joda.time.Instant;
*/
public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
+ private static final long serialVersionUID = 9113026175795235710L;
private final byte[] values;
private final IterableCoder<T> iterableCoder;
@@ -135,7 +137,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
@Override
public Instant getCurrentTimestamp() throws NoSuchElementException {
- return Instant.now();
+ return getTimestamp(current);
}
@Override
@@ -145,7 +147,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
@Override
public Instant getWatermark() {
if (iterator.hasNext()) {
- return Instant.now();
+ return getTimestamp(current);
} else {
return BoundedWindow.TIMESTAMP_MAX_VALUE;
}
@@ -160,5 +162,13 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
public UnboundedSource<T, ?> getCurrentSource() {
return source;
}
+
+ private Instant getTimestamp(Object value) {
+ if (value instanceof TimestampedValue) {
+ return ((TimestampedValue) value).getTimestamp();
+ } else {
+ return Instant.now();
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index aaefb88..5db8320 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -19,18 +19,21 @@
package org.apache.beam.runners.gearpump.translators.utils;
import java.io.Serializable;
+import java.util.Collection;
import java.util.List;
import org.apache.beam.runners.core.AggregatorFactory;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SimpleDoFnRunner;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
/**
@@ -38,10 +41,10 @@ import org.apache.beam.sdk.values.TupleTag;
*/
public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
- private static final long serialVersionUID = 1083167395296383469L;
+ private static final long serialVersionUID = -4109539010014189725L;
private final DoFn<InputT, OutputT> fn;
private final transient PipelineOptions options;
- private final SideInputReader sideInputReader;
+ private final Collection<PCollectionView<?>> sideInputs;
private final DoFnRunners.OutputManager outputManager;
private final TupleTag<OutputT> mainOutputTag;
private final List<TupleTag<?>> sideOutputTags;
@@ -52,7 +55,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
public DoFnRunnerFactory(
GearpumpPipelineOptions pipelineOptions,
DoFn<InputT, OutputT> doFn,
- SideInputReader sideInputReader,
+ Collection<PCollectionView<?>> sideInputs,
DoFnRunners.OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
@@ -61,7 +64,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
WindowingStrategy<?, ?> windowingStrategy) {
this.fn = doFn;
this.options = pipelineOptions;
- this.sideInputReader = sideInputReader;
+ this.sideInputs = sideInputs;
this.outputManager = outputManager;
this.mainOutputTag = mainOutputTag;
this.sideOutputTags = sideOutputTags;
@@ -70,9 +73,12 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
this.windowingStrategy = windowingStrategy;
}
- public DoFnRunner<InputT, OutputT> createRunner() {
- return DoFnRunners.createDefault(options, fn, sideInputReader, outputManager, mainOutputTag,
+ public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
+ ReadyCheckingSideInputReader sideInputReader) {
+ DoFnRunner<InputT, OutputT> underlying = DoFnRunners.createDefault(
+ options, fn, sideInputReader, outputManager, mainOutputTag,
sideOutputTags, stepContext, aggregatorFactory, windowingStrategy);
+ return PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
deleted file mode 100644
index d1a9198..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators.utils;
-
-import java.io.Serializable;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * no-op side input reader.
- */
-public class NoOpSideInputReader implements SideInputReader, Serializable {
- @Nullable
- @Override
- public <T> T get(PCollectionView<T> view, BoundedWindow window) {
- return null;
- }
-
- @Override
- public <T> boolean contains(PCollectionView<T> view) {
- return false;
- }
-
- @Override
- public boolean isEmpty() {
- return false;
- }
-}