You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:41 UTC

[16/50] [abbrv] beam git commit: Update gearpump-runner against master changes.

Update gearpump-runner against master changes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/44d21ac6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/44d21ac6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/44d21ac6

Branch: refs/heads/master
Commit: 44d21ac662e263c09caf2dd3b93b1c325bdfea15
Parents: 46c41fc
Author: manuzhang <ow...@gmail.com>
Authored: Thu Apr 20 20:59:47 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Apr 21 23:04:18 2017 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml                        | 11 +--
 .../gearpump/GearpumpPipelineTranslator.java    | 92 +++++++++++---------
 .../FlattenPCollectionsTranslator.java          |  6 +-
 .../translators/ParDoMultiOutputTranslator.java | 11 ++-
 .../translators/TranslationContext.java         | 11 ++-
 .../translators/WindowAssignTranslator.java     |  2 +-
 .../translators/functions/DoFnFunction.java     |  2 +-
 .../gearpump/translators/io/GearpumpSource.java |  3 +-
 .../gearpump/translators/io/ValuesSource.java   |  3 +-
 .../translators/utils/DoFnRunnerFactory.java    |  3 +-
 .../translators/utils/NoOpStepContext.java      |  3 +-
 .../FlattenPCollectionsTranslatorTest.java      | 48 ++++++----
 .../translators/GroupByKeyTranslatorTest.java   |  3 +-
 sdks/java/pom.xml                               |  2 +-
 14 files changed, 113 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index a691801..dcfa390 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -43,13 +43,13 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <gearpump.version>0.8.3-SNAPSHOT</gearpump.version>
+    <gearpump.version>0.8.3</gearpump.version>
   </properties>
 
   <profiles>
     <profile>
-      <id>local-runnable-on-service-tests</id>
-      <activation><activeByDefault>true</activeByDefault></activation>
+      <id>local-validates-runner-tests</id>
+      <activation><activeByDefault>false</activeByDefault></activation>
       <build>
         <plugins>
           <plugin>
@@ -63,14 +63,15 @@
                   <goal>test</goal>
                 </goals>
                 <configuration>
-                  <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+                  <groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
                   <excludedGroups>
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
                     org.apache.beam.sdk.testing.UsesStatefulParDo,
                     org.apache.beam.sdk.testing.UsesTimersInParDo,
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
                     org.apache.beam.sdk.testing.UsesAttemptedMetrics,
-                    org.apache.beam.sdk.testing.UsesCommittedMetrics
+                    org.apache.beam.sdk.testing.UsesCommittedMetrics,
+                    org.apache.beam.sdk.testing.UsesTestStream
                   </excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>

http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 1a36343..f5f5e70 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -18,7 +18,7 @@
 
 package org.apache.beam.runners.gearpump;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -27,13 +27,14 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.gearpump.translators.CreateGearpumpPCollectionViewTranslator;
 import org.apache.beam.runners.gearpump.translators.CreatePCollectionViewTranslator;
 import org.apache.beam.runners.gearpump.translators.FlattenPCollectionsTranslator;
 import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
-import org.apache.beam.runners.gearpump.translators.ParDoBoundMultiTranslator;
-import org.apache.beam.runners.gearpump.translators.ParDoBoundTranslator;
+import org.apache.beam.runners.gearpump.translators.ParDoMultiOutputTranslator;
+import org.apache.beam.runners.gearpump.translators.ParDoSingleOutputTranslator;
 import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator;
 import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
 import org.apache.beam.runners.gearpump.translators.TransformTranslator;
@@ -45,9 +46,9 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.PTransformMatcher;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -72,7 +73,7 @@ import org.slf4j.LoggerFactory;
  * into Gearpump {@link Graph}.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
-public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
+public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       GearpumpPipelineTranslator.class);
@@ -88,13 +89,13 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
 
   static {
     // register TransformTranslators
-    registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
+    registerTransformTranslator(ParDo.SingleOutput.class, new ParDoSingleOutputTranslator());
     registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
     registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
     registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
     registerTransformTranslator(Flatten.PCollections.class,
         new FlattenPCollectionsTranslator());
-    registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator());
+    registerTransformTranslator(ParDo.MultiOutput.class, new ParDoMultiOutputTranslator());
     registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
     registerTransformTranslator(View.CreatePCollectionView.class,
         new CreatePCollectionViewTranslator());
