You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:34 UTC

[09/50] [abbrv] beam git commit: [BEAM-79] Fix gearpump-runner merge conflicts and test failure

[BEAM-79] Fix gearpump-runner merge conflicts and test failure


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3eab6a64
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3eab6a64
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3eab6a64

Branch: refs/heads/master
Commit: 3eab6a647e4761725680c8bc40589dfa5569d75b
Parents: 3f91798
Author: manuzhang <ow...@gmail.com>
Authored: Tue Mar 14 08:09:46 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Mar 15 15:21:29 2017 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml                        |  51 ++-
 .../gearpump/GearpumpPipelineResult.java        |  21 +-
 .../gearpump/GearpumpPipelineTranslator.java    | 388 ++++++++++++++++++-
 .../beam/runners/gearpump/GearpumpRunner.java   | 376 +-----------------
 .../runners/gearpump/TestGearpumpRunner.java    |  38 +-
 .../gearpump/examples/StreamingWordCount.java   |  98 -----
 .../gearpump/examples/UnboundedTextSource.java  | 139 -------
 .../runners/gearpump/examples/package-info.java |  22 --
 ...CreateGearpumpPCollectionViewTranslator.java |  14 +-
 .../CreatePCollectionViewTranslator.java        |   6 +-
 .../translators/CreateValuesTranslator.java     |  51 ---
 .../FlattenPCollectionTranslator.java           |  84 ----
 .../FlattenPCollectionsTranslator.java          |  83 ++++
 .../translators/GroupByKeyTranslator.java       |   4 +-
 .../translators/ParDoBoundMultiTranslator.java  |  32 +-
 .../translators/ParDoBoundTranslator.java       |   7 +-
 .../translators/ReadBoundedTranslator.java      |   4 +-
 .../translators/ReadUnboundedTranslator.java    |   4 +-
 .../translators/TransformTranslator.java        |   2 +-
 .../translators/TranslationContext.java         |  29 +-
 .../translators/WindowAssignTranslator.java     | 100 +++++
 .../translators/WindowBoundTranslator.java      | 100 -----
 .../translators/functions/DoFnFunction.java     |  12 +-
 .../translators/io/UnboundedSourceWrapper.java  |   1 +
 .../translators/utils/DoFnRunnerFactory.java    |   4 +-
 .../utils/NoOpAggregatorFactory.java            |   2 +-
 .../translators/utils/NoOpStepContext.java      |   6 +-
 .../translators/utils/TranslatorUtils.java      |   2 -
 .../translators/utils/TranslatorUtilsTest.java  |   1 -
 29 files changed, 703 insertions(+), 978 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 3efb1f6..9a6a432 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <groupId>org.apache.beam</groupId>
     <artifactId>beam-runners-parent</artifactId>
-    <version>0.5.0-incubating-SNAPSHOT</version>
+    <version>0.7.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
@@ -65,10 +65,12 @@
                 <configuration>
                   <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
                   <excludedGroups>
+                    org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
                     org.apache.beam.sdk.testing.UsesStatefulParDo,
                     org.apache.beam.sdk.testing.UsesTimersInParDo,
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
-                    org.apache.beam.sdk.testing.UsesMetrics
+                    org.apache.beam.sdk.testing.UsesAttemptedMetrics,
+                    org.apache.beam.sdk.testing.UsesCommittedMetrics
                   </excludedGroups>
                   <parallel>none</parallel>
                   <failIfNoTests>true</failIfNoTests>
@@ -136,6 +138,16 @@
       <artifactId>beam-runners-core-java</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-construction-java</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-jdk14</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
     </dependency>
