You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:29 UTC

[04/50] [abbrv] beam git commit: [BEAM-79] Add SideInput support for GearpumpRunner

[BEAM-79] Add SideInput support for GearpumpRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4eb50d15
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4eb50d15
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4eb50d15

Branch: refs/heads/master
Commit: 4eb50d152b91df46bd7f0478650cb4abac3808c6
Parents: 2d0aed9
Author: manuzhang <ow...@gmail.com>
Authored: Tue Feb 14 12:33:31 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Mar 7 22:15:26 2017 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml                        |  16 +-
 .../gearpump/GearpumpPipelineResult.java        |  47 +--
 .../gearpump/GearpumpPipelineTranslator.java    |   8 +
 .../beam/runners/gearpump/GearpumpRunner.java   | 373 ++++++++++++++++++-
 .../runners/gearpump/TestGearpumpRunner.java    |  39 +-
 ...CreateGearpumpPCollectionViewTranslator.java |  44 +++
 .../CreatePCollectionViewTranslator.java        |  43 +++
 .../translators/CreateValuesTranslator.java     |   2 +
 .../FlattenPCollectionTranslator.java           |  38 ++
 .../translators/GroupByKeyTranslator.java       |  82 ++--
 .../translators/ParDoBoundMultiTranslator.java  | 165 +++-----
 .../translators/ParDoBoundTranslator.java       |  32 +-
 .../translators/WindowBoundTranslator.java      |  21 +-
 .../translators/functions/DoFnFunction.java     | 158 ++++++--
 .../translators/io/BoundedSourceWrapper.java    |   1 +
 .../gearpump/translators/io/GearpumpSource.java |  23 +-
 .../gearpump/translators/io/ValuesSource.java   |  14 +-
 .../translators/utils/DoFnRunnerFactory.java    |  20 +-
 .../translators/utils/NoOpSideInputReader.java  |  48 ---
 .../translators/utils/TranslatorUtils.java      | 147 ++++++++
 20 files changed, 1002 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index 6f91c50..6a41dc0 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -75,23 +75,9 @@
                   <dependenciesToScan>
                     <dependency>org.apache.beam:beam-sdks-java-core</dependency>
                   </dependenciesToScan>
-                  <argLine>-noverify</argLine>
                   <excludes>
-                    <!-- side input is not supported in Gearpump -->
                     <exclude>
-                      org.apache.beam.sdk.io.gcp.bigquery.BigQueryIOTest,
-                      org.apache.beam.sdk.io.CountingInputTest,
-                      org.apache.beam.sdk.io.CountingSourceTest,
-                      org.apache.beam.sdk.testing.PAssertTest,
-                      org.apache.beam.sdk.transforms.ApproximateUniqueTest,
-                      org.apache.beam.sdk.transforms.CombineTest,
-                      org.apache.beam.sdk.transforms.CombineFnsTest,
-                      org.apache.beam.sdk.transforms.CountTest,
-                      org.apache.beam.sdk.transforms.FlattenTest,
-                      org.apache.beam.sdk.transforms.ParDoTest,
-                      org.apache.beam.sdk.transforms.SampleTest,
-                      org.apache.beam.sdk.transforms.ViewTest,
-                      org.apache.beam.sdk.transforms.join.CoGroupByKeyTest
+                      org.apache.beam.sdk.transforms.ParDoTest
                     </exclude>
                   </excludes>
                   <systemPropertyVariables>

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
index a3740b7..8f90898 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineResult.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.gearpump.cluster.ApplicationStatus;
 import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData;
 import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.client.RunningApplication;
 import org.joda.time.Duration;
 
 import scala.collection.JavaConverters;
