You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:34 UTC
[09/50] [abbrv] beam git commit: [BEAM-79] Fix gearpump-runner merge conflicts and test failure
[BEAM-79] Fix gearpump-runner merge conflicts and test failure
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3eab6a64
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3eab6a64
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3eab6a64
Branch: refs/heads/master
Commit: 3eab6a647e4761725680c8bc40589dfa5569d75b
Parents: 3f91798
Author: manuzhang <ow...@gmail.com>
Authored: Tue Mar 14 08:09:46 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Mar 15 15:21:29 2017 +0800
----------------------------------------------------------------------
runners/gearpump/pom.xml | 51 ++-
.../gearpump/GearpumpPipelineResult.java | 21 +-
.../gearpump/GearpumpPipelineTranslator.java | 388 ++++++++++++++++++-
.../beam/runners/gearpump/GearpumpRunner.java | 376 +-----------------
.../runners/gearpump/TestGearpumpRunner.java | 38 +-
.../gearpump/examples/StreamingWordCount.java | 98 -----
.../gearpump/examples/UnboundedTextSource.java | 139 -------
.../runners/gearpump/examples/package-info.java | 22 --
...CreateGearpumpPCollectionViewTranslator.java | 14 +-
.../CreatePCollectionViewTranslator.java | 6 +-
.../translators/CreateValuesTranslator.java | 51 ---
.../FlattenPCollectionTranslator.java | 84 ----
.../FlattenPCollectionsTranslator.java | 83 ++++
.../translators/GroupByKeyTranslator.java | 4 +-
.../translators/ParDoBoundMultiTranslator.java | 32 +-
.../translators/ParDoBoundTranslator.java | 7 +-
.../translators/ReadBoundedTranslator.java | 4 +-
.../translators/ReadUnboundedTranslator.java | 4 +-
.../translators/TransformTranslator.java | 2 +-
.../translators/TranslationContext.java | 29 +-
.../translators/WindowAssignTranslator.java | 100 +++++
.../translators/WindowBoundTranslator.java | 100 -----
.../translators/functions/DoFnFunction.java | 12 +-
.../translators/io/UnboundedSourceWrapper.java | 1 +
.../translators/utils/DoFnRunnerFactory.java | 4 +-
.../utils/NoOpAggregatorFactory.java | 2 +-
.../translators/utils/NoOpStepContext.java | 6 +-
.../translators/utils/TranslatorUtils.java | 2 -
.../translators/utils/TranslatorUtilsTest.java | 1 -
29 files changed, 703 insertions(+), 978 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 3efb1f6..9a6a432 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
- <version>0.5.0-incubating-SNAPSHOT</version>
+ <version>0.7.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -65,10 +65,12 @@
<configuration>
<groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
<excludedGroups>
+ org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
org.apache.beam.sdk.testing.UsesStatefulParDo,
org.apache.beam.sdk.testing.UsesTimersInParDo,
org.apache.beam.sdk.testing.UsesSplittableParDo,
- org.apache.beam.sdk.testing.UsesMetrics
+ org.apache.beam.sdk.testing.UsesAttemptedMetrics,
+ org.apache.beam.sdk.testing.UsesCommittedMetrics
</excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
@@ -136,6 +138,16 @@
<artifactId>beam-runners-core-java</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-construction-java</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
@@ -182,6 +194,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
@@ -210,8 +227,36 @@
<!-- Java compiler -->
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <testSource>1.8</testSource>
+ <testTarget>1.8</testTarget>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>enforce</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <enforceBytecodeVersion>
+ <maxJdkVersion>1.8</maxJdkVersion>
+ </enforceBytecodeVersion>
+ <requireJavaVersion>
+ <version>[1.8,)</version>
+ </requireJavaVersion>
+ </rules>
+ </configuration>
+ </execution>
+ </executions>
</plugin>
<!-- uber jar -->
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 8f90898..d833cd6 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -43,6 +43,7 @@ public class GearpumpPipelineResult implements PipelineResult {
private final ClientContext client;
private final RunningApplication app;
+ private boolean finished = false;
public GearpumpPipelineResult(ClientContext client, RunningApplication app) {
this.client = client;
@@ -51,13 +52,22 @@ public class GearpumpPipelineResult implements PipelineResult {
@Override
public State getState() {
- return getGearpumpState();
+ if (!finished) {
+ return getGearpumpState();
+ } else {
+ return State.DONE;
+ }
}
@Override
public State cancel() throws IOException {
- app.shutDown();
- return State.CANCELLED;
+ if (!finished) {
+ app.shutDown();
+ finished = true;
+ return State.CANCELLED;
+ } else {
+ return State.DONE;
+ }
}
@Override
@@ -67,7 +77,10 @@ public class GearpumpPipelineResult implements PipelineResult {
@Override
public State waitUntilFinish() {
- app.waitUntilFinish();
+ if (!finished) {
+ app.waitUntilFinish();
+ finished = true;
+ }
return State.DONE;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 4cc060c..1a36343 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -18,13 +18,19 @@
package org.apache.beam.runners.gearpump;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.gearpump.translators.CreateGearpumpPCollectionViewTranslator;
import org.apache.beam.runners.gearpump.translators.CreatePCollectionViewTranslator;
-import org.apache.beam.runners.gearpump.translators.CreateValuesTranslator;
-import org.apache.beam.runners.gearpump.translators.FlattenPCollectionTranslator;
+import org.apache.beam.runners.gearpump.translators.FlattenPCollectionsTranslator;
import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
import org.apache.beam.runners.gearpump.translators.ParDoBoundMultiTranslator;
import org.apache.beam.runners.gearpump.translators.ParDoBoundTranslator;
@@ -32,17 +38,29 @@ import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator;
import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
import org.apache.beam.runners.gearpump.translators.TransformTranslator;
import org.apache.beam.runners.gearpump.translators.TranslationContext;
-import org.apache.beam.runners.gearpump.translators.WindowBoundTranslator;
+import org.apache.beam.runners.gearpump.translators.WindowAssignTranslator;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.gearpump.util.Graph;
@@ -74,14 +92,13 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
- registerTransformTranslator(Flatten.FlattenPCollectionList.class,
- new FlattenPCollectionTranslator());
+ registerTransformTranslator(Flatten.PCollections.class,
+ new FlattenPCollectionsTranslator());
registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator());
- registerTransformTranslator(Window.Bound.class, new WindowBoundTranslator());
- registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
+ registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
registerTransformTranslator(View.CreatePCollectionView.class,
new CreatePCollectionViewTranslator());
- registerTransformTranslator(GearpumpRunner.CreateGearpumpPCollectionView.class,
+ registerTransformTranslator(CreateGearpumpPCollectionView.class,
new CreateGearpumpPCollectionViewTranslator<>());
}
@@ -90,6 +107,27 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
}
public void translate(Pipeline pipeline) {
+ Map<PTransformMatcher, PTransformOverrideFactory> overrides =
+ ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
+ .put(PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+ new ReflectiveOneToOneOverrideFactory(
+ StreamingCombineGloballyAsSingletonView.class))
+ .put(PTransformMatchers.classEqualTo(View.AsMap.class),
+ new ReflectiveOneToOneOverrideFactory(StreamingViewAsMap.class))
+ .put(PTransformMatchers.classEqualTo(View.AsMultimap.class),
+ new ReflectiveOneToOneOverrideFactory(StreamingViewAsMultimap.class))
+ .put(PTransformMatchers.classEqualTo(View.AsSingleton.class),
+ new ReflectiveOneToOneOverrideFactory(StreamingViewAsSingleton.class))
+ .put(PTransformMatchers.classEqualTo(View.AsList.class),
+ new ReflectiveOneToOneOverrideFactory(StreamingViewAsList.class))
+ .put(PTransformMatchers.classEqualTo(View.AsIterable.class),
+ new ReflectiveOneToOneOverrideFactory(StreamingViewAsIterable.class))
+ .build();
+
+ for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
+ overrides.entrySet()) {
+ pipeline.replace(override.getKey(), override.getValue());
+ }
pipeline.traverseTopologically(this);
}
@@ -145,5 +183,337 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
return transformTranslators.get(transformClass);
}
+ // The following codes are forked from DataflowRunner for View translator
+ private static class ReflectiveOneToOneOverrideFactory<
+ InputT extends PValue,
+ OutputT extends PValue,
+ TransformT extends PTransform<InputT, OutputT>>
+ extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
+ private final Class<PTransform<InputT, OutputT>> replacement;
+
+ private ReflectiveOneToOneOverrideFactory(
+ Class<PTransform<InputT, OutputT>> replacement) {
+ this.replacement = replacement;
+ }
+
+ @Override
+ public PTransform<InputT, OutputT> getReplacementTransform(TransformT transform) {
+ return InstanceBuilder.ofType(replacement)
+ .withArg((Class<PTransform<InputT, OutputT>>) transform.getClass(), transform)
+ .build();
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+ * for the Gearpump runner.
+ */
+ private static class StreamingViewAsMap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+ private static final long serialVersionUID = 4791080760092950304L;
+
+ public StreamingViewAsMap(View.AsMap<K, V> transform) {}
+
+ @Override
+ public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, V>> view =
+ PCollectionViews.mapView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+ // throw new RuntimeException(e);
+ }
+
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMap";
+ }
+ }
+
+ /**
+ * Specialized expansion for {@link
+ * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
+ * Gearpump runner.
+ */
+ private static class StreamingViewAsMultimap<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+ private static final long serialVersionUID = 5854899081751333352L;
+
+ public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {}
+
+ @Override
+ public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+ PCollectionView<Map<K, Iterable<V>>> view =
+ PCollectionViews.multimapView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+ try {
+ inputCoder.getKeyCoder().verifyDeterministic();
+ } catch (Coder.NonDeterministicException e) {
+ // throw new RuntimeException(e);
+ }
+
+ return input
+ .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+ .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsMultimap";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
+ * Gearpump runner.
+ */
+ private static class StreamingViewAsIterable<T>
+ extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+
+ private static final long serialVersionUID = -3399860618995613421L;
+
+ public StreamingViewAsIterable(View.AsIterable<T> transform) {}
+
+ @Override
+ public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+ PCollectionView<Iterable<T>> view =
+ PCollectionViews.iterableView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateGearpumpPCollectionView.<T, Iterable<T>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsIterable";
+ }
+ }
+
+ /**
+ * Specialized implementation for
+ * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
+ * Gearpump runner.
+ */
+ private static class StreamingViewAsList<T>
+ extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+
+ private static final long serialVersionUID = -5018631473886330629L;
+
+ public StreamingViewAsList(View.AsList<T> transform) {}
+
+ @Override
+ public PCollectionView<List<T>> expand(PCollection<T> input) {
+ PCollectionView<List<T>> view =
+ PCollectionViews.listView(
+ input.getPipeline(),
+ input.getWindowingStrategy(),
+ input.getCoder());
+
+ return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+ .apply(CreateGearpumpPCollectionView.<T, List<T>>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsList";
+ }
+ }
+ private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
+ extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+
+ private static final long serialVersionUID = 9064900748869035738L;
+ private final Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+ public StreamingCombineGloballyAsSingletonView(
+ Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+ PCollection<OutputT> combined =
+ input.apply(Combine.globally(transform.getCombineFn())
+ .withoutDefaults()
+ .withFanout(transform.getFanout()));
+
+ PCollectionView<OutputT> view = PCollectionViews.singletonView(
+ combined.getPipeline(),
+ combined.getWindowingStrategy(),
+ transform.getInsertDefault(),
+ transform.getInsertDefault()
+ ? transform.getCombineFn().defaultValue() : null,
+ combined.getCoder());
+ return combined
+ .apply(ParDo.of(new WrapAsList<OutputT>()))
+ .apply(CreateGearpumpPCollectionView.<OutputT, OutputT>of(view));
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingCombineGloballyAsSingletonView";
+ }
+ }
+
+ private static class StreamingViewAsSingleton<T>
+ extends PTransform<PCollection<T>, PCollectionView<T>> {
+
+ private static final long serialVersionUID = 5870455965625071546L;
+ private final View.AsSingleton<T> transform;
+
+ public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
+ this.transform = transform;
+ }
+
+ @Override
+ public PCollectionView<T> expand(PCollection<T> input) {
+ Combine.Globally<T, T> combine = Combine.globally(
+ new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+ if (!transform.hasDefaultValue()) {
+ combine = combine.withoutDefaults();
+ }
+ return input.apply(combine.asSingletonView());
+ }
+
+ @Override
+ protected String getKindString() {
+ return "StreamingViewAsSingleton";
+ }
+
+ private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+ private boolean hasDefaultValue;
+ private T defaultValue;
+
+ SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+ this.hasDefaultValue = hasDefaultValue;
+ this.defaultValue = defaultValue;
+ }
+
+ @Override
+ public T apply(T left, T right) {
+ throw new IllegalArgumentException("PCollection with more than one element "
+ + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+ + "combine the PCollection into a single value");
+ }
+
+ @Override
+ public T identity() {
+ if (hasDefaultValue) {
+ return defaultValue;
+ } else {
+ throw new IllegalArgumentException(
+ "Empty PCollection accessed as a singleton view. "
+ + "Consider setting withDefault to provide a default value");
+ }
+ }
+ }
+ }
+
+ private static class WrapAsList<T> extends DoFn<T, List<T>> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(Collections.singletonList(c.element()));
+ }
+ }
+ /**
+ * Creates a primitive {@link PCollectionView}.
+ *
+ * <p>For internal use only by runner implementors.
+ *
+ * @param <ElemT> The type of the elements of the input PCollection
+ * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+ */
+ public static class CreateGearpumpPCollectionView<ElemT, ViewT>
+ extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+ private static final long serialVersionUID = -2637073020800540542L;
+ private PCollectionView<ViewT> view;
+
+ private CreateGearpumpPCollectionView(PCollectionView<ViewT> view) {
+ this.view = view;
+ }
+
+ public static <ElemT, ViewT> CreateGearpumpPCollectionView<ElemT, ViewT> of(
+ PCollectionView<ViewT> view) {
+ return new CreateGearpumpPCollectionView<>(view);
+ }
+
+ public PCollectionView<ViewT> getView() {
+ return view;
+ }
+
+ @Override
+ public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+ return view;
+ }
+ }
+
+ /**
+ * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+ *
+ * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
+ * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+ * They require the input {@link PCollection} fits in memory.
+ * For a large {@link PCollection} this is expected to crash!
+ *
+ * @param <T> the type of elements to concatenate.
+ */
+ private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+ @Override
+ public List<T> createAccumulator() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<T> addInput(List<T> accumulator, T input) {
+ accumulator.add(input);
+ return accumulator;
+ }
+
+ @Override
+ public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+ List<T> result = createAccumulator();
+ for (List<T> accumulator : accumulators) {
+ result.addAll(accumulator);
+ }
+ return result;
+ }
+
+ @Override
+ public List<T> extractOutput(List<T> accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+
+ @Override
+ public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+ return ListCoder.of(inputCoder);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 72f2126..897467a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -17,40 +17,18 @@
*/
package org.apache.beam.runners.gearpump;
-import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValueFactory;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.beam.runners.gearpump.translators.TranslationContext;
+
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
import org.apache.gearpump.cluster.ClusterConfig;
import org.apache.gearpump.cluster.UserConfig;
@@ -72,21 +50,8 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers";
private static final String DEFAULT_APPNAME = "beam_gearpump_app";
- /** Custom transforms implementations. */
- private final Map<Class<?>, Class<?>> overrides;
-
public GearpumpRunner(GearpumpPipelineOptions options) {
this.options = options;
-
- ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.builder();
- builder.put(Combine.GloballyAsSingletonView.class,
- StreamingCombineGloballyAsSingletonView.class);
- builder.put(View.AsMap.class, StreamingViewAsMap.class);
- builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class);
- builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
- builder.put(View.AsList.class, StreamingViewAsList.class);
- builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
- overrides = builder.build();
}
public static GearpumpRunner fromOptions(PipelineOptions options) {
@@ -95,31 +60,6 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
return new GearpumpRunner(pipelineOptions);
}
-
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
- if (overrides.containsKey(transform.getClass())) {
-
- Class<PTransform<InputT, OutputT>> transformClass =
- (Class<PTransform<InputT, OutputT>>) transform.getClass();
-
- Class<PTransform<InputT, OutputT>> customTransformClass =
- (Class<PTransform<InputT, OutputT>>) overrides.get(transform.getClass());
-
- PTransform<InputT, OutputT> customTransform =
- InstanceBuilder.ofType(customTransformClass)
- .withArg(transformClass, transform)
- .build();
-
- return Pipeline.applyTransform(input, customTransform);
- } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
- && ((PCollectionList<?>) input).size() == 0) {
- return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
- } else {
- return super.apply(transform, input);
- }
- }
-
@Override
public GearpumpPipelineResult run(Pipeline pipeline) {
String appName = options.getApplicationName();
@@ -170,318 +110,4 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
- // The following codes are forked from DataflowRunner for View translator
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
- * for the Gearpump runner.
- */
- private static class StreamingViewAsMap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
- private static final long serialVersionUID = 4791080760092950304L;
-
- public StreamingViewAsMap(View.AsMap<K, V> transform) {}
-
- @Override
- public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, V>> view =
- PCollectionViews.mapView(
- input.getPipeline(),
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- // throw new RuntimeException(e);
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, V>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMap";
- }
- }
-
- /**
- * Specialized expansion for {@link
- * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
- * Gearpump runner.
- */
- private static class StreamingViewAsMultimap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
- private static final long serialVersionUID = 5854899081751333352L;
-
- public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {}
-
- @Override
- public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
- PCollectionView<Map<K, Iterable<V>>> view =
- PCollectionViews.multimapView(
- input.getPipeline(),
- input.getWindowingStrategy(),
- input.getCoder());
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- inputCoder.getKeyCoder().verifyDeterministic();
- } catch (Coder.NonDeterministicException e) {
- // throw new RuntimeException(e);
- }
-
- return input
- .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
- .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsMultimap";
- }
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
- * Gearpump runner.
- */
- private static class StreamingViewAsIterable<T>
- extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-
- private static final long serialVersionUID = -3399860618995613421L;
-
- public StreamingViewAsIterable(View.AsIterable<T> transform) {}
-
- @Override
- public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
- PCollectionView<Iterable<T>> view =
- PCollectionViews.iterableView(
- input.getPipeline(),
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateGearpumpPCollectionView.<T, Iterable<T>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsIterable";
- }
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
- * Gearpump runner.
- */
- private static class StreamingViewAsList<T>
- extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
-
- private static final long serialVersionUID = -5018631473886330629L;
-
- public StreamingViewAsList(View.AsList<T> transform) {}
-
- @Override
- public PCollectionView<List<T>> expand(PCollection<T> input) {
- PCollectionView<List<T>> view =
- PCollectionViews.listView(
- input.getPipeline(),
- input.getWindowingStrategy(),
- input.getCoder());
-
- return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
- .apply(CreateGearpumpPCollectionView.<T, List<T>>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsList";
- }
- }
- private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
- extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
-
- private static final long serialVersionUID = 9064900748869035738L;
- private final Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
- public StreamingCombineGloballyAsSingletonView(
- Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<OutputT> expand(PCollection<InputT> input) {
- PCollection<OutputT> combined =
- input.apply(Combine.globally(transform.getCombineFn())
- .withoutDefaults()
- .withFanout(transform.getFanout()));
-
- PCollectionView<OutputT> view = PCollectionViews.singletonView(
- combined.getPipeline(),
- combined.getWindowingStrategy(),
- transform.getInsertDefault(),
- transform.getInsertDefault()
- ? transform.getCombineFn().defaultValue() : null,
- combined.getCoder());
- return combined
- .apply(ParDo.of(new WrapAsList<OutputT>()))
- .apply(CreateGearpumpPCollectionView.<OutputT, OutputT>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "StreamingCombineGloballyAsSingletonView";
- }
- }
-
- private static class StreamingViewAsSingleton<T>
- extends PTransform<PCollection<T>, PCollectionView<T>> {
-
- private static final long serialVersionUID = 5870455965625071546L;
- private final View.AsSingleton<T> transform;
-
- public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<T> expand(PCollection<T> input) {
- Combine.Globally<T, T> combine = Combine.globally(
- new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
- if (!transform.hasDefaultValue()) {
- combine = combine.withoutDefaults();
- }
- return input.apply(combine.asSingletonView());
- }
-
- @Override
- protected String getKindString() {
- return "StreamingViewAsSingleton";
- }
-
- private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
- private boolean hasDefaultValue;
- private T defaultValue;
-
- SingletonCombine(boolean hasDefaultValue, T defaultValue) {
- this.hasDefaultValue = hasDefaultValue;
- this.defaultValue = defaultValue;
- }
-
- @Override
- public T apply(T left, T right) {
- throw new IllegalArgumentException("PCollection with more than one element "
- + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
- + "combine the PCollection into a single value");
- }
-
- @Override
- public T identity() {
- if (hasDefaultValue) {
- return defaultValue;
- } else {
- throw new IllegalArgumentException(
- "Empty PCollection accessed as a singleton view. "
- + "Consider setting withDefault to provide a default value");
- }
- }
- }
- }
-
- private static class WrapAsList<T> extends DoFn<T, List<T>> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(Collections.singletonList(c.element()));
- }
- }
-
- /**
- * Creates a primitive {@link PCollectionView}.
- *
- * <p>For internal use only by runner implementors.
- *
- * @param <ElemT> The type of the elements of the input PCollection
- * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
- */
- public static class CreateGearpumpPCollectionView<ElemT, ViewT>
- extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
- private static final long serialVersionUID = -2637073020800540542L;
- private PCollectionView<ViewT> view;
-
- private CreateGearpumpPCollectionView(PCollectionView<ViewT> view) {
- this.view = view;
- }
-
- public static <ElemT, ViewT> CreateGearpumpPCollectionView<ElemT, ViewT> of(
- PCollectionView<ViewT> view) {
- return new CreateGearpumpPCollectionView<>(view);
- }
-
- public PCollectionView<ViewT> getView() {
- return view;
- }
-
- @Override
- public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
- return view;
- }
- }
-
- /**
- * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
- *
- * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
- * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
- * They require the input {@link PCollection} fits in memory.
- * For a large {@link PCollection} this is expected to crash!
- *
- * @param <T> the type of elements to concatenate.
- */
- private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
- @Override
- public List<T> createAccumulator() {
- return new ArrayList<>();
- }
-
- @Override
- public List<T> addInput(List<T> accumulator, T input) {
- accumulator.add(input);
- return accumulator;
- }
-
- @Override
- public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
- List<T> result = createAccumulator();
- for (List<T> accumulator : accumulators) {
- result.addAll(accumulator);
- }
- return result;
- }
-
- @Override
- public List<T> extractOutput(List<T> accumulator) {
- return accumulator;
- }
-
- @Override
- public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
-
- @Override
- public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
- return ListCoder.of(inputCoder);
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
index c96bcb1..ea7dd26 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -24,9 +24,6 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
import org.apache.gearpump.cluster.ClusterConfig;
import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
@@ -58,36 +55,9 @@ public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
@Override
public GearpumpPipelineResult run(Pipeline pipeline) {
- try {
- GearpumpPipelineResult result = delegate.run(pipeline);
- result.waitUntilFinish();
- cluster.stop();
- return result;
- } catch (Throwable e) {
- // copied from TestFlinkRunner to pull out AssertionError
- // which is wrapped in UserCodeException
- Throwable cause = e;
- Throwable oldCause;
- do {
- if (cause.getCause() == null) {
- break;
- }
-
- oldCause = cause;
- cause = cause.getCause();
-
- } while (!oldCause.equals(cause));
- if (cause instanceof AssertionError) {
- throw (AssertionError) cause;
- } else {
- throw e;
- }
- }
- }
-
- @Override
- public <OutputT extends POutput, InputT extends PInput>
- OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
- return delegate.apply(transform, input);
+ GearpumpPipelineResult result = delegate.run(pipeline);
+ result.waitUntilFinish();
+ cluster.stop();
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
deleted file mode 100644
index b2d762a..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.examples;
-
-import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.runners.gearpump.GearpumpRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.apache.gearpump.cluster.client.ClientContext;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * streaming word count example on Gearpump runner.
- */
-public class StreamingWordCount {
-
- static class ExtractWordsFn extends DoFn<String, String> {
-
- @ProcessElement
- public void process(ProcessContext c) {
- // Split the line into words.
- String[] words = c.element().split("[^a-zA-Z']+");
-
- // Output each word encountered into the output PCollection.
- for (String word : words) {
- if (!word.isEmpty()) {
- c.output(word);
- }
- }
- }
- }
-
- static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
- private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class);
-
- @ProcessElement
- public void process(ProcessContext c) {
- String row = c.element().getKey()
- + " - " + c.element().getValue()
- + " @ " + c.timestamp().toString();
- LOG.debug("output {}", row);
- c.output(row);
- }
- }
-
-
- public static void main(String[] args) {
- GearpumpPipelineOptions options = PipelineOptionsFactory
- .fromArgs(args).as(GearpumpPipelineOptions.class);
- options.setRunner(GearpumpRunner.class);
- options.setApplicationName("StreamingWordCount");
- options.setParallelism(1);
-
- Pipeline p = Pipeline.create(options);
-
- PCollection<KV<String, Long>> wordCounts =
- p.apply(Read.from(new UnboundedTextSource()))
- .apply(ParDo.of(new ExtractWordsFn()))
- .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
- .apply(Count.<String>perElement());
-
- wordCounts.apply(ParDo.of(new FormatAsStringFn()));
-
- p.run();
-
- ClientContext clientContext = options.getClientContext();
- clientContext.close();
-
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
deleted file mode 100644
index b014432..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.examples;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
-
-/**
- * unbounded source that reads from text.
- */
-public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource.CheckpointMark> {
-
- @Override
- public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits(
- int desiredNumSplits, PipelineOptions options) throws Exception {
- return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this);
- }
-
- @Override
- public UnboundedReader<String> createReader(PipelineOptions options,
- @Nullable CheckpointMark checkpointMark) {
- return new UnboundedTextReader(this);
- }
-
- @Nullable
- @Override
- public Coder<CheckpointMark> getCheckpointMarkCoder() {
- return null;
- }
-
- @Override
- public void validate() {
- }
-
- @Override
- public Coder<String> getDefaultOutputCoder() {
- return StringUtf8Coder.of();
- }
-
- /**
- * reads from text.
- */
- public static class UnboundedTextReader extends UnboundedReader<String> implements Serializable {
-
- private static final long serialVersionUID = 7526472295622776147L;
-
- private final UnboundedTextSource source;
-
- private final String[] texts = new String[]{"foo foo foo bar bar", "foo foo bar bar bar"};
- private long index = 0;
-
- private String currentRecord;
-
- private Instant currentTimestamp;
-
- public UnboundedTextReader(UnboundedTextSource source) {
- this.source = source;
- }
-
- @Override
- public boolean start() throws IOException {
- currentRecord = texts[0];
- currentTimestamp = new Instant(0);
- return true;
- }
-
- @Override
- public boolean advance() throws IOException {
- index++;
- currentRecord = texts[(int) index % (texts.length)];
- currentTimestamp = new Instant(index * 1000);
-
- return true;
- }
-
- @Override
- public byte[] getCurrentRecordId() throws NoSuchElementException {
- return new byte[0];
- }
-
- @Override
- public String getCurrent() throws NoSuchElementException {
- return this.currentRecord;
- }
-
- @Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
- return currentTimestamp;
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public Instant getWatermark() {
- return currentTimestamp;
- }
-
- @Override
- public CheckpointMark getCheckpointMark() {
- return null;
- }
-
- @Override
- public UnboundedSource<String, ?> getCurrentSource() {
- return this.source;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/package-info.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/package-info.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/package-info.java
deleted file mode 100644
index a62a6c0..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Examples showcase Beam application over Gearpump runner.
- */
-package org.apache.beam.runners.gearpump.examples;
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
index d05c89d..c7f24a8 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
@@ -20,25 +20,27 @@ package org.apache.beam.runners.gearpump.translators;
import java.util.List;
-import org.apache.beam.runners.gearpump.GearpumpRunner;
+import org.apache.beam.runners.gearpump.GearpumpPipelineTranslator;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-
/**
* CreateGearpumpPCollectionView bridges input stream to down stream
* transforms.
*/
public class CreateGearpumpPCollectionViewTranslator<ElemT, ViewT> implements
- TransformTranslator<GearpumpRunner.CreateGearpumpPCollectionView<ElemT, ViewT>> {
+ TransformTranslator<GearpumpPipelineTranslator.CreateGearpumpPCollectionView<ElemT, ViewT>> {
+
+ private static final long serialVersionUID = -3955521308055056034L;
@Override
- public void translate(GearpumpRunner.CreateGearpumpPCollectionView<ElemT, ViewT> transform,
+ public void translate(
+ GearpumpPipelineTranslator.CreateGearpumpPCollectionView<ElemT, ViewT> transform,
TranslationContext context) {
JavaStream<WindowedValue<List<ElemT>>> inputStream =
- context.getInputStream(context.getInput(transform));
- PCollectionView<ViewT> view = transform.getView();
+ context.getInputStream(context.getInput());
+ PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
context.setOutputStream(view, inputStream);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
index e9e2e5d..da55d70 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
@@ -32,12 +32,14 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
public class CreatePCollectionViewTranslator<ElemT, ViewT> implements
TransformTranslator<View.CreatePCollectionView<ElemT, ViewT>> {
+ private static final long serialVersionUID = -2394386873317515748L;
+
@Override
public void translate(View.CreatePCollectionView<ElemT, ViewT> transform,
TranslationContext context) {
JavaStream<WindowedValue<List<ElemT>>> inputStream =
- context.getInputStream(context.getInput(transform));
- PCollectionView<ViewT> view = transform.getView();
+ context.getInputStream(context.getInput());
+ PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
context.setOutputStream(view, inputStream);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
deleted file mode 100644
index e5dc6dd..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators;
-
-import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
-import org.apache.beam.runners.gearpump.translators.io.ValuesSource;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.WindowedValue;
-
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-
-/**
- * Wraps elements from Create.Values into an {@link UnboundedSource}.
- * mainly used for test
- */
-public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
-
- private static final long serialVersionUID = 5411841848199229738L;
-
- @Override
- public void translate(Create.Values<T> transform, TranslationContext context) {
- try {
- UnboundedSourceWrapper<T, ?> unboundedSourceWrapper = new UnboundedSourceWrapper<>(
- new ValuesSource<>(transform.getElements(),
- transform.getDefaultOutputCoder(context.getInput(transform))),
- context.getPipelineOptions());
- JavaStream<WindowedValue<T>> sourceStream = context.getSourceStream(unboundedSourceWrapper);
- context.setOutputStream(context.getOutput(transform), sourceStream);
- } catch (CannotProvideCoderException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
deleted file mode 100644
index 27e54b8..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators;
-
-import com.google.common.collect.Lists;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
-import org.apache.beam.runners.gearpump.translators.io.ValuesSource;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-
-
-
-/**
- * Flatten.FlattenPCollectionList is translated to Gearpump merge function.
- * Note only two-way merge is working now
- */
-public class FlattenPCollectionTranslator<T> implements
- TransformTranslator<Flatten.FlattenPCollectionList<T>> {
-
- private static final long serialVersionUID = -5552148802472944759L;
-
- @Override
- public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
- JavaStream<T> merged = null;
- Set<PCollection<T>> unique = new HashSet<>();
- for (PCollection<T> collection : context.getInput(transform).getAll()) {
- unique.add(collection);
- JavaStream<T> inputStream = context.getInputStream(collection);
- if (null == merged) {
- merged = inputStream;
- } else {
- // duplicate edges are not allowed in Gearpump graph
- // so we route through a dummy node
- if (unique.contains(collection)) {
- inputStream = inputStream.map(new DummyFunction<T>(), "dummy");
- }
-
- merged = merged.merge(inputStream, transform.getName());
- }
- }
-
- if (null == merged) {
- UnboundedSourceWrapper<String, ?> unboundedSourceWrapper = new UnboundedSourceWrapper<>(
- new ValuesSource<>(Lists.newArrayList("dummy"),
- StringUtf8Coder.of()), context.getPipelineOptions());
- merged = context.getSourceStream(unboundedSourceWrapper);
- }
- context.setOutputStream(context.getOutput(transform), merged);
- }
-
- private static class DummyFunction<T> extends MapFunction<T, T> {
-
- private static final long serialVersionUID = 5454396869997290471L;
-
- @Override
- public T map(T t) {
- return t;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
new file mode 100644
index 0000000..3a465cb
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import com.google.common.collect.Lists;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.runners.gearpump.translators.io.ValuesSource;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+/**
+ * Flatten.FlattenPCollectionList is translated to Gearpump merge function.
+ */
+public class FlattenPCollectionsTranslator<T> implements
+ TransformTranslator<Flatten.PCollections<T>> {
+
+ private static final long serialVersionUID = -5552148802472944759L;
+
+ @Override
+ public void translate(Flatten.PCollections<T> transform, TranslationContext context) {
+ JavaStream<T> merged = null;
+ Set<PCollection<T>> unique = new HashSet<>();
+ for (TaggedPValue input: context.getInputs()) {
+ PCollection<T> collection = (PCollection<T>) input.getValue();
+ unique.add(collection);
+ JavaStream<T> inputStream = context.getInputStream(collection);
+ if (null == merged) {
+ merged = inputStream;
+ } else {
+ // duplicate edges are not allowed in Gearpump graph
+ // so we route through a dummy node
+ if (unique.contains(collection)) {
+ inputStream = inputStream.map(new DummyFunction<T>(), "dummy");
+ }
+
+ merged = merged.merge(inputStream, transform.getName());
+ }
+ }
+
+ if (null == merged) {
+ UnboundedSourceWrapper<String, ?> unboundedSourceWrapper = new UnboundedSourceWrapper<>(
+ new ValuesSource<>(Lists.newArrayList("dummy"),
+ StringUtf8Coder.of()), context.getPipelineOptions());
+ merged = context.getSourceStream(unboundedSourceWrapper);
+ }
+ context.setOutputStream(context.getOutput(), merged);
+ }
+
+ private static class DummyFunction<T> extends MapFunction<T, T> {
+
+ private static final long serialVersionUID = 5454396869997290471L;
+
+ @Override
+ public T map(T t) {
+ return t;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index df8bfe9..5dfd3e9 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -61,7 +61,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
@Override
public void translate(GroupByKey<K, V> transform, TranslationContext context) {
- PCollection<KV<K, V>> input = context.getInput(transform);
+ PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) context.getInput();
Coder<K> inputKeyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
JavaStream<WindowedValue<KV<K, V>>> inputStream =
context.getInputStream(input);
@@ -80,7 +80,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
.fold(new Merge<>(windowFn, outputTimeFn), "merge")
.map(new Values<K, V>(), "values");
- context.setOutputStream(context.getOutput(transform), outputStream);
+ context.setOutputStream(context.getOutput(), outputStream);
}
private static class GearpumpWindowFn<T, W extends BoundedWindow>
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 8c57019..e88cb73 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -18,17 +18,11 @@
package org.apache.beam.runners.gearpump.translators;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
-
import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.transforms.DoFn;
@@ -36,6 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction;
@@ -54,21 +49,21 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
@Override
public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
- PCollection<InputT> inputT = (PCollection<InputT>) context.getInput(transform);
+ PCollection<InputT> inputT = (PCollection<InputT>) context.getInput();
JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream(inputT);
Collection<PCollectionView<?>> sideInputs = transform.getSideInputs();
Map<String, PCollectionView<?>> tagsToSideInputs =
TranslatorUtils.getTagsToSideInputs(sideInputs);
- Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+ List<TaggedPValue> outputs = context.getOutputs();
final TupleTag<OutputT> mainOutput = transform.getMainOutputTag();
- List<TupleTag<?>> sideOutputs = Lists.newLinkedList(Sets.filter(outputs.keySet(),
- new Predicate<TupleTag<?>>() {
- @Override
- public boolean apply(@Nullable TupleTag<?> tupleTag) {
- return tupleTag != null && !tupleTag.getId().equals(mainOutput.getId());
- }
- }));
+ List<TupleTag<?>> sideOutputs = new ArrayList<>(outputs.size() - 1);
+ for (TaggedPValue output: outputs) {
+ TupleTag<?> tag = output.getTag();
+ if (tag != null && !tag.getId().equals(mainOutput.getId())) {
+ sideOutputs.add(tag);
+ }
+ }
JavaStream<TranslatorUtils.RawUnionValue> unionStream = TranslatorUtils.withSideInputStream(
context, inputStream, tagsToSideInputs);
@@ -83,10 +78,9 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
tagsToSideInputs,
mainOutput,
sideOutputs), transform.getName());
- for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
- output.getValue().getCoder();
+ for (TaggedPValue output: outputs) {
JavaStream<WindowedValue<OutputT>> taggedStream = outputStream
- .filter(new FilterByOutputTag(output.getKey().getId()),
+ .filter(new FilterByOutputTag(output.getTag().getId()),
"filter_by_output_tag")
.map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue");
context.setOutputStream(output.getValue(), taggedStream);
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
index efae938..dc32b8c 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-
/**
* {@link ParDo.Bound} is translated to Gearpump flatMap function
* with {@link DoFn} wrapped in {@link DoFnFunction}.
@@ -50,14 +49,14 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
@Override
public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
DoFn<InputT, OutputT> doFn = transform.getFn();
- PCollection<OutputT> output = context.getOutput(transform);
+ PCollection<OutputT> output = (PCollection<OutputT>) context.getOutput();
WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
Collection<PCollectionView<?>> sideInputs = transform.getSideInputs();
Map<String, PCollectionView<?>> tagsToSideInputs =
TranslatorUtils.getTagsToSideInputs(sideInputs);
JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream(
- context.getInput(transform));
+ context.getInput());
JavaStream<TranslatorUtils.RawUnionValue> unionStream =
TranslatorUtils.withSideInputStream(context,
inputStream, tagsToSideInputs);
@@ -71,6 +70,6 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
.flatMap(doFnFunction, transform.getName())
.map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue");
- context.setOutputStream(context.getOutput(transform), outputStream);
+ context.setOutputStream(context.getOutput(), outputStream);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
index 478d58f..8f71a8e 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
@@ -31,6 +31,8 @@ import org.apache.gearpump.streaming.source.DataSource;
*/
public class ReadBoundedTranslator <T> implements TransformTranslator<Read.Bounded<T>> {
+ private static final long serialVersionUID = -3899020490896998330L;
+
@Override
public void translate(Read.Bounded<T> transform, TranslationContext context) {
BoundedSource<T> boundedSource = transform.getSource();
@@ -38,7 +40,7 @@ public class ReadBoundedTranslator <T> implements TransformTranslator<Read.Bound
context.getPipelineOptions());
JavaStream<WindowedValue<T>> sourceStream = context.getSourceStream(sourceWrapper);
- context.setOutputStream(context.getOutput(transform), sourceStream);
+ context.setOutputStream(context.getOutput(), sourceStream);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
index 7e12a9c..0462c57 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
@@ -33,6 +33,8 @@ import org.apache.gearpump.streaming.source.DataSource;
public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> {
+ private static final long serialVersionUID = 3529494817859948619L;
+
@Override
public void translate(Read.Unbounded<T> transform, TranslationContext context) {
UnboundedSource<T, ?> unboundedSource = transform.getSource();
@@ -40,7 +42,7 @@ public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbo
unboundedSource, context.getPipelineOptions());
JavaStream<WindowedValue<T>> sourceStream = context.getSourceStream(unboundedSourceWrapper);
- context.setOutputStream(context.getOutput(transform), sourceStream);
+ context.setOutputStream(context.getOutput(), sourceStream);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
index c8587d3..c7becad 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
import org.apache.beam.sdk.transforms.PTransform;
/**
- * translates {@link PTransform} to Gearpump functions.
+ * Translates {@link PTransform} to Gearpump functions.
*/
public interface TransformTranslator<T extends PTransform> extends Serializable {
void translate(T transform, TranslationContext context);
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index b2cff8a..e88bb74 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -20,17 +20,18 @@ package org.apache.beam.runners.gearpump.translators;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.Iterables;
+
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
@@ -70,18 +71,26 @@ public class TranslationContext {
}
}
- public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
- return (InputT) getCurrentTransform(transform).getInput();
+ public List<TaggedPValue> getInputs() {
+ return getCurrentTransform().getInputs();
+ }
+
+ public PValue getInput() {
+ return Iterables.getOnlyElement(getInputs()).getValue();
+ }
+
+ public List<TaggedPValue> getOutputs() {
+ return getCurrentTransform().getOutputs();
}
- public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
- return (OutputT) getCurrentTransform(transform).getOutput();
+ public PValue getOutput() {
+ return Iterables.getOnlyElement(getOutputs()).getValue();
}
- private AppliedPTransform<?, ?, ?> getCurrentTransform(PTransform<?, ?> transform) {
+ private AppliedPTransform<?, ?, ?> getCurrentTransform() {
checkArgument(
- currentTransform != null && currentTransform.getTransform() == transform,
- "can only be called with current transform");
+ currentTransform != null,
+ "current transform not set");
return currentTransform;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
new file mode 100644
index 0000000..fe6015a
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
+import org.joda.time.Instant;
+
+/**
+ * {@link Window.Assign} is translated to Gearpump flatMap function.
+ */
+@SuppressWarnings("unchecked")
+public class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
+
+  private static final long serialVersionUID = -964887482120489061L;
+
+  @Override
+  public void translate(Window.Assign<T> transform, TranslationContext context) {
+    PCollection<T> input = (PCollection<T>) context.getInput();
+    PCollection<T> output = (PCollection<T>) context.getOutput();
+    JavaStream<WindowedValue<T>> inputStream = context.getInputStream(input);
+    WindowingStrategy<?, ?> outputStrategy = output.getWindowingStrategy();
+    WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+    JavaStream<WindowedValue<T>> outputStream =
+        inputStream
+            .flatMap(new AssignWindows<>(windowFn), "assign_windows");
+    context.setOutputStream(output, outputStream);
+  }
+
+  /** Re-emits each value once per window that the {@link WindowFn} assigns to it. */
+  private static class AssignWindows<T> extends
+      FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
+    private static final long serialVersionUID = 7284565861938681360L;
+    private final WindowFn<T, BoundedWindow> windowFn;
+
+    AssignWindows(WindowFn<T, BoundedWindow> windowFn) {
+      this.windowFn = windowFn;
+    }
+
+    @Override
+    public Iterator<WindowedValue<T>> flatMap(final WindowedValue<T> value) {
+      try {
+        Collection<BoundedWindow> windows = windowFn.assignWindows(windowFn.new AssignContext() {
+          @Override
+          public T element() {
+            return value.getValue();
+          }
+
+          @Override
+          public Instant timestamp() {
+            return value.getTimestamp();
+          }
+
+          @Override
+          public BoundedWindow window() {
+            return Iterables.getOnlyElement(value.getWindows());
+          }
+        });
+        // same value, timestamp and pane; only the window differs between the copies
+        List<WindowedValue<T>> values = new ArrayList<>(windows.size());
+        for (BoundedWindow win: windows) {
+          values.add(
+              WindowedValue.of(value.getValue(), value.getTimestamp(), win, value.getPane()));
+        }
+        return values.iterator();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}