@@ -182,6 +194,11 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
@@ -210,8 +227,36 @@
 
       <!-- Java compiler -->
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+          <testSource>1.8</testSource>
+          <testTarget>1.8</testTarget>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>enforce</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <enforceBytecodeVersion>
+                  <maxJdkVersion>1.8</maxJdkVersion>
+                </enforceBytecodeVersion>
+                <requireJavaVersion>
+                  <version>[1.8,)</version>
+                </requireJavaVersion>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
       </plugin>
 
       <!-- uber jar -->

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index 8f90898..d833cd6 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -43,6 +43,7 @@ public class GearpumpPipelineResult implements PipelineResult {
 
   private final ClientContext client;
   private final RunningApplication app;
+  private boolean finished = false;
 
   public GearpumpPipelineResult(ClientContext client, RunningApplication app) {
     this.client = client;
@@ -51,13 +52,22 @@ public class GearpumpPipelineResult implements PipelineResult {
 
   @Override
   public State getState() {
-    return getGearpumpState();
+    if (!finished) {
+      return getGearpumpState();
+    } else {
+      return State.DONE;
+    }
   }
 
   @Override
   public State cancel() throws IOException {
-    app.shutDown();
-    return State.CANCELLED;
+    if (!finished) {
+      app.shutDown();
+      finished = true;
+      return State.CANCELLED;
+    } else {
+      return State.DONE;
+    }
   }
 
   @Override
@@ -67,7 +77,10 @@ public class GearpumpPipelineResult implements PipelineResult {
 
   @Override
   public State waitUntilFinish() {
-    app.waitUntilFinish();
+    if (!finished) {
+      app.waitUntilFinish();
+      finished = true;
+    }
     return State.DONE;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 4cc060c..1a36343 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -18,13 +18,19 @@
 
 package org.apache.beam.runners.gearpump;
 
+import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.gearpump.translators.CreateGearpumpPCollectionViewTranslator;
 import org.apache.beam.runners.gearpump.translators.CreatePCollectionViewTranslator;
-import org.apache.beam.runners.gearpump.translators.CreateValuesTranslator;
-import org.apache.beam.runners.gearpump.translators.FlattenPCollectionTranslator;
+import org.apache.beam.runners.gearpump.translators.FlattenPCollectionsTranslator;
 import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
 import org.apache.beam.runners.gearpump.translators.ParDoBoundMultiTranslator;
 import org.apache.beam.runners.gearpump.translators.ParDoBoundTranslator;
@@ -32,17 +38,29 @@ import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator;
 import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
 import org.apache.beam.runners.gearpump.translators.TransformTranslator;
 import org.apache.beam.runners.gearpump.translators.TranslationContext;
-import org.apache.beam.runners.gearpump.translators.WindowBoundTranslator;
+import org.apache.beam.runners.gearpump.translators.WindowAssignTranslator;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 
 import org.apache.gearpump.util.Graph;
@@ -74,14 +92,13 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
     registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
     registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
     registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
-    registerTransformTranslator(Flatten.FlattenPCollectionList.class,
-        new FlattenPCollectionTranslator());
+    registerTransformTranslator(Flatten.PCollections.class,
+        new FlattenPCollectionsTranslator());
     registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator());
-    registerTransformTranslator(Window.Bound.class, new WindowBoundTranslator());
-    registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
+    registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
     registerTransformTranslator(View.CreatePCollectionView.class,
         new CreatePCollectionViewTranslator());
-    registerTransformTranslator(GearpumpRunner.CreateGearpumpPCollectionView.class,
+    registerTransformTranslator(CreateGearpumpPCollectionView.class,
         new CreateGearpumpPCollectionViewTranslator<>());
   }
 
@@ -90,6 +107,27 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
   }
 
   public void translate(Pipeline pipeline) {
+    Map<PTransformMatcher, PTransformOverrideFactory> overrides =
+        ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
+            .put(PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                new ReflectiveOneToOneOverrideFactory(
+                    StreamingCombineGloballyAsSingletonView.class))
+            .put(PTransformMatchers.classEqualTo(View.AsMap.class),
+                new ReflectiveOneToOneOverrideFactory(StreamingViewAsMap.class))
+            .put(PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                new ReflectiveOneToOneOverrideFactory(StreamingViewAsMultimap.class))
+            .put(PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                new ReflectiveOneToOneOverrideFactory(StreamingViewAsSingleton.class))
+            .put(PTransformMatchers.classEqualTo(View.AsList.class),
+                new ReflectiveOneToOneOverrideFactory(StreamingViewAsList.class))
+            .put(PTransformMatchers.classEqualTo(View.AsIterable.class),
+                new ReflectiveOneToOneOverrideFactory(StreamingViewAsIterable.class))
+            .build();
+
+    for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
+        overrides.entrySet()) {
+      pipeline.replace(override.getKey(), override.getValue());
+    }
     pipeline.traverseTopologically(this);
   }
 
@@ -145,5 +183,337 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
     return transformTranslators.get(transformClass);
   }
 
+  // The following codes are forked from DataflowRunner for View translator
+  private static class ReflectiveOneToOneOverrideFactory<
+      InputT extends PValue,
+      OutputT extends PValue,
+      TransformT extends PTransform<InputT, OutputT>>
+      extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
+    private final Class<PTransform<InputT, OutputT>> replacement;
+
+    private ReflectiveOneToOneOverrideFactory(
+        Class<PTransform<InputT, OutputT>> replacement) {
+      this.replacement = replacement;
+    }
+
+    @Override
+    public PTransform<InputT, OutputT> getReplacementTransform(TransformT transform) {
+      return InstanceBuilder.ofType(replacement)
+          .withArg((Class<PTransform<InputT, OutputT>>) transform.getClass(), transform)
+          .build();
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+   * for the Gearpump runner.
+   */
+  private static class StreamingViewAsMap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+    private static final long serialVersionUID = 4791080760092950304L;
+
+    public StreamingViewAsMap(View.AsMap<K, V> transform) {}
+
+    @Override
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, V>> view =
+          PCollectionViews.mapView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        // throw new RuntimeException(e);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMap";
+    }
+  }
+
+  /**
+   * Specialized expansion for {@link
+   * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
+   * Gearpump runner.
+   */
+  private static class StreamingViewAsMultimap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+    private static final long serialVersionUID = 5854899081751333352L;
+
+    public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {}
+
+    @Override
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, Iterable<V>>> view =
+          PCollectionViews.multimapView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        // throw new RuntimeException(e);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMultimap";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
+   * Gearpump runner.
+   */
+  private static class StreamingViewAsIterable<T>
+      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+
+    private static final long serialVersionUID = -3399860618995613421L;
+
+    public StreamingViewAsIterable(View.AsIterable<T> transform) {}
+
+    @Override
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+      PCollectionView<Iterable<T>> view =
+          PCollectionViews.iterableView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateGearpumpPCollectionView.<T, Iterable<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsIterable";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
+   * Gearpump runner.
+   */
+  private static class StreamingViewAsList<T>
+      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+
+    private static final long serialVersionUID = -5018631473886330629L;
+
+    public StreamingViewAsList(View.AsList<T> transform) {}
+
+    @Override
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
+      PCollectionView<List<T>> view =
+          PCollectionViews.listView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateGearpumpPCollectionView.<T, List<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsList";
+    }
+  }
+  private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+
+    private static final long serialVersionUID = 9064900748869035738L;
+    private final Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+    public StreamingCombineGloballyAsSingletonView(
+        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+      PCollection<OutputT> combined =
+          input.apply(Combine.globally(transform.getCombineFn())
+              .withoutDefaults()
+              .withFanout(transform.getFanout()));
+
+      PCollectionView<OutputT> view = PCollectionViews.singletonView(
+          combined.getPipeline(),
+          combined.getWindowingStrategy(),
+          transform.getInsertDefault(),
+          transform.getInsertDefault()
+              ? transform.getCombineFn().defaultValue() : null,
+          combined.getCoder());
+      return combined
+          .apply(ParDo.of(new WrapAsList<OutputT>()))
+          .apply(CreateGearpumpPCollectionView.<OutputT, OutputT>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingCombineGloballyAsSingletonView";
+    }
+  }
+
+  private static class StreamingViewAsSingleton<T>
+      extends PTransform<PCollection<T>, PCollectionView<T>> {
+
+    private static final long serialVersionUID = 5870455965625071546L;
+    private final View.AsSingleton<T> transform;
+
+    public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<T> expand(PCollection<T> input) {
+      Combine.Globally<T, T> combine = Combine.globally(
+          new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+      if (!transform.hasDefaultValue()) {
+        combine = combine.withoutDefaults();
+      }
+      return input.apply(combine.asSingletonView());
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsSingleton";
+    }
+
+    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+      private boolean hasDefaultValue;
+      private T defaultValue;
+
+      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+        this.hasDefaultValue = hasDefaultValue;
+        this.defaultValue = defaultValue;
+      }
+
+      @Override
+      public T apply(T left, T right) {
+        throw new IllegalArgumentException("PCollection with more than one element "
+            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+            + "combine the PCollection into a single value");
+      }
+
+      @Override
+      public T identity() {
+        if (hasDefaultValue) {
+          return defaultValue;
+        } else {
+          throw new IllegalArgumentException(
+              "Empty PCollection accessed as a singleton view. "
+                  + "Consider setting withDefault to provide a default value");
+        }
+      }
+    }
+  }
+
+  private static class WrapAsList<T> extends DoFn<T, List<T>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(Collections.singletonList(c.element()));
+    }
+  }
 