@@ -107,27 +108,30 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
   }
 
   public void translate(Pipeline pipeline) {
-    Map<PTransformMatcher, PTransformOverrideFactory> overrides =
-        ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
-            .put(PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+    List<PTransformOverride> overrides =
+        ImmutableList.<PTransformOverride>builder()
+            .add(PTransformOverride.of(
+                PTransformMatchers.classEqualTo(View.AsMap.class),
+                new ReflectiveOneToOneOverrideFactory(StreamingViewAsMap.class)))
+            .add(PTransformOverride.of(
+                PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                new ReflectiveOneToOneOverrideFactory(StreamingViewAsMultimap.class)))
+            .add(PTransformOverride.of(
+                PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                new ReflectiveOneToOneOverrideFactory(StreamingViewAsSingleton.class)))
+            .add(PTransformOverride.of(
+                PTransformMatchers.classEqualTo(View.AsList.class),
+                new ReflectiveOneToOneOverrideFactory(StreamingViewAsList.class)))
+            .add(PTransformOverride.of(
+                PTransformMatchers.classEqualTo(View.AsIterable.class),
+                new ReflectiveOneToOneOverrideFactory(StreamingViewAsIterable.class)))
+            .add(PTransformOverride.of(
+                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
                 new ReflectiveOneToOneOverrideFactory(
-                    StreamingCombineGloballyAsSingletonView.class))
-            .put(PTransformMatchers.classEqualTo(View.AsMap.class),
-                new ReflectiveOneToOneOverrideFactory(StreamingViewAsMap.class))
-            .put(PTransformMatchers.classEqualTo(View.AsMultimap.class),
-                new ReflectiveOneToOneOverrideFactory(StreamingViewAsMultimap.class))
-            .put(PTransformMatchers.classEqualTo(View.AsSingleton.class),
-                new ReflectiveOneToOneOverrideFactory(StreamingViewAsSingleton.class))
-            .put(PTransformMatchers.classEqualTo(View.AsList.class),
-                new ReflectiveOneToOneOverrideFactory(StreamingViewAsList.class))
-            .put(PTransformMatchers.classEqualTo(View.AsIterable.class),
-                new ReflectiveOneToOneOverrideFactory(StreamingViewAsIterable.class))
+                    StreamingCombineGloballyAsSingletonView.class)))
             .build();
 
-    for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
-        overrides.entrySet()) {
-      pipeline.replace(override.getKey(), override.getValue());
-    }
+    pipeline.replaceAll(overrides);
     pipeline.traverseTopologically(this);
   }
 
@@ -185,22 +189,27 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
 
   // The following codes are forked from DataflowRunner for View translator
   private static class ReflectiveOneToOneOverrideFactory<
-      InputT extends PValue,
-      OutputT extends PValue,
-      TransformT extends PTransform<InputT, OutputT>>
-      extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
-    private final Class<PTransform<InputT, OutputT>> replacement;
+          InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
+      extends SingleInputOutputOverrideFactory<
+          PCollection<InputT>, PCollection<OutputT>, TransformT> {
+    private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
 
     private ReflectiveOneToOneOverrideFactory(
-        Class<PTransform<InputT, OutputT>> replacement) {
+        Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement) {
       this.replacement = replacement;
     }
 
     @Override
-    public PTransform<InputT, OutputT> getReplacementTransform(TransformT transform) {
-      return InstanceBuilder.ofType(replacement)
-          .withArg((Class<PTransform<InputT, OutputT>>) transform.getClass(), transform)
-          .build();
+    public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
+        AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          InstanceBuilder.ofType(replacement)
+              .withArg(
+                  (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>)
+                      transform.getTransform().getClass(),
+                  transform.getTransform())
+              .build());
     }
   }
 
@@ -220,7 +229,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
     public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
       PCollectionView<Map<K, V>> view =
           PCollectionViews.mapView(
-              input.getPipeline(),
+              input,
               input.getWindowingStrategy(),
               input.getCoder());
 