@@ -41,13 +42,11 @@ import scala.collection.Seq;
 public class GearpumpPipelineResult implements PipelineResult {
 
   private final ClientContext client;
-  private final int appId;
-  private final Duration defaultWaitDuration = Duration.standardSeconds(30);
-  private final Duration defaultWaitInterval = Duration.standardSeconds(5);
+  private final RunningApplication app;
 
-  public GearpumpPipelineResult(ClientContext client, int appId) {
+  public GearpumpPipelineResult(ClientContext client, RunningApplication app) {
     this.client = client;
-    this.appId = appId;
+    this.app = app;
   }
 
   @Override
@@ -57,38 +56,19 @@ public class GearpumpPipelineResult implements PipelineResult {
 
   @Override
   public State cancel() throws IOException {
-    client.shutdown(appId);
+    app.shutDown();
     return State.CANCELLED;
   }
 
   @Override
   public State waitUntilFinish(Duration duration) {
-    long start = System.currentTimeMillis();
-    do {
-      try {
-        Thread.sleep(defaultWaitInterval.getMillis());
-      } catch (Exception e) {
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        if (e instanceof RuntimeException) {
-          throw (RuntimeException) e;
-        }
-        throw new RuntimeException(e);
-      }
-    } while (State.RUNNING == getGearpumpState()
-        && (System.currentTimeMillis() - start) < duration.getMillis());
-
-    if (State.RUNNING == getGearpumpState()) {
-      return State.DONE;
-    } else {
-      return State.FAILED;
-    }
+    return waitUntilFinish();
   }
 
   @Override
   public State waitUntilFinish() {
-    return waitUntilFinish(defaultWaitDuration);
+    app.waitUntilFinish();
+    return State.DONE;
   }
 
   @Override
@@ -109,18 +89,19 @@ public class GearpumpPipelineResult implements PipelineResult {
     List<AppMasterData> apps =
         JavaConverters.<AppMasterData>seqAsJavaListConverter(
             (Seq<AppMasterData>) client.listApps().appMasters()).asJava();
-    for (AppMasterData app: apps) {
-      if (app.appId() == appId) {
-        status = app.status();
+    for (AppMasterData appData: apps) {
+      if (appData.appId() == app.appId()) {
+        status = appData.status();
       }
     }
     if (null == status || status instanceof ApplicationStatus.NONEXIST$) {
       return State.UNKNOWN;
     } else if (status instanceof ApplicationStatus.ACTIVE$) {
       return State.RUNNING;
+    } else if (status instanceof ApplicationStatus.SUCCEEDED$) {
+      return State.DONE;
     } else {
-      return State.STOPPED;
+      return State.FAILED;
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index 20624ed..4cc060c 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -20,6 +20,9 @@ package org.apache.beam.runners.gearpump;
 
 import java.util.HashMap;
 import java.util.Map;
+
+import org.apache.beam.runners.gearpump.translators.CreateGearpumpPCollectionViewTranslator;
+import org.apache.beam.runners.gearpump.translators.CreatePCollectionViewTranslator;
 import org.apache.beam.runners.gearpump.translators.CreateValuesTranslator;
 import org.apache.beam.runners.gearpump.translators.FlattenPCollectionTranslator;
 import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
@@ -38,6 +41,7 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PValue;
 
@@ -75,6 +79,10 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
     registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator());
     registerTransformTranslator(Window.Bound.class, new WindowBoundTranslator());
     registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
+    registerTransformTranslator(View.CreatePCollectionView.class,
+        new CreatePCollectionViewTranslator());
+    registerTransformTranslator(GearpumpRunner.CreateGearpumpPCollectionView.class,
+        new CreateGearpumpPCollectionViewTranslator<>());
   }
 
   public GearpumpPipelineTranslator(TranslationContext translationContext) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 9ca1eb2..72f2126 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -17,29 +17,45 @@
  */
 package org.apache.beam.runners.gearpump;
 
+import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.beam.runners.gearpump.translators.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 
 import org.apache.gearpump.cluster.ClusterConfig;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.cluster.client.RunningApplication;
 import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
 
@@ -56,8 +72,21 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
   private static final String GEARPUMP_SERIALIZERS = "gearpump.serializers";
   private static final String DEFAULT_APPNAME = "beam_gearpump_app";
 
+  /** Custom transforms implementations. */
+  private final Map<Class<?>, Class<?>> overrides;
+
   public GearpumpRunner(GearpumpPipelineOptions options) {
     this.options = options;
+
+    ImmutableMap.Builder<Class<?>, Class<?>> builder = ImmutableMap.builder();
+    builder.put(Combine.GloballyAsSingletonView.class,
+        StreamingCombineGloballyAsSingletonView.class);
+    builder.put(View.AsMap.class, StreamingViewAsMap.class);
+    builder.put(View.AsMultimap.class, StreamingViewAsMultimap.class);
+    builder.put(View.AsSingleton.class, StreamingViewAsSingleton.class);
+    builder.put(View.AsList.class, StreamingViewAsList.class);
+    builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
+    overrides = builder.build();
   }
 
   public static GearpumpRunner fromOptions(PipelineOptions options) {
@@ -69,15 +98,23 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
 
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
-    if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
+    if (overrides.containsKey(transform.getClass())) {
+
+      Class<PTransform<InputT, OutputT>> transformClass =
+          (Class<PTransform<InputT, OutputT>>) transform.getClass();
+
+      Class<PTransform<InputT, OutputT>> customTransformClass =
+          (Class<PTransform<InputT, OutputT>>) overrides.get(transform.getClass());
+
+      PTransform<InputT, OutputT> customTransform =
+          InstanceBuilder.ofType(customTransformClass)
+              .withArg(transformClass, transform)
+              .build();
+
+      return Pipeline.applyTransform(input, customTransform);
+    } else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
             && ((PCollectionList<?>) input).size() == 0) {
       return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
-    } else if (Create.Values.class.equals(transform.getClass())) {
-      return (OutputT) PCollection
-              .<OutputT>createPrimitiveOutputInternal(
-                      input.getPipeline(),
-                      WindowingStrategy.globalDefault(),
-                      PCollection.IsBounded.BOUNDED);
     } else {
       return super.apply(transform, input);
     }
@@ -99,9 +136,9 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
     TranslationContext translationContext = new TranslationContext(streamApp, options);
     GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
     translator.translate(pipeline);
-    int appId = streamApp.submit().appId();
+    RunningApplication app = streamApp.submit();
 
-    return new GearpumpPipelineResult(clientContext, appId);
+    return new GearpumpPipelineResult(clientContext, app);
   }
 
   private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {
@@ -131,4 +168,320 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
     return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
   }
 
+
+
+  // The following codes are forked from DataflowRunner for View translator
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+   * for the Gearpump runner.
+   */
+  private static class StreamingViewAsMap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+    private static final long serialVersionUID = 4791080760092950304L;
+
+    public StreamingViewAsMap(View.AsMap<K, V> transform) {}
+
+    @Override
+    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, V>> view =
+          PCollectionViews.mapView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        // throw new RuntimeException(e);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMap";
+    }
+  }
+
+  /**
+   * Specialized expansion for {@link
+   * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
+   * Gearpump runner.
+   */
+  private static class StreamingViewAsMultimap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+    private static final long serialVersionUID = 5854899081751333352L;
+
+    public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {}
+
+    @Override
+    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, Iterable<V>>> view =
+          PCollectionViews.multimapView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        // throw new RuntimeException(e);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMultimap";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
+   * Gearpump runner.
+   */
+  private static class StreamingViewAsIterable<T>
+      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+
+    private static final long serialVersionUID = -3399860618995613421L;
+
+    public StreamingViewAsIterable(View.AsIterable<T> transform) {}
+
+    @Override
+    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+      PCollectionView<Iterable<T>> view =
+          PCollectionViews.iterableView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateGearpumpPCollectionView.<T, Iterable<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsIterable";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
+   * Gearpump runner.
+   */
+  private static class StreamingViewAsList<T>
+      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+
+    private static final long serialVersionUID = -5018631473886330629L;
+
+    public StreamingViewAsList(View.AsList<T> transform) {}
+
+    @Override
+    public PCollectionView<List<T>> expand(PCollection<T> input) {
+      PCollectionView<List<T>> view =
+          PCollectionViews.listView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateGearpumpPCollectionView.<T, List<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsList";
+    }
+  }
+  private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+
+    private static final long serialVersionUID = 9064900748869035738L;
+    private final Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+    public StreamingCombineGloballyAsSingletonView(
+        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+      PCollection<OutputT> combined =
+          input.apply(Combine.globally(transform.getCombineFn())
+              .withoutDefaults()
+              .withFanout(transform.getFanout()));
+
+      PCollectionView<OutputT> view = PCollectionViews.singletonView(
+          combined.getPipeline(),
+          combined.getWindowingStrategy(),
+          transform.getInsertDefault(),
+          transform.getInsertDefault()
+              ? transform.getCombineFn().defaultValue() : null,
+          combined.getCoder());
+      return combined
+          .apply(ParDo.of(new WrapAsList<OutputT>()))
+          .apply(CreateGearpumpPCollectionView.<OutputT, OutputT>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingCombineGloballyAsSingletonView";
+    }
+  }
+
+  private static class StreamingViewAsSingleton<T>
+      extends PTransform<PCollection<T>, PCollectionView<T>> {
+
+    private static final long serialVersionUID = 5870455965625071546L;
+    private final View.AsSingleton<T> transform;
+
+    public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<T> expand(PCollection<T> input) {
+      Combine.Globally<T, T> combine = Combine.globally(
+          new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+      if (!transform.hasDefaultValue()) {
+        combine = combine.withoutDefaults();
+      }
+      return input.apply(combine.asSingletonView());
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsSingleton";
+    }
+
+    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+      private boolean hasDefaultValue;
+      private T defaultValue;
+
+      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+        this.hasDefaultValue = hasDefaultValue;
+        this.defaultValue = defaultValue;
+      }
+
+      @Override
+      public T apply(T left, T right) {
+        throw new IllegalArgumentException("PCollection with more than one element "
+            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+            + "combine the PCollection into a single value");
+      }
+
+      @Override
+      public T identity() {
+        if (hasDefaultValue) {
+          return defaultValue;
+        } else {
+          throw new IllegalArgumentException(
+              "Empty PCollection accessed as a singleton view. "
+                  + "Consider setting withDefault to provide a default value");
+        }
+      }
+    }
+  }
+
+  private static class WrapAsList<T> extends DoFn<T, List<T>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(Collections.singletonList(c.element()));
+    }
+  }
+
+  /**
+   * Creates a primitive {@link PCollectionView}.
+   *
+   * <p>For internal use only by runner implementors.
+   *
+   * @param <ElemT> The type of the elements of the input PCollection
+   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+   */
+  public static class CreateGearpumpPCollectionView<ElemT, ViewT>
+      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+    private static final long serialVersionUID = -2637073020800540542L;
+    private PCollectionView<ViewT> view;
+
+    private CreateGearpumpPCollectionView(PCollectionView<ViewT> view) {
+      this.view = view;
+    }
+
+    public static <ElemT, ViewT> CreateGearpumpPCollectionView<ElemT, ViewT> of(
+        PCollectionView<ViewT> view) {
+      return new CreateGearpumpPCollectionView<>(view);
+    }
+
+    public PCollectionView<ViewT> getView() {
+      return view;
+    }
+
+    @Override
+    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+      return view;
+    }
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   *
+   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
+   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
+   * They require the input {@link PCollection} fits in memory.
+   * For a large {@link PCollection} this is expected to crash!
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
index ee31fb5..c96bcb1 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/TestGearpumpRunner.java
@@ -18,8 +18,9 @@
 
 package org.apache.beam.runners.gearpump;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -27,7 +28,9 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 