+  /**
+   * Creates a primitive {@link PCollectionView}.
+   *
+   * <p>For internal use only by runner implementors.
+   *
+   * @param <ElemT> The type of the elements of the input PCollection
+   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+   */
+  public static class CreateGearpumpPCollectionView<ElemT, ViewT>
+      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+    private static final long serialVersionUID = -2637073020800540542L;
+    private PCollectionView<ViewT> view;
+
+    private CreateGearpumpPCollectionView(PCollectionView<ViewT> view) {
+      this.view = view;
+    }
+
+    public static <ElemT, ViewT> CreateGearpumpPCollectionView<ElemT, ViewT> of(
+        PCollectionView<ViewT> view) {
+      return new CreateGearpumpPCollectionView<>(view);
+    }
+
+    public PCollectionView<ViewT> getView() {
+      return view;
+    }
+
+    @Override
+    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+      return view;
+    }
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   *
+   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
+   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+   * They require the input {@link PCollection} fits in memory.
+   * For a large {@link PCollection} this is expected to crash!
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 72f2126..897467a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -17,40 +17,18 @@
  */
 package org.apache.beam.runners.gearpump;
 
-import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.beam.runners.gearpump.translators.TranslationContext;
+
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 
 import org.apache.gearpump.cluster.ClusterConfig;
 import org.apache.gearpump.cluster.UserConfig;
@@ -72,21 +50,8 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
   private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers";
   private static final String DEFAULT_APPNAME = "beam_gearpump_app";
 
-  /** Custom transforms implementations. */
-  private final Map<Class<?>, Class<?>> overrides;
-
   public GearpumpRunner(GearpumpPipelineOptions options) {
     this.options = options;
-
-    ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.builder();
-    builder.put(Combine.GloballyAsSingletonView.class,
-        StreamingCombineGloballyAsSingletonView.class);
-    builder.put(View.AsMap.class, StreamingViewAsMap.class);
-    builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class);
-    builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
-    builder.put(View.AsList.class, StreamingViewAsList.class);
-    builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
-    overrides = builder.build();
   }
 
   public static GearpumpRunner fromOptions(PipelineOptions options) {
@@ -95,31 +60,6 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
     return new GearpumpRunner(pipelineOptions);
   }
 