@@ -259,7 +268,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
     public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       PCollectionView<Map<K, Iterable<V>>> view =
           PCollectionViews.multimapView(
-              input.getPipeline(),
+              input,
               input.getWindowingStrategy(),
               input.getCoder());
 
@@ -298,7 +307,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
     public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
       PCollectionView<Iterable<T>> view =
           PCollectionViews.iterableView(
-              input.getPipeline(),
+              input,
               input.getWindowingStrategy(),
               input.getCoder());
 
@@ -328,7 +337,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
     public PCollectionView<List<T>> expand(PCollection<T> input) {
       PCollectionView<List<T>> view =
           PCollectionViews.listView(
-              input.getPipeline(),
+              input,
               input.getWindowingStrategy(),
               input.getCoder());
 
@@ -341,6 +350,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
       return "StreamingViewAsList";
     }
   }
+
   private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
       extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
 
@@ -360,7 +370,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
               .withFanout(transform.getFanout()));
 
       PCollectionView<OutputT> view = PCollectionViews.singletonView(
-          combined.getPipeline(),
+          combined,
           combined.getWindowingStrategy(),
           transform.getInsertDefault(),
           transform.getInsertDefault()

http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
index 56f7d1a..5ca05d8 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
 
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 
@@ -45,8 +45,8 @@ public class FlattenPCollectionsTranslator<T> implements
   public void translate(Flatten.PCollections<T> transform, TranslationContext context) {
     JavaStream<T> merged = null;
     Set<PCollection<T>> unique = new HashSet<>();
-    for (TaggedPValue input: context.getInputs()) {
-      PCollection<T> collection = (PCollection<T>) input.getValue();
+    for (PValue input: context.getInputs().values()) {
+      PCollection<T> collection = (PCollection<T>) input;
       JavaStream<T> inputStream = context.getInputStream(collection);
       if (null == merged) {
         merged = inputStream;

http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
index e78568d..d92979b 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoMultiOutputTranslator.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction;
@@ -55,11 +55,10 @@ public class ParDoMultiOutputTranslator<InputT, OutputT> implements
     Map<String, PCollectionView<?>> tagsToSideInputs =
         TranslatorUtils.getTagsToSideInputs(sideInputs);
 
-    List<TaggedPValue> outputs = context.getOutputs();
+    Map<TupleTag<?>, PValue> outputs = context.getOutputs();
     final TupleTag<OutputT> mainOutput = transform.getMainOutputTag();
     List<TupleTag<?>> sideOutputs = new ArrayList<>(outputs.size() - 1);
-    for (TaggedPValue output: outputs) {
-      TupleTag<?> tag = output.getTag();
+    for (TupleTag<?> tag: outputs.keySet()) {
       if (tag != null && !tag.getId().equals(mainOutput.getId())) {
         sideOutputs.add(tag);
       }
@@ -78,9 +77,9 @@ public class ParDoMultiOutputTranslator<InputT, OutputT> implements
                 tagsToSideInputs,
                 mainOutput,
                 sideOutputs), transform.getName());
-    for (TaggedPValue output: outputs) {
+    for (Map.Entry<TupleTag<?>, PValue> output: outputs.entrySet()) {
       JavaStream<WindowedValue<OutputT>> taggedStream = outputStream
-          .filter(new FilterByOutputTag(output.getTag().getId()),
+          .filter(new FilterByOutputTag(output.getKey().getId()),
               "filter_by_output_tag")
           .map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue");
       context.setOutputStream(output.getValue(), taggedStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index e88bb74..eb6bc18 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.collect.Iterables;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
@@ -31,7 +30,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.values.PValue;
 
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
@@ -71,20 +70,20 @@ public class TranslationContext {
     }
   }
 
-  public List<TaggedPValue> getInputs() {
+  public Map<TupleTag<?>, PValue> getInputs() {
     return getCurrentTransform().getInputs();
   }
 
   public PValue getInput() {
-    return Iterables.getOnlyElement(getInputs()).getValue();
+    return Iterables.getOnlyElement(getInputs().values());
   }
 
-  public List<TaggedPValue> getOutputs() {
+  public Map<TupleTag<?>, PValue> getOutputs() {
     return getCurrentTransform().getOutputs();
   }
 
   public PValue getOutput() {
-    return Iterables.getOnlyElement(getOutputs()).getValue();
+    return Iterables.getOnlyElement(getOutputs().values());
   }
 
   private AppliedPTransform<?, ?, ?> getCurrentTransform() {

http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
index 2d70b63..149f80c 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
@@ -36,7 +36,7 @@ import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 import org.joda.time.Instant;
 
 /**
- * {@link Window.Bound} is translated to Gearpump flatMap function.
+ * {@link Window.Assign} is translated to Gearpump flatMap function.
  */
 @SuppressWarnings("unchecked")
 public class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index 9941e71..3473f53 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -156,7 +156,7 @@ public class DoFnFunction<InputT, OutputT> extends
       for (WindowedValue<InputT> value : pushedBackValues) {
         for (BoundedWindow win: value.getWindows()) {
           BoundedWindow sideInputWindow =
-              sideInput.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(win);
+              sideInput.getWindowMappingFn().getSideInputWindow(win);
           if (!sideInputReader.isReady(sideInput, sideInputWindow)) {
             Object emptyValue = WindowedValue.of(
                 Lists.newArrayList(), value.getTimestamp(), sideInputWindow, value.getPane());

http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index c079603..5e79151 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -46,6 +46,7 @@ public abstract class GearpumpSource<T> implements DataSource {
 
   private Source.Reader<T> reader;
   private boolean available = false;
+  private long count = 0L;
 
   GearpumpSource(PipelineOptions options) {
     try {
@@ -112,7 +113,7 @@ public abstract class GearpumpSource<T> implements DataSource {
       }
     } else {
       if (available) {
-        return TranslatorUtils.jodaTimeToJava8Time(reader.getCurrentTimestamp());
+        return Watermark.MIN();
       } else {
         return Watermark.MAX();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
index ccd5cdf..b62da19 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NoSuchElementException;
 
 import javax.annotation.Nullable;
@@ -68,7 +69,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
   }
 
   @Override
-  public java.util.List<? extends UnboundedSource<T, CheckpointMark>> generateInitialSplits(
+  public java.util.List<? extends UnboundedSource<T, CheckpointMark>> split(
       int desiredNumSplits, PipelineOptions options) throws Exception {
     return Collections.singletonList(this);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index bdfc336..70b4271 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -28,6 +28,7 @@ import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SimpleDoFnRunner;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -78,7 +79,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
     DoFnRunner<InputT, OutputT> underlying = DoFnRunners.simpleRunner(
         options, fn, sideInputReader, outputManager, mainOutputTag,
         sideOutputTags, stepContext, aggregatorFactory, windowingStrategy);
-    return PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
+    return SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
index 140df2a..4e0a74c 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
@@ -49,7 +49,8 @@ public class NoOpStepContext implements ExecutionContext.StepContext, Serializab
   }
 
   @Override
-  public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {
+  public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
index fa89d4a..ac12fa4 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
@@ -26,13 +26,15 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.Lists;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.source.DataSource;
@@ -55,10 +57,10 @@ public class FlattenPCollectionsTranslatorTest {
   @Test
   @SuppressWarnings({"rawtypes", "unchecked"})
   public void testTranslateWithEmptyCollection() {
-    PValue mockOutput = mock(PValue.class);
+    PCollection mockOutput = mock(PCollection.class);
     TranslationContext translationContext = mock(TranslationContext.class);
 
-    when(translationContext.getInputs()).thenReturn(Collections.EMPTY_LIST);
+    when(translationContext.getInputs()).thenReturn(Collections.EMPTY_MAP);
     when(translationContext.getOutput()).thenReturn(mockOutput);
 
     translator.translate(transform, translationContext);
@@ -71,11 +73,12 @@ public class FlattenPCollectionsTranslatorTest {
     JavaStream javaStream = mock(JavaStream.class);
     TranslationContext translationContext = mock(TranslationContext.class);
 
-    TaggedPValue mockInput = mock(TaggedPValue.class);
+    Map<TupleTag<?>, PValue> inputs = new HashMap<>();
+    TupleTag tag = mock(TupleTag.class);
     PCollection mockCollection = mock(PCollection.class);
-    when(mockInput.getValue()).thenReturn(mockCollection);
+    inputs.put(tag, mockCollection);
 
-    when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput));
+    when(translationContext.getInputs()).thenReturn(inputs);
     when(translationContext.getInputStream(mockCollection)).thenReturn(javaStream);
 
     PValue mockOutput = mock(PValue.class);
@@ -93,22 +96,30 @@ public class FlattenPCollectionsTranslatorTest {
 
     JavaStream javaStream1 = mock(JavaStream.class);
     JavaStream javaStream2 = mock(JavaStream.class);
+    JavaStream mergedStream = mock(JavaStream.class);
     TranslationContext translationContext = mock(TranslationContext.class);
 
-    TaggedPValue mockInput1 = mock(TaggedPValue.class);
+    Map<TupleTag<?>, PValue> inputs = new HashMap<>();
+    TupleTag tag1 = mock(TupleTag.class);
     PCollection mockCollection1 = mock(PCollection.class);
-    when(mockInput1.getValue()).thenReturn(mockCollection1);
+    inputs.put(tag1, mockCollection1);
 
-    TaggedPValue mockInput2 = mock(TaggedPValue.class);
+    TupleTag tag2 = mock(TupleTag.class);
     PCollection mockCollection2 = mock(PCollection.class);
-    when(mockInput2.getValue()).thenReturn(mockCollection2);
+    inputs.put(tag2, mockCollection2);
 
-    when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput1, mockInput2));
+    PCollection output = mock(PCollection.class);
+
+    when(translationContext.getInputs()).thenReturn(inputs);
     when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1);
     when(translationContext.getInputStream(mockCollection2)).thenReturn(javaStream2);
+    when(javaStream1.merge(javaStream2, transformName)).thenReturn(mergedStream);
+    when(javaStream2.merge(javaStream1, transformName)).thenReturn(mergedStream);
+
+    when(translationContext.getOutput()).thenReturn(output);
 
     translator.translate(transform, translationContext);
-    verify(javaStream1).merge(javaStream2, transformName);
+    verify(translationContext).setOutputStream(output, mergedStream);
   }
 
   @Test
@@ -120,14 +131,15 @@ public class FlattenPCollectionsTranslatorTest {
     JavaStream javaStream1 = mock(JavaStream.class);
     TranslationContext translationContext = mock(TranslationContext.class);
 
+    Map<TupleTag<?>, PValue> inputs = new HashMap<>();
+    TupleTag tag1 = mock(TupleTag.class);
     PCollection mockCollection1 = mock(PCollection.class);
-    TaggedPValue mockInput1 = mock(TaggedPValue.class);
-    when(mockInput1.getValue()).thenReturn(mockCollection1);
+    inputs.put(tag1, mockCollection1);
 
-    TaggedPValue mockInput2 = mock(TaggedPValue.class);
-    when(mockInput2.getValue()).thenReturn(mockCollection1);
+    TupleTag tag2 = mock(TupleTag.class);
+    inputs.put(tag2, mockCollection1);
 
-    when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput1, mockInput2));
+    when(translationContext.getInputs()).thenReturn(inputs);
     when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1);
 
     translator.translate(transform, translationContext);

http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
index 9135022..4e66ba9 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
@@ -133,7 +133,8 @@ public class GroupByKeyTranslatorTest {
             PaneInfo.NO_FIRING);
 
     KV<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>> result1 =
-        merge.fold(KV.of(null, null), KV.of(key1, value1));
+        merge.fold(KV.<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>>of(
+            null, null), KV.of(key1, value1));
     assertThat(result1.getKey(), equalTo(key1));
     assertThat(result1.getValue().getValue().getValue(), equalTo(Lists.newArrayList("value1")));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/44d21ac6/sdks/java/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml
index 7ca6109..21b5841 100644
--- a/sdks/java/pom.xml
+++ b/sdks/java/pom.xml
@@ -38,7 +38,7 @@
     <module>build-tools</module> -->
     <module>core</module>
     <module>io</module>
-    <module>maven-archetypes</module>
+		<!--<module>maven-archetypes</module>-->
     <module>extensions</module>
     <!-- javadoc runs directly from the root parent as the last module
          in the build to be able to capture runner-specific javadoc.