You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:41 UTC
[16/50] [abbrv] beam git commit: Update gearpump-runner against master changes.
Update gearpump-runner against master changes.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/44d21ac6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/44d21ac6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/44d21ac6
Branch: refs/heads/master
Commit: 44d21ac662e263c09caf2dd3b93b1c325bdfea15
Parents: 46c41fc
Author: manuzhang <ow...@gmail.com>
Authored: Thu Apr 20 20:59:47 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Apr 21 23:04:18 2017 +0800
----------------------------------------------------------------------
runners/gearpump/pom.xml | 11 +--
.../gearpump/GearpumpPipelineTranslator.java | 92 +++++++++++---------
.../FlattenPCollectionsTranslator.java | 6 +-
.../translators/ParDoMultiOutputTranslator.java | 11 ++-
.../translators/TranslationContext.java | 11 ++-
.../translators/WindowAssignTranslator.java | 2 +-
.../translators/functions/DoFnFunction.java | 2 +-
.../gearpump/translators/io/GearpumpSource.java | 3 +-
.../gearpump/translators/io/ValuesSource.java | 3 +-
.../translators/utils/DoFnRunnerFactory.java | 3 +-
.../translators/utils/NoOpStepContext.java | 3 +-
.../FlattenPCollectionsTranslatorTest.java | 48 ++++++----
.../translators/GroupByKeyTranslatorTest.java | 3 +-
sdks/java/pom.xml | 2 +-
14 files changed, 113 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index a691801..dcfa390 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -43,13 +43,13 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <gearpump.version>0.8.3-SNAPSHOT</gearpump.version>
+ <gearpump.version>0.8.3</gearpump.version>
</properties>
<profiles>
<profile>
- <id>local-runnable-on-service-tests</id>
- <activation><activeByDefault>true</activeByDefault></activation>
+ <id>local-validates-runner-tests</id>
+ <activation><activeByDefault>false</activeByDefault></activation>
<build>
<plugins>
<plugin>
@@ -63,14 +63,15 @@
<goal>test</goal>
</goals>
<configuration>
- <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+ <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
<excludedGroups>
org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
org.apache.beam.sdk.testing.UsesStatefulParDo,
org.apache.beam.sdk.testing.UsesTimersInParDo,
org.apache.beam.sdk.testing.UsesSplittableParDo,
org.apache.beam.sdk.testing.UsesAttemptedMetrics,
- org.apache.beam.sdk.testing.UsesCommittedMetrics
+ org.apache.beam.sdk.testing.UsesCommittedMetrics,
+ org.apache.beam.sdk.testing.UsesTestStream
</excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 1a36343..f5f5e70 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.gearpump;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
@@ -27,13 +27,14 @@ import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.gearpump.translators.CreateGearpumpPCollectionViewTranslator;
import org.apache.beam.runners.gearpump.translators.CreatePCollectionViewTranslator;
import org.apache.beam.runners.gearpump.translators.FlattenPCollectionsTranslator;
import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
-import org.apache.beam.runners.gearpump.translators.ParDoBoundMultiTranslator;
-import org.apache.beam.runners.gearpump.translators.ParDoBoundTranslator;
+import org.apache.beam.runners.gearpump.translators.ParDoMultiOutputTranslator;
+import org.apache.beam.runners.gearpump.translators.ParDoSingleOutputTranslator;
import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator;
import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
import org.apache.beam.runners.gearpump.translators.TransformTranslator;
@@ -45,9 +46,9 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.PTransformMatcher;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
@@ -72,7 +73,7 @@ import org.slf4j.LoggerFactory;
* into Gearpump {@link Graph}.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
-public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
+public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
private static final Logger LOG = LoggerFactory.getLogger(
GearpumpPipelineTranslator.class);
@@ -88,13 +89,13 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
static {
// register TransformTranslators
- registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
+ registerTransformTranslator(ParDo.SingleOutput.class, new ParDoSingleOutputTranslator());
registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
registerTransformTranslator(Flatten.PCollections.class,
new FlattenPCollectionsTranslator());
- registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator());
+ registerTransformTranslator(ParDo.MultiOutput.class, new ParDoMultiOutputTranslator());
registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
registerTransformTranslator(View.CreatePCollectionView.class,
new CreatePCollectionViewTranslator());
@@ -107,27 +108,30 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
}
public void translate(Pipeline pipeline) {
- Map<PTransformMatcher, PTransformOverrideFactory> overrides =
- ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
- .put(PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+ List<PTransformOverride> overrides =
+ ImmutableList.<PTransformOverride>builder()
+ .add(PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsMap.class),
+ new ReflectiveOneToOneOverrideFactory(StreamingViewAsMap.class)))
+ .add(PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsMultimap.class),
+ new ReflectiveOneToOneOverrideFactory(StreamingViewAsMultimap.class)))
+ .add(PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsSingleton.class),
+ new ReflectiveOneToOneOverrideFactory(StreamingViewAsSingleton.class)))
+ .add(PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsList.class),
+ new ReflectiveOneToOneOverrideFactory(StreamingViewAsList.class)))
+ .add(PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsIterable.class),
+ new ReflectiveOneToOneOverrideFactory(StreamingViewAsIterable.class)))
+ .add(PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
new ReflectiveOneToOneOverrideFactory(
- StreamingCombineGloballyAsSingletonView.class))
- .put(PTransformMatchers.classEqualTo(View.AsMap.class),
- new ReflectiveOneToOneOverrideFactory(StreamingViewAsMap.class))
- .put(PTransformMatchers.classEqualTo(View.AsMultimap.class),
- new ReflectiveOneToOneOverrideFactory(StreamingViewAsMultimap.class))
- .put(PTransformMatchers.classEqualTo(View.AsSingleton.class),
- new ReflectiveOneToOneOverrideFactory(StreamingViewAsSingleton.class))
- .put(PTransformMatchers.classEqualTo(View.AsList.class),
- new ReflectiveOneToOneOverrideFactory(StreamingViewAsList.class))
- .put(PTransformMatchers.classEqualTo(View.AsIterable.class),
- new ReflectiveOneToOneOverrideFactory(StreamingViewAsIterable.class))
+ StreamingCombineGloballyAsSingletonView.class)))
.build();
- for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
- overrides.entrySet()) {
- pipeline.replace(override.getKey(), override.getValue());
- }
+ pipeline.replaceAll(overrides);
pipeline.traverseTopologically(this);
}
@@ -185,22 +189,27 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
// The following codes are forked from DataflowRunner for View translator
private static class ReflectiveOneToOneOverrideFactory<
- InputT extends PValue,
- OutputT extends PValue,
- TransformT extends PTransform<InputT, OutputT>>
- extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
- private final Class<PTransform<InputT, OutputT>> replacement;
+ InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
+ extends SingleInputOutputOverrideFactory<
+ PCollection<InputT>, PCollection<OutputT>, TransformT> {
+ private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
private ReflectiveOneToOneOverrideFactory(
- Class<PTransform<InputT, OutputT>> replacement) {
+ Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement) {
this.replacement = replacement;
}
@Override
- public PTransform<InputT, OutputT> getReplacementTransform(TransformT transform) {
- return InstanceBuilder.ofType(replacement)
- .withArg((Class<PTransform<InputT, OutputT>>) transform.getClass(), transform)
- .build();
+ public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
+ AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
+ return PTransformReplacement.of(
+ PTransformReplacements.getSingletonMainInput(transform),
+ InstanceBuilder.ofType(replacement)
+ .withArg(
+ (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>)
+ transform.getTransform().getClass(),
+ transform.getTransform())
+ .build());
}
}
@@ -220,7 +229,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
PCollectionView<Map<K, V>> view =
PCollectionViews.mapView(
- input.getPipeline(),
+ input,
input.getWindowingStrategy(),
input.getCoder());
@@ -259,7 +268,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
PCollectionView<Map<K, Iterable<V>>> view =
PCollectionViews.multimapView(
- input.getPipeline(),
+ input,
input.getWindowingStrategy(),
input.getCoder());
@@ -298,7 +307,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
PCollectionView<Iterable<T>> view =
PCollectionViews.iterableView(
- input.getPipeline(),
+ input,
input.getWindowingStrategy(),
input.getCoder());
@@ -328,7 +337,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
public PCollectionView<List<T>> expand(PCollection<T> input) {
PCollectionView<List<T>> view =
PCollectionViews.listView(
- input.getPipeline(),
+ input,
input.getWindowingStrategy(),
input.getCoder());
@@ -341,6 +350,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
return "StreamingViewAsList";
}
}
+
private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
@@ -360,7 +370,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
.withFanout(transform.getFanout()));
PCollectionView<OutputT> view = PCollectionViews.singletonView(
- combined.getPipeline(),
+ combined,
combined.getWindowingStrategy(),
transform.getInsertDefault(),
transform.getInsertDefault()
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
index 56f7d1a..5ca05d8 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
@@ -45,8 +45,8 @@ public class FlattenPCollectionsTranslator<T> implements
public void translate(Flatten.PCollections<T> transform, TranslationContext context) {
JavaStream<T> merged = null;
Set<PCollection<T>> unique = new HashSet<>();
- for (TaggedPValue input: context.getInputs()) {
- PCollection<T> collection = (PCollection<T>) input.getValue();
+ for (PValue input: context.getInputs().values()) {
+ PCollection<T> collection = (PCollection<T>) input;
JavaStream<T> inputStream = context.getInputStream(collection);
if (null == merged) {
merged = inputStream;
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
index e78568d..d92979b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction;
@@ -55,11 +55,10 @@ public class ParDoMultiOutputTranslator<InputT, OutputT> implements
Map<String, PCollectionView<?>> tagsToSideInputs =
TranslatorUtils.getTagsToSideInputs(sideInputs);
- List<TaggedPValue> outputs = context.getOutputs();
+ Map<TupleTag<?>, PValue> outputs = context.getOutputs();
final TupleTag<OutputT> mainOutput = transform.getMainOutputTag();
List<TupleTag<?>> sideOutputs = new ArrayList<>(outputs.size() - 1);
- for (TaggedPValue output: outputs) {
- TupleTag<?> tag = output.getTag();
+ for (TupleTag<?> tag: outputs.keySet()) {
if (tag != null && !tag.getId().equals(mainOutput.getId())) {
sideOutputs.add(tag);
}
@@ -78,9 +77,9 @@ public class ParDoMultiOutputTranslator<InputT, OutputT> implements
tagsToSideInputs,
mainOutput,
sideOutputs), transform.getName());
- for (TaggedPValue output: outputs) {
+ for (Map.Entry<TupleTag<?>, PValue> output: outputs.entrySet()) {
JavaStream<WindowedValue<OutputT>> taggedStream = outputStream
- .filter(new FilterByOutputTag(output.getTag().getId()),
+ .filter(new FilterByOutputTag(output.getKey().getId()),
"filter_by_output_tag")
.map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue");
context.setOutputStream(output.getValue(), taggedStream);
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index e88bb74..eb6bc18 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Iterables;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
@@ -31,7 +30,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.gearpump.cluster.UserConfig;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
@@ -71,20 +70,20 @@ public class TranslationContext {
}
}
- public List<TaggedPValue> getInputs() {
+ public Map<TupleTag<?>, PValue> getInputs() {
return getCurrentTransform().getInputs();
}
public PValue getInput() {
- return Iterables.getOnlyElement(getInputs()).getValue();
+ return Iterables.getOnlyElement(getInputs().values());
}
- public List<TaggedPValue> getOutputs() {
+ public Map<TupleTag<?>, PValue> getOutputs() {
return getCurrentTransform().getOutputs();
}
public PValue getOutput() {
- return Iterables.getOnlyElement(getOutputs()).getValue();
+ return Iterables.getOnlyElement(getOutputs().values());
}
private AppliedPTransform<?, ?, ?> getCurrentTransform() {
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
index 2d70b63..149f80c 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
@@ -36,7 +36,7 @@ import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
import org.joda.time.Instant;
/**
- * {@link Window.Bound} is translated to Gearpump flatMap function.
+ * {@link Window.Assign} is translated to Gearpump flatMap function.
*/
@SuppressWarnings("unchecked")
public class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 9941e71..3473f53 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -156,7 +156,7 @@ public class DoFnFunction<InputT, OutputT> extends
for (WindowedValue<InputT> value : pushedBackValues) {
for (BoundedWindow win: value.getWindows()) {
BoundedWindow sideInputWindow =
- sideInput.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(win);
+ sideInput.getWindowMappingFn().getSideInputWindow(win);
if (!sideInputReader.isReady(sideInput, sideInputWindow)) {
Object emptyValue = WindowedValue.of(
Lists.newArrayList(), value.getTimestamp(), sideInputWindow, value.getPane());
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index c079603..5e79151 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -46,6 +46,7 @@ public abstract class GearpumpSource<T> implements DataSource {
private Source.Reader<T> reader;
private boolean available = false;
+ private long count = 0L;
GearpumpSource(PipelineOptions options) {
try {
@@ -112,7 +113,7 @@ public abstract class GearpumpSource<T> implements DataSource {
}
} else {
if (available) {
- return TranslatorUtils.jodaTimeToJava8Time(reader.getCurrentTimestamp());
+ return Watermark.MIN();
} else {
return Watermark.MAX();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
index ccd5cdf..b62da19 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
@@ -68,7 +69,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
}
@Override
- public java.util.List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits(
+ public java.util.List<? extends UnboundedSource<T, CheckpointMark>> split(
int desiredNumSplits, PipelineOptions options) throws Exception {
return Collections.singletonList(this);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index bdfc336..70b4271 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -28,6 +28,7 @@ import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SimpleDoFnRunner;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
@@ -78,7 +79,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner(
options, fn, sideInputReader, outputManager, mainOutputTag,
sideOutputTags, stepContext, aggregatorFactory, windowingStrategy);
- return PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
+ return SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
index 140df2a..4e0a74c 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
@@ -49,7 +49,8 @@ public class NoOpStepContext implements ExecutionContext.StepContext, Serializab
}
@Override
- public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+ public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
+
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
index fa89d4a..ac12fa4 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
@@ -26,13 +26,15 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.google.common.collect.Lists;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.source.DataSource;
@@ -55,10 +57,10 @@ public class FlattenPCollectionsTranslatorTest {
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testTranslateWithEmptyCollection() {
- PValue mockOutput = mock(PValue.class);
+ PCollection mockOutput = mock(PCollection.class);
TranslationContext translationContext = mock(TranslationContext.class);
- when(translationContext.getInputs()).thenReturn(Collections.EMPTY_LIST);
+ when(translationContext.getInputs()).thenReturn(Collections.EMPTY_MAP);
when(translationContext.getOutput()).thenReturn(mockOutput);
translator.translate(transform, translationContext);
@@ -71,11 +73,12 @@ public class FlattenPCollectionsTranslatorTest {
JavaStream javaStream = mock(JavaStream.class);
TranslationContext translationContext = mock(TranslationContext.class);
- TaggedPValue mockInput = mock(TaggedPValue.class);
+ Map<TupleTag<?>, PValue> inputs = new HashMap<>();
+ TupleTag tag = mock(TupleTag.class);
PCollection mockCollection = mock(PCollection.class);
- when(mockInput.getValue()).thenReturn(mockCollection);
+ inputs.put(tag, mockCollection);
- when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput));
+ when(translationContext.getInputs()).thenReturn(inputs);
when(translationContext.getInputStream(mockCollection)).thenReturn(javaStream);
PValue mockOutput = mock(PValue.class);
@@ -93,22 +96,30 @@ public class FlattenPCollectionsTranslatorTest {
JavaStream javaStream1 = mock(JavaStream.class);
JavaStream javaStream2 = mock(JavaStream.class);
+ JavaStream mergedStream = mock(JavaStream.class);
TranslationContext translationContext = mock(TranslationContext.class);
- TaggedPValue mockInput1 = mock(TaggedPValue.class);
+ Map<TupleTag<?>, PValue> inputs = new HashMap<>();
+ TupleTag tag1 = mock(TupleTag.class);
PCollection mockCollection1 = mock(PCollection.class);
- when(mockInput1.getValue()).thenReturn(mockCollection1);
+ inputs.put(tag1, mockCollection1);
- TaggedPValue mockInput2 = mock(TaggedPValue.class);
+ TupleTag tag2 = mock(TupleTag.class);
PCollection mockCollection2 = mock(PCollection.class);
- when(mockInput2.getValue()).thenReturn(mockCollection2);
+ inputs.put(tag2, mockCollection2);
- when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput1, mockInput2));
+ PCollection output = mock(PCollection.class);
+
+ when(translationContext.getInputs()).thenReturn(inputs);
when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1);
when(translationContext.getInputStream(mockCollection2)).thenReturn(javaStream2);
+ when(javaStream1.merge(javaStream2, transformName)).thenReturn(mergedStream);
+ when(javaStream2.merge(javaStream1, transformName)).thenReturn(mergedStream);
+
+ when(translationContext.getOutput()).thenReturn(output);
translator.translate(transform, translationContext);
- verify(javaStream1).merge(javaStream2, transformName);
+ verify(translationContext).setOutputStream(output, mergedStream);
}
@Test
@@ -120,14 +131,15 @@ public class FlattenPCollectionsTranslatorTest {
JavaStream javaStream1 = mock(JavaStream.class);
TranslationContext translationContext = mock(TranslationContext.class);
+ Map<TupleTag<?>, PValue> inputs = new HashMap<>();
+ TupleTag tag1 = mock(TupleTag.class);
PCollection mockCollection1 = mock(PCollection.class);
- TaggedPValue mockInput1 = mock(TaggedPValue.class);
- when(mockInput1.getValue()).thenReturn(mockCollection1);
+ inputs.put(tag1, mockCollection1);
- TaggedPValue mockInput2 = mock(TaggedPValue.class);
- when(mockInput2.getValue()).thenReturn(mockCollection1);
+ TupleTag tag2 = mock(TupleTag.class);
+ inputs.put(tag2, mockCollection1);
- when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput1, mockInput2));
+ when(translationContext.getInputs()).thenReturn(inputs);
when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1);
translator.translate(transform, translationContext);
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
index 9135022..4e66ba9 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
@@ -133,7 +133,8 @@ public class GroupByKeyTranslatorTest {
PaneInfo.NO_FIRING);
KV<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>> result1 =
- merge.fold(KV.of(null, null), KV.of(key1, value1));
+ merge.fold(KV.<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>>of(
+ null, null), KV.of(key1, value1));
assertThat(result1.getKey(), equalTo(key1));
assertThat(result1.getValue().getValue().getValue(), equalTo(Lists.newArrayList("value1")));
http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 7ca6109..21b5841 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -38,7 +38,7 @@
<module>build-tools</module> -->
<module>core</module>
<module>io</module>
- <module>maven-archetypes</module>
+ <!--<module>maven-archetypes</module>-->
<module>extensions</module>
<!-- javadoc runs directly from the root parent as the last module
in the build to be able to capture runner-specific javadoc.