-
-  public <OutputT extends POutput, InputT extends PInput> OutputT apply(
-      PTransform<InputT, OutputT> transform, InputT input) {
-    if (overrides.containsKey(transform.getClass())) {
-
-      Class<PTransform<InputT, OutputT>> transformClass =
-          (Class<PTransform<InputT, OutputT>>) transform.getClass();
-
-      Class<PTransform<InputT, OutputT>> customTransformClass =
-          (Class<PTransform<InputT, OutputT>>) overrides.get(transform.getClass());
-
-      PTransform<InputT, OutputT> customTransform =
-          InstanceBuilder.ofType(customTransformClass)
-              .withArg(transformClass, transform)
-              .build();
-
-      return Pipeline.applyTransform(input, customTransform);
-    } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
-            && ((PCollectionList<?>) input).size() == 0) {
-      return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
-    } else {
-      return super.apply(transform, input);
-    }
-  }
-
   @Override
   public GearpumpPipelineResult run(Pipeline pipeline) {
     String appName = options.getApplicationName();
@@ -170,318 +110,4 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
 
 
 
-  // The following codes are forked from DataflowRunner for View translator
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
-   * for the Gearpump runner.
-   */
-  private static class StreamingViewAsMap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
-    private static final long serialVersionUID = 4791080760092950304L;
-
-    public StreamingViewAsMap(View.AsMap<K, V> transform) {}
-
-    @Override
-    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        // throw new RuntimeException(e);
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, V>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMap";
-    }
-  }
-
-  /**
-   * Specialized expansion for {@link
-   * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
-   * Gearpump runner.
-   */
-  private static class StreamingViewAsMultimap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
-    private static final long serialVersionUID = 5854899081751333352L;
-
-    public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {}
-
-    @Override
-    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        // throw new RuntimeException(e);
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMultimap";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
-   * Gearpump runner.
-   */
-  private static class StreamingViewAsIterable<T>
-      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-
-    private static final long serialVersionUID = -3399860618995613421L;
-
-    public StreamingViewAsIterable(View.AsIterable<T> transform) {}
-
-    @Override
-    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
-      PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(CreateGearpumpPCollectionView.<T, Iterable<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsIterable";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
-   * Gearpump runner.
-   */
-  private static class StreamingViewAsList<T>
-      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
-
-    private static final long serialVersionUID = -5018631473886330629L;
-
-    public StreamingViewAsList(View.AsList<T> transform) {}
-
-    @Override
-    public PCollectionView<List<T>> expand(PCollection<T> input) {
-      PCollectionView<List<T>> view =
-          PCollectionViews.listView(
-              input.getPipeline(),
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(CreateGearpumpPCollectionView.<T, List<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsList";
-    }
-  }
-  private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
-      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
-
-    private static final long serialVersionUID = 9064900748869035738L;
-    private final Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
-    public StreamingCombineGloballyAsSingletonView(
-        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-      PCollection<OutputT> combined =
-          input.apply(Combine.globally(transform.getCombineFn())
-              .withoutDefaults()
-              .withFanout(transform.getFanout()));
-
-      PCollectionView<OutputT> view = PCollectionViews.singletonView(
-          combined.getPipeline(),
-          combined.getWindowingStrategy(),
-          transform.getInsertDefault(),
-          transform.getInsertDefault()
-              ? transform.getCombineFn().defaultValue() : null,
-          combined.getCoder());
-      return combined
-          .apply(ParDo.of(new WrapAsList<OutputT>()))
-          .apply(CreateGearpumpPCollectionView.<OutputT, OutputT>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingCombineGloballyAsSingletonView";
-    }
-  }
-
-  private static class StreamingViewAsSingleton<T>
-      extends PTransform<PCollection<T>, PCollectionView<T>> {
-
-    private static final long serialVersionUID = 5870455965625071546L;
-    private final View.AsSingleton<T> transform;
-
-    public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
-      Combine.Globally<T, T> combine = Combine.globally(
-          new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
-      if (!transform.hasDefaultValue()) {
-        combine = combine.withoutDefaults();
-      }
-      return input.apply(combine.asSingletonView());
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsSingleton";
-    }
-
-    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
-      private boolean hasDefaultValue;
-      private T defaultValue;
-
-      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
-        this.hasDefaultValue = hasDefaultValue;
-        this.defaultValue = defaultValue;
-      }
-
-      @Override
-      public T apply(T left, T right) {
-        throw new IllegalArgumentException("PCollection with more than one element "
-            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
-            + "combine the PCollection into a single value");
-      }
-
-      @Override
-      public T identity() {
-        if (hasDefaultValue) {
-          return defaultValue;
-        } else {
-          throw new IllegalArgumentException(
-              "Empty PCollection accessed as a singleton view. "
-                  + "Consider setting withDefault to provide a default value");
-        }
-      }
-    }
-  }
-
-  private static class WrapAsList<T> extends DoFn<T, List<T>> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(Collections.singletonList(c.element()));
-    }
-  }
-
-  /**
-   * Creates a primitive {@link PCollectionView}.
-   *
-   * <p>For internal use only by runner implementors.
-   *
-   * @param <ElemT> The type of the elements of the input PCollection
-   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
-   */
-  public static class CreateGearpumpPCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
-    private static final long serialVersionUID = -2637073020800540542L;
-    private PCollectionView<ViewT> view;
-
-    private CreateGearpumpPCollectionView(PCollectionView<ViewT> view) {
-      this.view = view;
-    }
-
-    public static <ElemT, ViewT> CreateGearpumpPCollectionView<ElemT, ViewT> of(
-        PCollectionView<ViewT> view) {
-      return new CreateGearpumpPCollectionView<>(view);
-    }
-
-    public PCollectionView<ViewT> getView() {
-      return view;
-    }
-
-    @Override
-    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
-      return view;
-    }
-  }
-
-  /**
-   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
-   *
-   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
-   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
-   * They require the input {@link PCollection} fits in memory.
-   * For a large {@link PCollection} this is expected to crash!
-   *
-   * @param <T> the type of elements to concatenate.
-   */
-  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
-    @Override
-    public List<T> createAccumulator() {
-      return new ArrayList<>();
-    }
-
-    @Override
-    public List<T> addInput(List<T> accumulator, T input) {
-      accumulator.add(input);
-      return accumulator;
-    }
-
-    @Override
-    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
-      List<T> result = createAccumulator();
-      for (List<T> accumulator : accumulators) {
-        result.addAll(accumulator);
-      }
-      return result;
-    }
-
-    @Override
-    public List<T> extractOutput(List<T> accumulator) {
-      return accumulator;
-    }
-
-    @Override
-    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-
-    @Override
-    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
index c96bcb1..ea7dd26 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -24,9 +24,6 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 
 import org.apache.gearpump.cluster.ClusterConfig;
 import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