+import org.apache.gearpump.cluster.ClusterConfig;
 import org.apache.gearpump.cluster.embedded.EmbeddedCluster;
+import org.apache.gearpump.util.Constants;
 
 /**
  * Gearpump {@link PipelineRunner} for tests, which uses {@link EmbeddedCluster}.
@@ -38,7 +41,10 @@ public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
   private final EmbeddedCluster cluster;
 
   private TestGearpumpRunner(GearpumpPipelineOptions options) {
-    cluster = EmbeddedCluster.apply();
+    Config config = ClusterConfig.master(null);
+    config = config.withValue(Constants.APPLICATION_TOTAL_RETRIES(),
+      ConfigValueFactory.fromAnyRef(0));
+    cluster = new EmbeddedCluster(config);
     cluster.start();
     options.setEmbeddedCluster(cluster);
     delegate = GearpumpRunner.fromOptions(options);
@@ -52,12 +58,31 @@ public class TestGearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
 
   @Override
   public GearpumpPipelineResult run(Pipeline pipeline) {
-    GearpumpPipelineResult result = delegate.run(pipeline);
-    PipelineResult.State state = result.waitUntilFinish();
-    cluster.stop();
-    assert(state == PipelineResult.State.DONE);
+    try {
+      GearpumpPipelineResult result = delegate.run(pipeline);
+      result.waitUntilFinish();
+      cluster.stop();
+      return result;
+    } catch (Throwable e) {
+      // copied from TestFlinkRunner to pull out AssertionError
+      // which is wrapped in UserCodeException
+      Throwable cause = e;
+      Throwable oldCause;
+      do {
+        if (cause.getCause() == null) {
+          break;
+        }
 
-    return result;
+        oldCause = cause;
+        cause = cause.getCause();
+
+      } while (!oldCause.equals(cause));
+      if (cause instanceof AssertionError) {
+        throw (AssertionError) cause;
+      } else {
+        throw e;
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
new file mode 100644
index 0000000..d05c89d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import java.util.List;
+
+import org.apache.beam.runners.gearpump.GearpumpRunner;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+
+/**
+ * CreateGearpumpPCollectionView bridges input stream to down stream
+ * transforms.
+ */
+public class CreateGearpumpPCollectionViewTranslator<ElemT, ViewT> implements
+    TransformTranslator<GearpumpRunner.CreateGearpumpPCollectionView<ElemT, ViewT>> {
+
+  @Override
+  public void translate(GearpumpRunner.CreateGearpumpPCollectionView<ElemT, ViewT> transform,
+      TranslationContext context) {
+    JavaStream<WindowedValue<List<ElemT>>> inputStream =
+        context.getInputStream(context.getInput(transform));
+    PCollectionView<ViewT> view = transform.getView();
+    context.setOutputStream(view, inputStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
new file mode 100644
index 0000000..e9e2e5d
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import java.util.List;
+
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+
+/**
+ * View.CreatePCollectionView bridges input stream to down stream
+ * transforms.
+ */
+public class CreatePCollectionViewTranslator<ElemT, ViewT> implements
+    TransformTranslator<View.CreatePCollectionView<ElemT, ViewT>> {
+
+  @Override
+  public void translate(View.CreatePCollectionView<ElemT, ViewT> transform,
+                        TranslationContext context) {
+    JavaStream<WindowedValue<List<ElemT>>> inputStream =
+        context.getInputStream(context.getInput(transform));
+    PCollectionView<ViewT> view = transform.getView();
+    context.setOutputStream(view, inputStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
index 452127a..e5dc6dd 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateValuesTranslator.java
@@ -33,6 +33,8 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
  */
 public class CreateValuesTranslator<T> implements TransformTranslator<Create.Values<T>> {
 
+  private static final long serialVersionUID = 5411841848199229738L;
+
   @Override
   public void translate(Create.Values<T> transform, TranslationContext context) {
     try {

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
index b740ab5..27e54b8 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionTranslator.java
@@ -18,11 +18,22 @@
 
 package org.apache.beam.runners.gearpump.translators;
 
+import com.google.common.collect.Lists;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.runners.gearpump.translators.io.ValuesSource;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
 
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 
+
+
 /**
  * Flatten.FlattenPCollectionList is translated to Gearpump merge function.
  * Note only two-way merge is working now
@@ -30,17 +41,44 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 public class FlattenPCollectionTranslator<T> implements
     TransformTranslator<Flatten.FlattenPCollectionList<T>> {
 
+  private static final long serialVersionUID = -5552148802472944759L;
+
   @Override
   public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
     JavaStream<T> merged = null;
+    Set<PCollection<T>> unique = new HashSet<>();
     for (PCollection<T> collection : context.getInput(transform).getAll()) {
+      unique.add(collection);
       JavaStream<T> inputStream = context.getInputStream(collection);
       if (null == merged) {
         merged = inputStream;
       } else {
+        // duplicate edges are not allowed in Gearpump graph
+        // so we route through a dummy node
+        if (unique.contains(collection)) {
+          inputStream = inputStream.map(new DummyFunction<T>(), "dummy");
+        }
+
         merged = merged.merge(inputStream, transform.getName());
       }
     }
+
+    if (null == merged) {
+      UnboundedSourceWrapper<String, ?> unboundedSourceWrapper = new UnboundedSourceWrapper<>(
+          new ValuesSource<>(Lists.newArrayList("dummy"),
+              StringUtf8Coder.of()), context.getPipelineOptions());
+      merged = context.getSourceStream(unboundedSourceWrapper);
+    }
     context.setOutputStream(context.getOutput(transform), merged);
   }
+
+  private static class DummyFunction<T> extends MapFunction<T, T> {
+
+    private static final long serialVersionUID = 5454396869997290471L;
+
+    @Override
+    public T map(T t) {
+      return t;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 69a1d11..df8bfe9 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -40,8 +40,8 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.gearpump.streaming.dsl.api.functions.FoldFunction;
 import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
-import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
 import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
@@ -49,12 +49,16 @@ import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
 import org.apache.gearpump.streaming.dsl.window.api.WindowFunction;
 import org.apache.gearpump.streaming.dsl.window.api.Windows;
 import org.apache.gearpump.streaming.dsl.window.impl.Window;
+import org.joda.time.Instant;
 
 /**
  * {@link GroupByKey} is translated to Gearpump groupBy function.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> {
+
+  private static final long serialVersionUID = -8742202583992787659L;
+
   @Override
   public void translate(GroupByKey<K, V> transform, TranslationContext context) {
     PCollection<KV<K, V>> input = context.getInput(transform);
@@ -66,15 +70,14 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
         input.getWindowingStrategy().getOutputTimeFn();
     WindowFn<KV<K, V>, BoundedWindow> windowFn = (WindowFn<KV<K, V>, BoundedWindow>)
         input.getWindowingStrategy().getWindowFn();
-    JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
+    JavaStream<WindowedValue<KV<K, List<V>>>> outputStream = inputStream
         .window(Windows.apply(
             new GearpumpWindowFn(windowFn.isNonMerging()),
             EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
         .groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window")
-        .map(new ValueToIterable<K, V>(), "map_value_to_iterable")
         .map(new KeyedByTimestamp<K, V>((OutputTimeFn<? super BoundedWindow>)
             input.getWindowingStrategy().getOutputTimeFn()), "keyed_by_timestamp")
-        .reduce(new Merge<>(windowFn, outputTimeFn), "merge")
+        .fold(new Merge<>(windowFn, outputTimeFn), "merge")
         .map(new Values<K, V>(), "values");
 
     context.setOutputStream(context.getOutput(transform), outputStream);
@@ -115,6 +118,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
   private static class GroupByFn<K, V> extends
       GroupByFunction<WindowedValue<KV<K, V>>, ByteBuffer> {
 
+    private static final long serialVersionUID = -807905402490735530L;
     private final Coder<K> keyCoder;
 
     GroupByFn(Coder<K> keyCoder) {
@@ -122,7 +126,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
 
     @Override
-    public ByteBuffer apply(WindowedValue<KV<K, V>> wv) {
+    public ByteBuffer groupBy(WindowedValue<KV<K, V>> wv) {
       try {
         return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, wv.getValue().getKey()));
       } catch (CoderException e) {
@@ -131,19 +135,9 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
   }
 
-  private static class ValueToIterable<K, V>
-      extends MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {
-
-    @Override
-    public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) {
-      Iterable<V> values = Lists.newArrayList(wv.getValue().getValue());
-      return wv.withValue(KV.of(wv.getValue().getKey(), values));
-    }
-  }
-
   private static class KeyedByTimestamp<K, V>
-      extends MapFunction<WindowedValue<KV<K, Iterable<V>>>,
-      KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {
+      extends MapFunction<WindowedValue<KV<K, V>>,
+      KV<Instant, WindowedValue<KV<K, V>>>> {
 
     private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
 
@@ -152,16 +146,17 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
 
     @Override
-    public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
-        WindowedValue<KV<K, Iterable<V>>> wv) {
-      org.joda.time.Instant timestamp = outputTimeFn.assignOutputTime(wv.getTimestamp(),
+    public KV<org.joda.time.Instant, WindowedValue<KV<K, V>>> map(
+        WindowedValue<KV<K, V>> wv) {
+      Instant timestamp = outputTimeFn.assignOutputTime(wv.getTimestamp(),
           Iterables.getOnlyElement(wv.getWindows()));
       return KV.of(timestamp, wv);
     }
   }
 
   private static class Merge<K, V> extends
-      ReduceFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {
+      FoldFunction<KV<Instant, WindowedValue<KV<K, V>>>,
+      KV<Instant, WindowedValue<KV<K, List<V>>>>> {
 
     private final WindowFn<KV<K, V>, BoundedWindow> windowFn;
     private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
@@ -173,14 +168,28 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
 
     @Override
-    public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
-        KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv1,
-        KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv2) {
-      org.joda.time.Instant t1 = kv1.getKey();
-      org.joda.time.Instant t2 = kv2.getKey();
+    public KV<Instant, WindowedValue<KV<K, List<V>>>> init() {
+      return KV.of(null, null);
+    }
+
+    @Override
+    public KV<Instant, WindowedValue<KV<K, List<V>>>> fold(
+        KV<Instant, WindowedValue<KV<K, List<V>>>> accum,
+        KV<Instant, WindowedValue<KV<K, V>>> iter) {
+      if (accum.getKey() == null) {
+        WindowedValue<KV<K, V>> wv = iter.getValue();
+        KV<K, V> kv = wv.getValue();
+        V v = kv.getValue();
+        List<V> nv = Lists.newArrayList(v);
+        return KV.of(iter.getKey(), wv.withValue(KV.of(kv.getKey(), nv)));
+      }
+
+      Instant t1 = accum.getKey();
+      Instant t2 = iter.getKey();
 
-      final WindowedValue<KV<K, Iterable<V>>> wv1 = kv1.getValue();
-      final WindowedValue<KV<K, Iterable<V>>> wv2 = kv2.getValue();
+      final WindowedValue<KV<K, List<V>>> wv1 = accum.getValue();
+      final WindowedValue<KV<K, V>> wv2 = iter.getValue();
+      wv1.getValue().getValue().add(wv2.getValue().getValue());
 
       final List<BoundedWindow> mergedWindows = new ArrayList<>();
       if (!windowFn.isNonMerging()) {
@@ -208,23 +217,22 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
         mergedWindows.addAll(wv1.getWindows());
       }
 
-      org.joda.time.Instant timestamp = outputTimeFn.combine(t1, t2);
+      Instant timestamp = outputTimeFn.combine(t1, t2);
       return KV.of(timestamp,
-          WindowedValue.of(KV.of(wv1.getValue().getKey(),
-              Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())), timestamp,
+          WindowedValue.of(wv1.getValue(), timestamp,
               mergedWindows, wv1.getPane()));
     }
   }
 
   private static class Values<K, V> extends
-      MapFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>,
-          WindowedValue<KV<K, Iterable<V>>>> {
+      MapFunction<KV<Instant, WindowedValue<KV<K, List<V>>>>,
+          WindowedValue<KV<K, List<V>>>> {
 
     @Override
-    public WindowedValue<KV<K, Iterable<V>>> apply(KV<org.joda.time.Instant,
-        WindowedValue<KV<K, Iterable<V>>>> kv) {
-      org.joda.time.Instant timestamp = kv.getKey();
-      WindowedValue<KV<K, Iterable<V>>> wv = kv.getValue();
+    public WindowedValue<KV<K, List<V>>> map(KV<org.joda.time.Instant,
+        WindowedValue<KV<K, List<V>>>> kv) {
+      Instant timestamp = kv.getKey();
+      WindowedValue<KV<K, List<V>>> wv = kv.getValue();
       return WindowedValue.of(wv.getValue(), timestamp, wv.getWindows(), wv.getPane());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
index bf7073b..8c57019 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundMultiTranslator.java
@@ -18,158 +18,93 @@
 
 package org.apache.beam.runners.gearpump.translators;
 
+
+import com.google.common.base.Predicate;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
-import java.util.Iterator;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
-import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
-import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
-import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
-import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
+import javax.annotation.Nullable;
+
+import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
 
 import org.apache.gearpump.streaming.dsl.api.functions.FilterFunction;
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 
 /**
  * {@link ParDo.BoundMulti} is translated to Gearpump flatMap function
- * with {@link DoFn} wrapped in {@link DoFnMultiFunction}. The outputs are
+ * with {@link DoFn} wrapped in {@link DoFnFunction}. The outputs are
  * further filtered with Gearpump filter function by output tag
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class ParDoBoundMultiTranslator<InputT, OutputT> implements
     TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> {
 
+  private static final long serialVersionUID = -6023461558200028849L;
+
   @Override
   public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
     PCollection<InputT> inputT = (PCollection<InputT>) context.getInput(transform);
     JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream(inputT);
-    Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+    Collection<PCollectionView<?>> sideInputs = transform.getSideInputs();
+    Map<String, PCollectionView<?>> tagsToSideInputs =
+        TranslatorUtils.getTagsToSideInputs(sideInputs);
 
-    JavaStream<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputStream = inputStream.flatMap(
-        new DoFnMultiFunction<>(
-            context.getPipelineOptions(),
-            transform.getFn(),
-            transform.getMainOutputTag(),
-            transform.getSideOutputTags(),
-            inputT.getWindowingStrategy(),
-            new NoOpSideInputReader()
-        ), transform.getName());
-    for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
+    Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll();
+    final TupleTag<OutputT> mainOutput = transform.getMainOutputTag();
+    List<TupleTag<?>> sideOutputs = Lists.newLinkedList(Sets.filter(outputs.keySet(),
+        new Predicate<TupleTag<?>>() {
+          @Override
+          public boolean apply(@Nullable TupleTag<?> tupleTag) {
+            return tupleTag != null && !tupleTag.getId().equals(mainOutput.getId());
+          }
+        }));
+
+    JavaStream<TranslatorUtils.RawUnionValue> unionStream = TranslatorUtils.withSideInputStream(
+        context, inputStream, tagsToSideInputs);
+
+    JavaStream<TranslatorUtils.RawUnionValue> outputStream =
+        TranslatorUtils.toList(unionStream).flatMap(
+            new DoFnFunction<>(
+                context.getPipelineOptions(),
+                transform.getFn(),
+                inputT.getWindowingStrategy(),
+                sideInputs,
+                tagsToSideInputs,
+                mainOutput,
+                sideOutputs), transform.getName());
+    for (Map.Entry<TupleTag<?>, PCollection<?>> output: outputs.entrySet()) {
+      output.getValue().getCoder();
       JavaStream<WindowedValue<OutputT>> taggedStream = outputStream
-          .filter(new FilterByOutputTag<>((TupleTag<OutputT>) output.getKey())
-              , "filter_by_output_tag")
-          .map(new ExtractOutput<OutputT>(), "extract output");
-
+          .filter(new FilterByOutputTag(output.getKey().getId()),
+              "filter_by_output_tag")
+          .map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue");
       context.setOutputStream(output.getValue(), taggedStream);
     }
   }
 
-  /**
-   * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFnMultiFunction}.
-   */
-  private static class DoFnMultiFunction<InputT, OutputT>
-    extends FlatMapFunction<WindowedValue<InputT>, WindowedValue<KV<TupleTag<OutputT>, OutputT>>>
-    implements DoFnRunners.OutputManager {
-
-    private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
-    private DoFnRunner<InputT, OutputT> doFnRunner;
-    private final DoFn<InputT, OutputT> doFn;
-    private List<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> outputs;
-
-    public DoFnMultiFunction(
-        GearpumpPipelineOptions pipelineOptions,
-        DoFn<InputT, OutputT> doFn,
-        TupleTag<OutputT> mainOutputTag,
-        TupleTagList sideOutputTags,
-        WindowingStrategy<?, ?> windowingStrategy,
-        SideInputReader sideInputReader) {
-      this.doFn = doFn;
-      this.doFnRunnerFactory = new DoFnRunnerFactory<>(
-          pipelineOptions,
-          doFn,
-          sideInputReader,
-          this,
-          mainOutputTag,
-          sideOutputTags.getAll(),
-          new NoOpStepContext(),
-          new NoOpAggregatorFactory(),
-          windowingStrategy
-      );
-    }
-
-    @Override
-    public void setup() {
-      DoFnInvokers.invokerFor(doFn).invokeSetup();
-    }
-
-    @Override
-    public void teardown() {
-      DoFnInvokers.invokerFor(doFn).invokeTeardown();
-    }
-
-    @Override
-    public Iterator<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> apply(WindowedValue<InputT> wv) {
-      outputs = Lists.newArrayList();
-
-      if (null == doFnRunner) {
-        doFnRunner = doFnRunnerFactory.createRunner();
-      }
-      doFnRunner.startBundle();
-      doFnRunner.processElement(wv);
-      doFnRunner.finishBundle();
-
-      return outputs.iterator();
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      KV<TupleTag<OutputT>, OutputT> kv = KV.of((TupleTag<OutputT>) tag,
-          (OutputT) output.getValue());
-      outputs.add(WindowedValue.of(kv, output.getTimestamp(),
-          output.getWindows(), output.getPane()));
-    }
-  }
-
-  private static class FilterByOutputTag<OutputT> extends
-      FilterFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>> {
+  private static class FilterByOutputTag extends FilterFunction<TranslatorUtils.RawUnionValue> {
 
-    private final TupleTag<OutputT> tupleTag;
+    private static final long serialVersionUID = 7276155265895637526L;
+    private final String tag;
 
-    public FilterByOutputTag(TupleTag<OutputT> tupleTag) {
-      this.tupleTag = tupleTag;
+    FilterByOutputTag(String tag) {
+      this.tag = tag;
     }
 
     @Override
-    public boolean apply(WindowedValue<KV<TupleTag<OutputT>, OutputT>> wv) {
-      return wv.getValue().getKey().equals(tupleTag);
-    }
-  }
-
-  private static class ExtractOutput<OutputT> extends
-      MapFunction<WindowedValue<KV<TupleTag<OutputT>, OutputT>>, WindowedValue<OutputT>> {
-
-    @Override
-    public WindowedValue<OutputT> apply(WindowedValue<KV<TupleTag<OutputT>, OutputT>> wv) {
-      // System.out.println(wv.getValue().getKey() + ":" + wv.getValue().getValue());
-      return WindowedValue.of(wv.getValue().getValue(), wv.getTimestamp(),
-          wv.getWindows(), wv.getPane());
+    public boolean filter(TranslatorUtils.RawUnionValue value) {
+      return value.getUnionTag().equals(tag);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
index 689bc08..efae938 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoBoundTranslator.java
@@ -18,14 +18,21 @@
 
 package org.apache.beam.runners.gearpump.translators;
 
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
-import org.apache.beam.runners.gearpump.translators.utils.NoOpSideInputReader;
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 
 
@@ -36,18 +43,33 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 public class ParDoBoundTranslator<InputT, OutputT> implements
     TransformTranslator<ParDo.Bound<InputT, OutputT>> {
 
+  private static final long serialVersionUID = -3413205558160983784L;
+  private final TupleTag<OutputT> mainOutput = new TupleTag<>();
+  private final List<TupleTag<?>> sideOutputs = TupleTagList.empty().getAll();
+
   @Override
   public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
     DoFn<InputT, OutputT> doFn = transform.getFn();
     PCollection<OutputT> output = context.getOutput(transform);
     WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
 
+    Collection<PCollectionView<?>> sideInputs = transform.getSideInputs();
+    Map<String, PCollectionView<?>> tagsToSideInputs =
+        TranslatorUtils.getTagsToSideInputs(sideInputs);
+    JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream(
+        context.getInput(transform));
+    JavaStream<TranslatorUtils.RawUnionValue> unionStream =
+        TranslatorUtils.withSideInputStream(context,
+        inputStream, tagsToSideInputs);
+
     DoFnFunction<InputT, OutputT> doFnFunction = new DoFnFunction<>(context.getPipelineOptions(),
-        doFn, windowingStrategy, new NoOpSideInputReader());
-    JavaStream<WindowedValue<InputT>> inputStream =
-        context.getInputStream(context.getInput(transform));
+        doFn, windowingStrategy, sideInputs, tagsToSideInputs,
+        mainOutput, sideOutputs);
+
     JavaStream<WindowedValue<OutputT>> outputStream =
-        inputStream.flatMap(doFnFunction, transform.getName());
+        TranslatorUtils.toList(unionStream)
+            .flatMap(doFnFunction, transform.getName())
+            .map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue");
 
     context.setOutputStream(context.getOutput(transform), outputStream);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
index c0de2df..81970e2 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowBoundTranslator.java
@@ -20,7 +20,10 @@ package org.apache.beam.runners.gearpump.translators;
 
 import com.google.common.collect.Iterables;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -28,8 +31,8 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 import org.joda.time.Instant;
 
 /**
@@ -38,6 +41,8 @@ import org.joda.time.Instant;
 @SuppressWarnings("unchecked")
 public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bound<T>> {
 
+  private static final long serialVersionUID = -964887482120489061L;
+
   @Override
   public void translate(Window.Bound<T> transform, TranslationContext context) {
     PCollection<T> input = context.getInput(transform);
@@ -47,14 +52,15 @@ public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bou
     WindowFn<T, BoundedWindow> windowFn = (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
     JavaStream<WindowedValue<T>> outputStream =
         inputStream
-            .map(new AssignWindows(windowFn), "assign_windows");
+            .flatMap(new AssignWindows(windowFn), "assign_windows");
 
     context.setOutputStream(context.getOutput(transform), outputStream);
   }
 
   private static class AssignWindows<T> extends
-      MapFunction<WindowedValue<T>, WindowedValue<T>> {
+      FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
 
+    private static final long serialVersionUID = 7284565861938681360L;
     private final WindowFn<T, BoundedWindow> windowFn;
 
     AssignWindows(WindowFn<T, BoundedWindow> windowFn) {
@@ -62,7 +68,7 @@ public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bou
     }
 
     @Override
-    public WindowedValue<T> apply(final WindowedValue<T> value) {
+    public Iterator<WindowedValue<T>> flatMap(final WindowedValue<T> value) {
       try {
         Collection<BoundedWindow> windows = windowFn.assignWindows(windowFn.new AssignContext() {
           @Override
@@ -80,7 +86,12 @@ public class WindowBoundTranslator<T> implements  TransformTranslator<Window.Bou
             return Iterables.getOnlyElement(value.getWindows());
           }
         });
-        return WindowedValue.of(value.getValue(), value.getTimestamp(), windows, value.getPane());
+        List<WindowedValue<T>> values = new ArrayList<>(windows.size());
+        for (BoundedWindow win: windows) {
+          values.add(
+              WindowedValue.of(value.getValue(), value.getTimestamp(), win, value.getPane()));
+        }
+        return values.iterator();
       } catch (Exception e) {
         throw new RuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index a66d3a4..b2c68d6 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -18,90 +18,190 @@
 
 package org.apache.beam.runners.gearpump.translators.functions;
 
+import com.google.common.collect.Iterables;
+
 import com.google.common.collect.Lists;
 
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
-import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.runners.gearpump.translators.utils.DoFnRunnerFactory;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpAggregatorFactory;
 import org.apache.beam.runners.gearpump.translators.utils.NoOpStepContext;
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils.RawUnionValue;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
 
 /**
  * Gearpump {@link FlatMapFunction} wrapper over Beam {@link DoFn}.
  */
+@SuppressWarnings("unchecked")
 public class DoFnFunction<InputT, OutputT> extends
-    FlatMapFunction<WindowedValue<InputT>, WindowedValue<OutputT>> implements
-    DoFnRunners.OutputManager {
+    FlatMapFunction<List<RawUnionValue>, RawUnionValue> {
 
-  private final TupleTag<OutputT> mainTag = new TupleTag<OutputT>() {};
-  private List<WindowedValue<OutputT>> outputs = Lists.newArrayList();
+  private static final long serialVersionUID = -5701440128544343353L;
   private final DoFnRunnerFactory<InputT, OutputT> doFnRunnerFactory;
-  private DoFnRunner<InputT, OutputT> doFnRunner;
   private final DoFn<InputT, OutputT> doFn;
+  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
+  private transient PushbackSideInputDoFnRunner<InputT, OutputT> doFnRunner;
+  private transient SideInputHandler sideInputReader;
+  private transient List<WindowedValue<InputT>> pushedBackValues;
+  private transient Map<PCollectionView<?>, List<WindowedValue<Iterable<?>>>> sideInputValues;
+  private final Collection<PCollectionView<?>> sideInputs;
+  private final Map<String, PCollectionView<?>> tagsToSideInputs;
+  private final TupleTag<OutputT> mainOutput;
+  private final List<TupleTag<?>> sideOutputs;
+  private final DoFnOutputManager outputManager;
 
   public DoFnFunction(
       GearpumpPipelineOptions pipelineOptions,
       DoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
-      SideInputReader sideInputReader) {
+      Collection<PCollectionView<?>> sideInputs,
+      Map<String, PCollectionView<?>> sideInputTagMapping,
+      TupleTag<OutputT> mainOutput,
+      List<TupleTag<?>> sideOutputs) {
     this.doFn = doFn;
+    this.outputManager = new DoFnOutputManager();
     this.doFnRunnerFactory = new DoFnRunnerFactory<>(
         pipelineOptions,
         doFn,
-        sideInputReader,
-        this,
-        mainTag,
-        TupleTagList.empty().getAll(),
+        sideInputs,
+        outputManager,
+        mainOutput,
+        sideOutputs,
         new NoOpStepContext(),
         new NoOpAggregatorFactory(),
         windowingStrategy
     );
+    this.sideInputs = sideInputs;
+    this.tagsToSideInputs = sideInputTagMapping;
+    this.mainOutput = mainOutput;
+    this.sideOutputs = sideOutputs;
   }
 
   @Override
   public void setup() {
-    DoFnInvokers.invokerFor(doFn).invokeSetup();
+    sideInputReader = new SideInputHandler(sideInputs,
+        InMemoryStateInternals.<Void>forKey(null));
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
+
+    doFnRunner = doFnRunnerFactory.createRunner(sideInputReader);
+
+    pushedBackValues = new LinkedList<>();
+    sideInputValues = new HashMap<>();
+    outputManager.setup(mainOutput, sideOutputs);
   }
 
   @Override
   public void teardown() {
-    DoFnInvokers.invokerFor(doFn).invokeTeardown();
+    doFnInvoker.invokeTeardown();
   }
 
   @Override
-  public Iterator<WindowedValue<OutputT>> apply(WindowedValue<InputT> value) {
-    outputs = Lists.newArrayList();
+  public Iterator<TranslatorUtils.RawUnionValue> flatMap(List<RawUnionValue> inputs) {
+    outputManager.clear();
+
+    doFnRunner.startBundle();
 
-    if (null == doFnRunner) {
-      doFnRunner = doFnRunnerFactory.createRunner();
+    for (RawUnionValue unionValue: inputs) {
+      final String tag = unionValue.getUnionTag();
+      if (tag.equals("0")) {
+        // main input
+        pushedBackValues.add((WindowedValue<InputT>) unionValue.getValue());
+      } else {
+        // side input
+        PCollectionView<?> sideInput = tagsToSideInputs.get(unionValue.getUnionTag());
+        WindowedValue<Iterable<?>> sideInputValue =
+            (WindowedValue<Iterable<?>>) unionValue.getValue();
+        if (!sideInputValues.containsKey(sideInput)) {
+          sideInputValues.put(sideInput, new LinkedList<WindowedValue<Iterable<?>>>());
+        }
+        sideInputValues.get(sideInput).add(sideInputValue);
+      }
     }
 
-    doFnRunner.startBundle();
-    doFnRunner.processElement(value);
+    for (PCollectionView<?> sideInput: sideInputs) {
+      if (sideInputValues.containsKey(sideInput)) {
+        for (WindowedValue<Iterable<?>> value: sideInputValues.get(sideInput)) {
+          sideInputReader.addSideInputValue(sideInput, value);
+        }
+      }
+      for (WindowedValue<InputT> value : pushedBackValues) {
+        for (BoundedWindow win: value.getWindows()) {
+          BoundedWindow sideInputWindow =
+              sideInput.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(win);
+          if (!sideInputReader.isReady(sideInput, sideInputWindow)) {
+            Object emptyValue = WindowedValue.of(
+                Lists.newArrayList(), value.getTimestamp(), sideInputWindow, value.getPane());
+            sideInputReader.addSideInputValue(sideInput, (WindowedValue<Iterable<?>>) emptyValue);
+          }
+        }
+      }
+    }
+
+    List<WindowedValue<InputT>> nextPushedBackValues = new LinkedList<>();
+    for (WindowedValue<InputT> value : pushedBackValues) {
+      Iterable<WindowedValue<InputT>> values = doFnRunner.processElementInReadyWindows(value);
+      Iterables.addAll(nextPushedBackValues, values);
+    }
+    pushedBackValues.clear();
+    Iterables.addAll(pushedBackValues, nextPushedBackValues);
+    sideInputValues.clear();
+
     doFnRunner.finishBundle();
 
-    return outputs.iterator();
+    return outputManager.getOutputs();
   }
 
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  @Override
-  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-    if (mainTag.equals(tag)) {
-      outputs.add((WindowedValue<OutputT>) output);
-    } else {
-      throw new RuntimeException("output is not of main tag");
+  private static class DoFnOutputManager implements DoFnRunners.OutputManager, Serializable {
+
+    private static final long serialVersionUID = 4967375172737408160L;
+    private transient List<RawUnionValue> outputs;
+    private transient Set<TupleTag<?>> outputTags;
+
+    @Override
+    public <T> void output(TupleTag<T> outputTag, WindowedValue<T> output) {
+      if (outputTags.contains(outputTag)) {
+        outputs.add(new RawUnionValue(outputTag.getId(), output));
+      }
+    }
+
+    void setup(TupleTag<?> mainOutput, List<TupleTag<?>> sideOutputs) {
+      outputs = new LinkedList<>();
+      outputTags = new HashSet<>();
+      outputTags.add(mainOutput);
+      outputTags.addAll(sideOutputs);
+    }
+
+    void clear() {
+      outputs.clear();
+    }
+
+    Iterator<RawUnionValue> getOutputs() {
+      return outputs.iterator();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
index f889101..2c18735 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/BoundedSourceWrapper.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
  */
 public class BoundedSourceWrapper<T> extends GearpumpSource<T> {
 
+  private static final long serialVersionUID = 8199570485738786123L;
   private final BoundedSource<T> source;
 
   public BoundedSourceWrapper(BoundedSource<T> source, PipelineOptions options) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 3d0d7c8..c079603 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -28,10 +28,13 @@ import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
+// import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 
 import org.apache.gearpump.Message;
 import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.source.Watermark;
 import org.apache.gearpump.streaming.task.TaskContext;
 
 /**
@@ -74,11 +77,11 @@ public abstract class GearpumpSource<T> implements DataSource {
       if (available) {
         T data = reader.getCurrent();
         org.joda.time.Instant timestamp = reader.getCurrentTimestamp();
-        available = reader.advance();
         message = Message.apply(
-            WindowedValue.valueInGlobalWindow(data),
+            WindowedValue.timestampedValueInGlobalWindow(data, timestamp),
             timestamp.getMillis());
       }
+      available = reader.advance();
     } catch (Exception e) {
       close();
       throw new RuntimeException(e);
@@ -100,11 +103,19 @@ public abstract class GearpumpSource<T> implements DataSource {
   @Override
   public Instant getWatermark() {
     if (reader instanceof UnboundedSource.UnboundedReader) {
-      return TranslatorUtils.jodaTimeToJava8Time(
-          ((UnboundedSource.UnboundedReader) reader).getWatermark());
+      org.joda.time.Instant watermark =
+          ((UnboundedSource.UnboundedReader) reader).getWatermark();
+      if (watermark == BoundedWindow.TIMESTAMP_MAX_VALUE) {
+        return Watermark.MAX();
+      } else {
+        return TranslatorUtils.jodaTimeToJava8Time(watermark);
+      }
     } else {
-      return Instant.now();
+      if (available) {
+        return TranslatorUtils.jodaTimeToJava8Time(reader.getCurrentTimestamp());
+      } else {
+        return Watermark.MAX();
+      }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
index f5a5eb4..e0488cd 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/ValuesSource.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Instant;
 
 /**
@@ -40,6 +41,7 @@ import org.joda.time.Instant;
  */
 public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.CheckpointMark> {
 
+  private static final long serialVersionUID = 9113026175795235710L;
   private final byte[] values;
   private final IterableCoder<T> iterableCoder;
 
@@ -135,7 +137,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
 
     @Override
     public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return Instant.now();
+      return getTimestamp(current);
     }
 
     @Override
@@ -145,7 +147,7 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
     @Override
     public Instant getWatermark() {
       if (iterator.hasNext()) {
-        return Instant.now();
+        return getTimestamp(current);
       } else {
         return BoundedWindow.TIMESTAMP_MAX_VALUE;
       }
@@ -160,5 +162,13 @@ public class ValuesSource<T> extends UnboundedSource<T, UnboundedSource.Checkpoi
     public UnboundedSource<T, ?> getCurrentSource() {
       return source;
     }
+
+    private Instant getTimestamp(Object value) {
+      if (value instanceof TimestampedValue) {
+        return ((TimestampedValue) value).getTimestamp();
+      } else {
+        return Instant.now();
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index aaefb88..5db8320 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -19,18 +19,21 @@
 package org.apache.beam.runners.gearpump.translators.utils;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SimpleDoFnRunner;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
@@ -38,10 +41,10 @@ import org.apache.beam.sdk.values.TupleTag;
  */
 public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
 
-  private static final long serialVersionUID = 1083167395296383469L;
+  private static final long serialVersionUID = -4109539010014189725L;
   private final DoFn<InputT, OutputT> fn;
   private final transient PipelineOptions options;
-  private final SideInputReader sideInputReader;
+  private final Collection<PCollectionView<?>> sideInputs;
   private final DoFnRunners.OutputManager outputManager;
   private final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> sideOutputTags;
@@ -52,7 +55,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
   public DoFnRunnerFactory(
       GearpumpPipelineOptions pipelineOptions,
       DoFn<InputT, OutputT> doFn,
-      SideInputReader sideInputReader,
+      Collection<PCollectionView<?>> sideInputs,
       DoFnRunners.OutputManager outputManager,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
@@ -61,7 +64,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
       WindowingStrategy<?, ?> windowingStrategy) {
     this.fn = doFn;
     this.options = pipelineOptions;
-    this.sideInputReader = sideInputReader;
+    this.sideInputs = sideInputs;
     this.outputManager = outputManager;
     this.mainOutputTag = mainOutputTag;
     this.sideOutputTags = sideOutputTags;
@@ -70,9 +73,12 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
     this.windowingStrategy = windowingStrategy;
   }
 
-  public DoFnRunner<InputT, OutputT> createRunner() {
-    return DoFnRunners.createDefault(options, fn, sideInputReader, outputManager, mainOutputTag,
+  public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner(
+      ReadyCheckingSideInputReader sideInputReader) {
+    DoFnRunner<InputT, OutputT> underlying = DoFnRunners.createDefault(
+        options, fn, sideInputReader, outputManager, mainOutputTag,
         sideOutputTags, stepContext, aggregatorFactory, windowingStrategy);
+    return PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4eb50d15/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
deleted file mode 100644
index d1a9198..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpSideInputReader.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators.utils;
-
-import java.io.Serializable;
-
-import javax.annotation.Nullable;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * no-op side input reader.
- */
-public class NoOpSideInputReader implements SideInputReader, Serializable {
-  @Nullable
-  @Override
-  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
-    return null;
-  }
-
-  @Override
-  public <T> boolean contains(PCollectionView<T> view) {
-    return false;
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return false;
-  }
-}