@@ -58,36 +55,9 @@ public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
 
   @Override
   public GearpumpPipelineResult run(Pipeline pipeline) {
-    try {
-      GearpumpPipelineResult result = delegate.run(pipeline);
-      result.waitUntilFinish();
-      cluster.stop();
-      return result;
-    } catch (Throwable e) {
-      // copied from TestFlinkRunner to pull out AssertionError
-      // which is wrapped in UserCodeException
-      Throwable cause = e;
-      Throwable oldCause;
-      do {
-        if (cause.getCause() == null) {
-          break;
-        }
-
-        oldCause = cause;
-        cause = cause.getCause();
-
-      } while (!oldCause.equals(cause));
-      if (cause instanceof AssertionError) {
-        throw (AssertionError) cause;
-      } else {
-        throw e;
-      }
-    }
-  }
-
-  @Override
-  public <OutputT extends POutput, InputT extends PInput>
-  OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
-    return delegate.apply(transform, input);
+    GearpumpPipelineResult result = delegate.run(pipeline);
+    result.waitUntilFinish();
+    cluster.stop();
+    return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
deleted file mode 100644
index b2d762a..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/StreamingWordCount.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.examples;
-
-import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.runners.gearpump.GearpumpRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.apache.gearpump.cluster.client.ClientContext;
-import org.joda.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * streaming word count example on Gearpump runner.
- */
-public class StreamingWordCount {
-
-  static class ExtractWordsFn extends DoFn<String, String> {
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      // Split the line into words.
-      String[] words = c.element().split("[^a-zA-Z']+");
-
-      // Output each word encountered into the output PCollection.
-      for (String word : words) {
-        if (!word.isEmpty()) {
-          c.output(word);
-        }
-      }
-    }
-  }
-
-  static class FormatAsStringFn extends DoFn<KV<String, Long>, String> {
-    private static final Logger LOG = LoggerFactory.getLogger(FormatAsStringFn.class);
-
-    @ProcessElement
-    public void process(ProcessContext c) {
-      String row = c.element().getKey()
-          + " - " + c.element().getValue()
-          + " @ " + c.timestamp().toString();
-      LOG.debug("output {}", row);
-      c.output(row);
-    }
-  }
-
-
-  public static void main(String[] args) {
-    GearpumpPipelineOptions options = PipelineOptionsFactory
-            .fromArgs(args).as(GearpumpPipelineOptions.class);
-    options.setRunner(GearpumpRunner.class);
-    options.setApplicationName("StreamingWordCount");
-    options.setParallelism(1);
-
-    Pipeline p = Pipeline.create(options);
-
-    PCollection<KV<String, Long>> wordCounts =
-        p.apply(Read.from(new UnboundedTextSource()))
-            .apply(ParDo.of(new ExtractWordsFn()))
-            .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10))))
-            .apply(Count.<String>perElement());
-
-    wordCounts.apply(ParDo.of(new FormatAsStringFn()));
-
-    p.run();
-
-    ClientContext clientContext = options.getClientContext();
-    clientContext.close();
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
deleted file mode 100644
index b014432..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/UnboundedTextSource.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.examples;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
-
-/**
- * unbounded source that reads from text.
- */
-public class UnboundedTextSource extends UnboundedSource<String, UnboundedSource.CheckpointMark> {
-
-  @Override
-  public List<? extends UnboundedSource<String, CheckpointMark>> generateInitialSplits(
-      int desiredNumSplits, PipelineOptions options) throws Exception {
-    return Collections.<UnboundedSource<String, CheckpointMark>>singletonList(this);
-  }
-
-  @Override
-  public UnboundedReader<String> createReader(PipelineOptions options,
-      @Nullable CheckpointMark checkpointMark) {
-    return new UnboundedTextReader(this);
-  }
-
-  @Nullable
-  @Override
-  public Coder<CheckpointMark> getCheckpointMarkCoder() {
-    return null;
-  }
-
-  @Override
-  public void validate() {
-  }
-
-  @Override
-  public Coder<String> getDefaultOutputCoder() {
-    return StringUtf8Coder.of();
-  }
-
-  /**
-   * reads from text.
-   */
-  public static class UnboundedTextReader extends UnboundedReader<String> implements Serializable {
-
-    private static final long serialVersionUID = 7526472295622776147L;
-
-    private final UnboundedTextSource source;
-
-    private final String[] texts = new String[]{"foo foo foo bar bar", "foo foo bar bar bar"};
-    private long index = 0;
-
-    private String currentRecord;
-
-    private Instant currentTimestamp;
-
-    public UnboundedTextReader(UnboundedTextSource source) {
-      this.source = source;
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      currentRecord = texts[0];
-      currentTimestamp = new Instant(0);
-      return true;
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      index++;
-      currentRecord = texts[(int) index % (texts.length)];
-      currentTimestamp = new Instant(index * 1000);
-
-      return true;
-    }
-
-    @Override
-    public byte[] getCurrentRecordId() throws NoSuchElementException {
-      return new byte[0];
-    }
-
-    @Override
-    public String getCurrent() throws NoSuchElementException {
-      return this.currentRecord;
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return currentTimestamp;
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public Instant getWatermark() {
-      return currentTimestamp;
-    }
-
-    @Override
-    public CheckpointMark getCheckpointMark() {
-      return null;
-    }
-
-    @Override
-    public UnboundedSource<String, ?> getCurrentSource() {
-      return this.source;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/package-info.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/package-info.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/package-info.java
deleted file mode 100644
index a62a6c0..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/examples/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Examples showcase Beam application over Gearpump runner.
- */
-package org.apache.beam.runners.gearpump.examples;

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
index d05c89d..c7f24a8 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
@@ -20,25 +20,27 @@ package org.apache.beam.runners.gearpump.translators;
 
 import java.util.List;
 
-import org.apache.beam.runners.gearpump.GearpumpRunner;
+import org.apache.beam.runners.gearpump.GearpumpPipelineTranslator;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 
-
 /**
  * CreateGearpumpPCollectionView bridges input stream to down stream
  * transforms.
  */
 public class CreateGearpumpPCollectionViewTranslator<ElemT, ViewT> implements
-    TransformTranslator<GearpumpRunner.CreateGearpumpPCollectionView<ElemT, ViewT>> {
+    TransformTranslator<GearpumpPipelineTranslator.CreateGearpumpPCollectionView<ElemT, ViewT>> {
+
+  private static final long serialVersionUID = -3955521308055056034L;
 
   @Override
-  public void translate(GearpumpRunner.CreateGearpumpPCollectionView<ElemT, ViewT> transform,
+  public void translate(
+      GearpumpPipelineTranslator.CreateGearpumpPCollectionView<ElemT, ViewT> transform,
       TranslationContext context) {
     JavaStream<WindowedValue<List<ElemT>>> inputStream =
-        context.getInputStream(context.getInput(transform));
-    PCollectionView<ViewT> view = transform.getView();
+        context.getInputStream(context.getInput());
+    PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
     context.setOutputStream(view, inputStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
index e9e2e5d..da55d70 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
@@ -32,12 +32,14 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 public class CreatePCollectionViewTranslator<ElemT, ViewT> implements
     TransformTranslator<View.CreatePCollectionView<ElemT, ViewT>> {
 
+  private static final long serialVersionUID = -2394386873317515748L;
+
   @Override
   public void translate(View.CreatePCollectionView<ElemT, ViewT> transform,
                         TranslationContext context) {
     JavaStream<WindowedValue<List<ElemT>>> inputStream =
-        context.getInputStream(context.getInput(transform));
-    PCollectionView<ViewT> view = transform.getView();
+        context.getInputStream(context.getInput());
+    PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
     context.setOutputStream(view, inputStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
deleted file mode 100644
index e5dc6dd..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators;
-
-import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
-import org.apache.beam.runners.gearpump.translators.io.ValuesSource;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.util.WindowedValue;
-
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-
-/**
- * Wraps elements from Create.Values into an {@link UnboundedSource}.
- * mainly used for test
- */
-public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
-
-  private static final long serialVersionUID = 5411841848199229738L;
-
-  @Override
-  public void translate(Create.Values<T> transform, TranslationContext context) {
-    try {
-      UnboundedSourceWrapper<T, ?> unboundedSourceWrapper = new UnboundedSourceWrapper<>(
-          new ValuesSource<>(transform.getElements(),
-              transform.getDefaultOutputCoder(context.getInput(transform))),
-          context.getPipelineOptions());
-      JavaStream<WindowedValue<T>> sourceStream = context.getSourceStream(unboundedSourceWrapper);
-      context.setOutputStream(context.getOutput(transform), sourceStream);
-    } catch (CannotProvideCoderException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
deleted file mode 100644
index 27e54b8..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators;
-
-import com.google.common.collect.Lists;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
-import org.apache.beam.runners.gearpump.translators.io.ValuesSource;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-
-
-
-/**
- * Flatten.FlattenPCollectionList is translated to Gearpump merge function.
- * Note only two-way merge is working now
- */
-public class FlattenPCollectionTranslator<T> implements
-    TransformTranslator<Flatten.FlattenPCollectionList<T>> {
-
-  private static final long serialVersionUID = -5552148802472944759L;
-
-  @Override
-  public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
-    JavaStream<T> merged = null;
-    Set<PCollection<T>> unique = new HashSet<>();
-    for (PCollection<T> collection : context.getInput(transform).getAll()) {
-      unique.add(collection);
-      JavaStream<T> inputStream = context.getInputStream(collection);
-      if (null == merged) {
-        merged = inputStream;
-      } else {
-        // duplicate edges are not allowed in Gearpump graph
-        // so we route through a dummy node
-        if (unique.contains(collection)) {
-          inputStream = inputStream.map(new DummyFunction<T>(), "dummy");
-        }
-
-        merged = merged.merge(inputStream, transform.getName());
-      }
-    }
-
-    if (null == merged) {
-      UnboundedSourceWrapper<String, ?> unboundedSourceWrapper = new UnboundedSourceWrapper<>(
-          new ValuesSource<>(Lists.newArrayList("dummy"),
-              StringUtf8Coder.of()), context.getPipelineOptions());
-      merged = context.getSourceStream(unboundedSourceWrapper);
-    }
-    context.setOutputStream(context.getOutput(transform), merged);
-  }
-
-  private static class DummyFunction<T> extends MapFunction<T, T> {
-
-    private static final long serialVersionUID = 5454396869997290471L;
-
-    @Override
-    public T map(T t) {
-      return t;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
new file mode 100644
index 0000000..3a465cb
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import com.google.common.collect.Lists;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.runners.gearpump.translators.io.ValuesSource;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+/**
+ * {@link Flatten.PCollections} is translated to Gearpump merge function.
+ */
+public class FlattenPCollectionsTranslator<T> implements
+    TransformTranslator<Flatten.PCollections<T>> {
+
+  private static final long serialVersionUID = -5552148802472944759L;
+
+  @Override
+  public void translate(Flatten.PCollections<T> transform, TranslationContext context) {
+    JavaStream<T> merged = null;
+    Set<PCollection<T>> unique = new HashSet<>();
+    for (TaggedPValue input: context.getInputs()) {
+      PCollection<T> collection = (PCollection<T>) input.getValue();
+      unique.add(collection);
+      JavaStream<T> inputStream = context.getInputStream(collection);
+      if (null == merged) {
+        merged = inputStream;
+      } else {
+        // duplicate edges are not allowed in Gearpump graph
+        // so we route through a dummy node
+        if (unique.contains(collection)) {
+          inputStream = inputStream.map(new DummyFunction<T>(), "dummy");
+        }
+
+        merged = merged.merge(inputStream, transform.getName());
+      }
+    }
+
+    if (null == merged) {
+      UnboundedSourceWrapper<String, ?> unboundedSourceWrapper = new UnboundedSourceWrapper<>(
+          new ValuesSource<>(Lists.newArrayList("dummy"),
+              StringUtf8Coder.of()), context.getPipelineOptions());
+      merged = context.getSourceStream(unboundedSourceWrapper);
+    }
+    context.setOutputStream(context.getOutput(), merged);
+  }
+
+  private static class DummyFunction<T> extends MapFunction<T, T> {
+
+    private static final long serialVersionUID = 5454396869997290471L;
+
+    @Override
+    public T map(T t) {
+      return t;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index df8bfe9..5dfd3e9 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -61,7 +61,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
 
   @Override
   public void translate(GroupByKey<K, V> transform, TranslationContext context) {
-    PCollection<KV<K, V>> input = context.getInput(transform);
+    PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) context.getInput();
     Coder<K> inputKeyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
     JavaStream<WindowedValue<KV<K, V>>> inputStream =
         context.getInputStream(input);
@@ -80,7 +80,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
         .fold(new Merge<>(windowFn, outputTimeFn), "merge")
         .map(new Values<K, V>(), "values");
 
-    context.setOutputStream(context.getOutput(transform), outputStream);
+    context.setOutputStream(context.getOutput(), outputStream);
   }
 
   private static class GearpumpWindowFn<T, W extends BoundedWindow>

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index 8c57019..e88cb73 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -18,17 +18,11 @@
 
 package org.apache.beam.runners.gearpump.translators;
 
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import javax.annotation.Nullable;
-
 import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
 import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -36,6 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction;
@@ -54,21 +49,21 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
 
   @Override
   public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
-    PCollection<InputT> inputT = (PCollection<InputT>) context.getInput(transform);
+    PCollection<InputT> inputT = (PCollection<InputT>) context.getInput();
     JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream(inputT);
     Collection<PCollectionView<?>> sideInputs = transform.getSideInputs();
     Map<String, PCollectionView<?>> tagsToSideInputs =
         TranslatorUtils.getTagsToSideInputs(sideInputs);
 
-    Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+    List<TaggedPValue> outputs = context.getOutputs();
     final TupleTag<OutputT> mainOutput = transform.getMainOutputTag();
-    List<TupleTag<?>> sideOutputs = Lists.newLinkedList(Sets.filter(outputs.keySet(),
-        new Predicate<TupleTag<?>>() {
-          @Override
-          public boolean apply(@Nullable TupleTag<?> tupleTag) {
-            return tupleTag != null && !tupleTag.getId().equals(mainOutput.getId());
-          }
-        }));
+    List<TupleTag<?>> sideOutputs = new ArrayList<>(outputs.size() - 1);
+    for (TaggedPValue output: outputs) {
+      TupleTag<?> tag = output.getTag();
+      if (tag != null && !tag.getId().equals(mainOutput.getId())) {
+        sideOutputs.add(tag);
+      }
+    }
 
     JavaStream<TranslatorUtils.RawUnionValue> unionStream = TranslatorUtils.withSideInputStream(
         context, inputStream, tagsToSideInputs);
@@ -83,10 +78,9 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> implements
                 tagsToSideInputs,
                 mainOutput,
                 sideOutputs), transform.getName());
-    for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
-      output.getValue().getCoder();
+    for (TaggedPValue output: outputs) {
       JavaStream<WindowedValue<OutputT>> taggedStream = outputStream
-          .filter(new FilterByOutputTag(output.getKey().getId()),
+          .filter(new FilterByOutputTag(output.getTag().getId()),
               "filter_by_output_tag")
           .map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue");
       context.setOutputStream(output.getValue(), taggedStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
index efae938..dc32b8c 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 
-
 /**
  * {@link ParDo.Bound} is translated to Gearpump flatMap function
  * with {@link DoFn} wrapped in {@link DoFnFunction}.
@@ -50,14 +49,14 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
   @Override
   public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
     DoFn<InputT, OutputT> doFn = transform.getFn();
-    PCollection<OutputT> output = context.getOutput(transform);
+    PCollection<OutputT> output = (PCollection<OutputT>) context.getOutput();
     WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
 
     Collection<PCollectionView<?>> sideInputs = transform.getSideInputs();
     Map<String, PCollectionView<?>> tagsToSideInputs =
         TranslatorUtils.getTagsToSideInputs(sideInputs);
     JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream(
-        context.getInput(transform));
+        context.getInput());
     JavaStream<TranslatorUtils.RawUnionValue> unionStream =
         TranslatorUtils.withSideInputStream(context,
         inputStream, tagsToSideInputs);
@@ -71,6 +70,6 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
             .flatMap(doFnFunction, transform.getName())
             .map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue");
 
-    context.setOutputStream(context.getOutput(transform), outputStream);
+    context.setOutputStream(context.getOutput(), outputStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
index 478d58f..8f71a8e 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslator.java
@@ -31,6 +31,8 @@ import org.apache.gearpump.streaming.source.DataSource;
  */
 public class ReadBoundedTranslator <T> implements TransformTranslator<Read.Bounded<T>> {
 
+  private static final long serialVersionUID = -3899020490896998330L;
+
   @Override
   public void translate(Read.Bounded<T> transform, TranslationContext context) {
     BoundedSource<T> boundedSource = transform.getSource();
@@ -38,7 +40,7 @@ public class ReadBoundedTranslator <T> implements TransformTranslator<Read.Bound
         context.getPipelineOptions());
     JavaStream<WindowedValue<T>> sourceStream = context.getSourceStream(sourceWrapper);
 
-    context.setOutputStream(context.getOutput(transform), sourceStream);
+    context.setOutputStream(context.getOutput(), sourceStream);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
index 7e12a9c..0462c57 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslator.java
@@ -33,6 +33,8 @@ import org.apache.gearpump.streaming.source.DataSource;
 
 public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbounded<T>> {
 
+  private static final long serialVersionUID = 3529494817859948619L;
+
   @Override
   public void translate(Read.Unbounded<T> transform, TranslationContext context) {
     UnboundedSource<T, ?> unboundedSource = transform.getSource();
@@ -40,7 +42,7 @@ public class ReadUnboundedTranslator<T> implements TransformTranslator<Read.Unbo
         unboundedSource, context.getPipelineOptions());
     JavaStream<WindowedValue<T>> sourceStream = context.getSourceStream(unboundedSourceWrapper);
 
-    context.setOutputStream(context.getOutput(transform), sourceStream);
+    context.setOutputStream(context.getOutput(), sourceStream);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
index c8587d3..c7becad 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TransformTranslator.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
- * translates {@link PTransform} to Gearpump functions.
+ * Translates {@link PTransform} to Gearpump functions.
  */
 public interface TransformTranslator<T extends PTransform> extends Serializable {
   void translate(T transform, TranslationContext context);

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index b2cff8a..e88bb74 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -20,17 +20,18 @@ package org.apache.beam.runners.gearpump.translators;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.collect.Iterables;
+
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
@@ -70,18 +71,26 @@ public class TranslationContext {
     }
   }
 
-  public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
-    return (InputT) getCurrentTransform(transform).getInput();
+  public List<TaggedPValue> getInputs() {
+    return getCurrentTransform().getInputs();
+  }
+
+  public PValue getInput() {
+    return Iterables.getOnlyElement(getInputs()).getValue();
+  }
+
+  public List<TaggedPValue> getOutputs() {
+    return getCurrentTransform().getOutputs();
   }
 
-  public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
-    return (OutputT) getCurrentTransform(transform).getOutput();
+  public PValue getOutput() {
+    return Iterables.getOnlyElement(getOutputs()).getValue();
   }
 
-  private AppliedPTransform<?, ?, ?> getCurrentTransform(PTransform<?, ?> transform) {
+  private AppliedPTransform<?, ?, ?> getCurrentTransform() {
     checkArgument(
-        currentTransform != null && currentTransform.getTransform() == transform,
-        "can only be called with current transform");
+        currentTransform != null,
+        "current transform not set");
     return currentTransform;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3eab6a64/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
new file mode 100644
index 0000000..fe6015a
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
+import org.joda.time.Instant;
+
+/**
+ * {@link Window.Assign} is translated to Gearpump flatMap function.
+ */
+@SuppressWarnings("unchecked")
+public class WindowAssignTranslator<T> implements  TransformTranslator<Window.Assign<T>> {
+
+  private static final long serialVersionUID = -964887482120489061L;
+
+  @Override
+  public void translate(Window.Assign<T> transform, TranslationContext context) {
+    PCollection<T> input = (PCollection<T>) context.getInput();
+    PCollection<T> output = (PCollection<T>) context.getOutput();
+    JavaStream<WindowedValue<T>> inputStream = context.getInputStream(input);
+    WindowingStrategy<?, ?> outputStrategy = output.getWindowingStrategy();
+    WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
+    JavaStream<WindowedValue<T>> outputStream =
+        inputStream
+            .flatMap(new AssignWindows(windowFn), "assign_windows");
+
+    context.setOutputStream(output, outputStream);
+  }
+
+  private static class AssignWindows<T> extends
+      FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
+
+    private static final long serialVersionUID = 7284565861938681360L;
+    private final WindowFn<T, BoundedWindow> windowFn;
+
+    AssignWindows(WindowFn<T, BoundedWindow> windowFn) {
+      this.windowFn = windowFn;
+    }
+
+    @Override
+    public Iterator<WindowedValue<T>> flatMap(final WindowedValue<T> value) {
+      try {
+        Collection<BoundedWindow> windows = windowFn.assignWindows(windowFn.new AssignContext() {
+          @Override
+          public T element() {
+            return value.getValue();
+          }
+
+          @Override
+          public Instant timestamp() {
+            return value.getTimestamp();
+          }
+
+          @Override
+          public BoundedWindow window() {
+            return Iterables.getOnlyElement(value.getWindows());
+          }
+        });
+        List<WindowedValue<T>> values = new ArrayList<>(windows.size());
+        for (BoundedWindow win: windows) {
+          values.add(
+              WindowedValue.of(value.getValue(), value.getTimestamp(), win, value.getPane()));
+        }
+        return values.iterator();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}