You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:57 UTC

[32/50] [abbrv] beam git commit: Fix PCollectionView translation

Fix PCollectionView translation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a7b5d981
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a7b5d981
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a7b5d981

Branch: refs/heads/master
Commit: a7b5d981a57bae38a5ef70feb0c6bd167ce22ab0
Parents: f61822d
Author: manuzhang <ow...@gmail.com>
Authored: Sat Jun 17 11:11:07 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jun 19 16:03:19 2017 +0800

----------------------------------------------------------------------
 .../gearpump/GearpumpPipelineTranslator.java    | 526 -------------------
 .../beam/runners/gearpump/GearpumpRunner.java   |   1 +
 ...CreateGearpumpPCollectionViewTranslator.java |   9 +-
 .../CreateStreamingGearpumpView.java            | 157 ++++++
 .../translators/GearpumpPipelineTranslator.java | 143 +++++
 .../translators/TranslationContext.java         |   3 +-
 .../translators/functions/DoFnFunction.java     |   1 -
 .../gearpump/translators/io/GearpumpSource.java |   2 +-
 .../translators/utils/TranslatorUtils.java      |   1 +
 ...teGearpumpPCollectionViewTranslatorTest.java |   7 +-
 .../translators/io/GearpumpSourceTest.java      |  18 +-
 .../translators/utils/TranslatorUtilsTest.java  |   5 +-
 12 files changed, 327 insertions(+), 546 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
deleted file mode 100644
index 58b44a3..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump;
-
-import com.google.common.collect.ImmutableList;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.runners.core.construction.PTransformMatchers;
-import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
-import org.apache.beam.runners.gearpump.translators.CreateGearpumpPCollectionViewTranslator;
-import org.apache.beam.runners.gearpump.translators.FlattenPCollectionsTranslator;
-import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
-import org.apache.beam.runners.gearpump.translators.ParDoMultiOutputTranslator;
-import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator;
-import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
-import org.apache.beam.runners.gearpump.translators.TransformTranslator;
-import org.apache.beam.runners.gearpump.translators.TranslationContext;
-import org.apache.beam.runners.gearpump.translators.WindowAssignTranslator;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
-import org.apache.beam.sdk.values.PValue;
-
-import org.apache.gearpump.util.Graph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link GearpumpPipelineTranslator} knows how to translate {@link Pipeline} objects
- * into Gearpump {@link Graph}.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-      GearpumpPipelineTranslator.class);
-
-  /**
-   * A map from {@link PTransform} subclass to the corresponding
-   * {@link TransformTranslator} to use to translate that transform.
-   */
-  private static final Map<Class<? extends PTransform>, TransformTranslator>
-      transformTranslators = new HashMap<>();
-
-  private final TranslationContext translationContext;
-
-  static {
-    // register TransformTranslators
-    registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
-    registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
-    registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
-    registerTransformTranslator(Flatten.PCollections.class,
-        new FlattenPCollectionsTranslator());
-    registerTransformTranslator(ParDo.MultiOutput.class, new ParDoMultiOutputTranslator());
-    registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
-    registerTransformTranslator(CreateGearpumpPCollectionView.class,
-        new CreateGearpumpPCollectionViewTranslator<>());
-  }
-
-  public GearpumpPipelineTranslator(TranslationContext translationContext) {
-    this.translationContext = translationContext;
-  }
-
-  public void translate(Pipeline pipeline) {
-    List<PTransformOverride> overrides =
-        ImmutableList.<PTransformOverride>builder()
-            .add(PTransformOverride.of(
-                PTransformMatchers.classEqualTo(View.AsMap.class),
-                new ReflectiveOneToOneOverrideFactory(StreamingViewAsMap.class)))
-            .add(PTransformOverride.of(
-                PTransformMatchers.classEqualTo(View.AsMultimap.class),
-                new ReflectiveOneToOneOverrideFactory(StreamingViewAsMultimap.class)))
-            .add(PTransformOverride.of(
-                PTransformMatchers.classEqualTo(View.AsSingleton.class),
-                new ReflectiveOneToOneOverrideFactory(StreamingViewAsSingleton.class)))
-            .add(PTransformOverride.of(
-                PTransformMatchers.classEqualTo(View.AsList.class),
-                new ReflectiveOneToOneOverrideFactory(StreamingViewAsList.class)))
-            .add(PTransformOverride.of(
-                PTransformMatchers.classEqualTo(View.AsIterable.class),
-                new ReflectiveOneToOneOverrideFactory(StreamingViewAsIterable.class)))
-            .add(PTransformOverride.of(
-                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    StreamingCombineGloballyAsSingletonView.class)))
-            .build();
-
-    pipeline.replaceAll(overrides);
-    pipeline.traverseTopologically(this);
-  }
-
-  @Override
-  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-    LOG.debug("entering composite transform {}", node.getTransform());
-    return CompositeBehavior.ENTER_TRANSFORM;
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformHierarchy.Node node) {
-    LOG.debug("leaving composite transform {}", node.getTransform());
-  }
-
-  @Override
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    LOG.info("visiting transform {}", node.getTransform());
-    PTransform transform = node.getTransform();
-    TransformTranslator translator = getTransformTranslator(transform.getClass());
-    if (null == translator) {
-      throw new IllegalStateException(
-          "no translator registered for " + transform);
-    }
-    translationContext.setCurrentTransform(node, getPipeline());
-    translator.translate(transform, translationContext);
-  }
-
-  @Override
-  public void visitValue(PValue value, TransformHierarchy.Node producer) {
-    LOG.debug("visiting value {}", value);
-  }
-
-  /**
-   * Records that instances of the specified PTransform class
-   * should be translated by default by the corresponding
-   * {@link TransformTranslator}.
-   */
-  private static <TransformT extends PTransform> void registerTransformTranslator(
-      Class<TransformT> transformClass,
-      TransformTranslator<? extends TransformT> transformTranslator) {
-    if (transformTranslators.put(transformClass, transformTranslator) != null) {
-      throw new IllegalArgumentException(
-          "defining multiple translators for " + transformClass);
-    }
-  }
-
-  /**
-   * Returns the {@link TransformTranslator} to use for instances of the
-   * specified PTransform class, or null if none registered.
-   */
-  private <TransformT extends PTransform>
-  TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
-    return transformTranslators.get(transformClass);
-  }
-
-  // The following codes are forked from DataflowRunner for View translator
-  private static class ReflectiveOneToOneOverrideFactory<
-          InputT, OutputT, TransformT extends PTransform<PCollection<InputT>, PCollection<OutputT>>>
-      extends SingleInputOutputOverrideFactory<
-          PCollection<InputT>, PCollection<OutputT>, TransformT> {
-    private final Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement;
-
-    private ReflectiveOneToOneOverrideFactory(
-        Class<PTransform<PCollection<InputT>, PCollection<OutputT>>> replacement) {
-      this.replacement = replacement;
-    }
-
-    @Override
-    public PTransformReplacement<PCollection<InputT>, PCollection<OutputT>> getReplacementTransform(
-        AppliedPTransform<PCollection<InputT>, PCollection<OutputT>, TransformT> transform) {
-      return PTransformReplacement.of(
-          PTransformReplacements.getSingletonMainInput(transform),
-          InstanceBuilder.ofType(replacement)
-              .withArg(
-                  (Class<PTransform<PCollection<InputT>, PCollection<OutputT>>>)
-                      transform.getTransform().getClass(),
-                  transform.getTransform())
-              .build());
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
-   * for the Gearpump runner.
-   */
-  private static class StreamingViewAsMap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
-    private static final long serialVersionUID = 4791080760092950304L;
-
-    public StreamingViewAsMap(View.AsMap<K, V> transform) {}
-
-    @Override
-    public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        // throw new RuntimeException(e);
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, V>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMap";
-    }
-  }
-
-  /**
-   * Specialized expansion for {@link
-   * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
-   * Gearpump runner.
-   */
-  private static class StreamingViewAsMultimap<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
-
-    private static final long serialVersionUID = 5854899081751333352L;
-
-    public StreamingViewAsMultimap(View.AsMultimap<K, V> transform) {}
-
-    @Override
-    public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-      PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      @SuppressWarnings({"rawtypes", "unchecked"})
-      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-      try {
-        inputCoder.getKeyCoder().verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        // throw new RuntimeException(e);
-      }
-
-      return input
-          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
-          .apply(CreateGearpumpPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsMultimap";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
-   * Gearpump runner.
-   */
-  private static class StreamingViewAsIterable<T>
-      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-
-    private static final long serialVersionUID = -3399860618995613421L;
-
-    public StreamingViewAsIterable(View.AsIterable<T> transform) {}
-
-    @Override
-    public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
-      PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(CreateGearpumpPCollectionView.<T, Iterable<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsIterable";
-    }
-  }
-
-  /**
-   * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
-   * Gearpump runner.
-   */
-  private static class StreamingViewAsList<T>
-      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
-
-    private static final long serialVersionUID = -5018631473886330629L;
-
-    public StreamingViewAsList(View.AsList<T> transform) {}
-
-    @Override
-    public PCollectionView<List<T>> expand(PCollection<T> input) {
-      PCollectionView<List<T>> view =
-          PCollectionViews.listView(
-              input,
-              input.getWindowingStrategy(),
-              input.getCoder());
-
-      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
-          .apply(CreateGearpumpPCollectionView.<T, List<T>>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsList";
-    }
-  }
-
-
-  private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
-      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
-
-    private static final long serialVersionUID = 9064900748869035738L;
-    private final Combine.GloballyAsSingletonView<InputT, OutputT> transform;
-
-    public StreamingCombineGloballyAsSingletonView(
-        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-      PCollection<OutputT> combined =
-          input.apply(Combine.globally(transform.getCombineFn())
-              .withoutDefaults()
-              .withFanout(transform.getFanout()));
-
-      PCollectionView<OutputT> view = PCollectionViews.singletonView(
-          combined,
-          combined.getWindowingStrategy(),
-          transform.getInsertDefault(),
-          transform.getInsertDefault()
-              ? transform.getCombineFn().defaultValue() : null,
-          combined.getCoder());
-      return combined
-          .apply(ParDo.of(new WrapAsList<OutputT>()))
-          .apply(CreateGearpumpPCollectionView.<OutputT, OutputT>of(view));
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingCombineGloballyAsSingletonView";
-    }
-  }
-
-  private static class StreamingViewAsSingleton<T>
-      extends PTransform<PCollection<T>, PCollectionView<T>> {
-
-    private static final long serialVersionUID = 5870455965625071546L;
-    private final View.AsSingleton<T> transform;
-
-    public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
-      Combine.Globally<T, T> combine = Combine.globally(
-          new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
-      if (!transform.hasDefaultValue()) {
-        combine = combine.withoutDefaults();
-      }
-      return input.apply(combine.asSingletonView());
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsSingleton";
-    }
-
-    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
-      private boolean hasDefaultValue;
-      private T defaultValue;
-
-      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
-        this.hasDefaultValue = hasDefaultValue;
-        this.defaultValue = defaultValue;
-      }
-
-      @Override
-      public T apply(T left, T right) {
-        throw new IllegalArgumentException("PCollection with more than one element "
-            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
-            + "combine the PCollection into a single value");
-      }
-
-      @Override
-      public T identity() {
-        if (hasDefaultValue) {
-          return defaultValue;
-        } else {
-          throw new IllegalArgumentException(
-              "Empty PCollection accessed as a singleton view. "
-                  + "Consider setting withDefault to provide a default value");
-        }
-      }
-    }
-  }
-
-  private static class WrapAsList<T> extends DoFn<T, List<T>> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(Collections.singletonList(c.element()));
-    }
-  }
-
-  /**
-   * Creates a primitive {@link PCollectionView}.
-   *
-   * <p>For internal use only by runner implementors.
-   *
-   * @param <ElemT> The type of the elements of the input PCollection
-   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
-   */
-  public static class CreateGearpumpPCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
-    private static final long serialVersionUID = -2637073020800540542L;
-    private PCollectionView<ViewT> view;
-
-    private CreateGearpumpPCollectionView(PCollectionView<ViewT> view) {
-      this.view = view;
-    }
-
-    public static <ElemT, ViewT> CreateGearpumpPCollectionView<ElemT, ViewT> of(
-        PCollectionView<ViewT> view) {
-      return new CreateGearpumpPCollectionView<>(view);
-    }
-
-    public PCollectionView<ViewT> getView() {
-      return view;
-    }
-
-    @Override
-    public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
-      return view;
-    }
-  }
-
-  /**
-   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
-   *
-   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
-   * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}.
-   * They require the input {@link PCollection} fits in memory.
-   * For a large {@link PCollection} this is expected to crash!
-   *
-   * @param <T> the type of elements to concatenate.
-   */
-  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
-    @Override
-    public List<T> createAccumulator() {
-      return new ArrayList<>();
-    }
-
-    @Override
-    public List<T> addInput(List<T> accumulator, T input) {
-      accumulator.add(input);
-      return accumulator;
-    }
-
-    @Override
-    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
-      List<T> result = createAccumulator();
-      for (List<T> accumulator : accumulators) {
-        result.addAll(accumulator);
-      }
-      return result;
-    }
-
-    @Override
-    public List<T> extractOutput(List<T> accumulator) {
-      return accumulator;
-    }
-
-    @Override
-    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-
-    @Override
-    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
-      return ListCoder.of(inputCoder);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 30b1935..ae59121 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -23,6 +23,7 @@ import com.typesafe.config.ConfigValueFactory;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.beam.runners.gearpump.translators.GearpumpPipelineTranslator;
 import org.apache.beam.runners.gearpump.translators.TranslationContext;
 
 import org.apache.beam.sdk.Pipeline;

http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
index d7588c2..559cb28 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslator.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.gearpump.translators;
 
 import java.util.List;
 
-import org.apache.beam.runners.gearpump.GearpumpPipelineTranslator;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
@@ -30,17 +29,17 @@ import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
  * transforms.
  */
 public class CreateGearpumpPCollectionViewTranslator<ElemT, ViewT> implements
-    TransformTranslator<GearpumpPipelineTranslator.CreateGearpumpPCollectionView<ElemT, ViewT>> {
+    TransformTranslator<CreateStreamingGearpumpView.CreateGearpumpPCollectionView<ElemT, ViewT>> {
 
   private static final long serialVersionUID = -3955521308055056034L;
 
   @Override
   public void translate(
-      GearpumpPipelineTranslator.CreateGearpumpPCollectionView<ElemT, ViewT> transform,
+      CreateStreamingGearpumpView.CreateGearpumpPCollectionView<ElemT, ViewT> transform,
       TranslationContext context) {
     JavaStream<WindowedValue<List<ElemT>>> inputStream =
         context.getInputStream(context.getInput());
-    PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
-    context.setOutputStream(view.getPCollection(), inputStream);
+    PCollectionView<ViewT> view = transform.getView();
+    context.setOutputStream(view, inputStream);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateStreamingGearpumpView.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateStreamingGearpumpView.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateStreamingGearpumpView.java
new file mode 100644
index 0000000..60577b3
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreateStreamingGearpumpView.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.gearpump.translators;
+
+import com.google.common.collect.Iterables;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/** Gearpump streaming overrides for various view (side input) transforms. */
+class CreateStreamingGearpumpView<ElemT, ViewT>
+    extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+  private final PCollectionView<ViewT> view;
+
+  public CreateStreamingGearpumpView(PCollectionView<ViewT> view) {
+    this.view = view;
+  }
+
+  @Override
+  public PCollection<ElemT> expand(PCollection<ElemT> input) {
+    input
+        .apply(Combine.globally(new Concatenate<ElemT>()).withoutDefaults())
+        .apply(CreateGearpumpPCollectionView.<ElemT, ViewT>of(view));
+    return input;
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   *
+   * <p>For internal use by {@link CreateStreamingGearpumpView}. This combiner requires that
+   * the input {@link PCollection} fits in memory. For a large {@link PCollection} this is
+   * expected to crash!
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<T>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+  }
+
+  /**
+   * Creates a primitive {@link PCollectionView}.
+   *
+   * <p>For internal use only by runner implementors.
+   *
+   * @param <ElemT> The type of the elements of the input PCollection
+   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+   */
+  public static class CreateGearpumpPCollectionView<ElemT, ViewT>
+      extends PTransform<PCollection<List<ElemT>>, PCollection<List<ElemT>>> {
+    private PCollectionView<ViewT> view;
+
+    private CreateGearpumpPCollectionView(PCollectionView<ViewT> view) {
+      this.view = view;
+    }
+
+    public static <ElemT, ViewT> CreateGearpumpPCollectionView<ElemT, ViewT> of(
+        PCollectionView<ViewT> view) {
+      return new CreateGearpumpPCollectionView<>(view);
+    }
+
+    @Override
+    public PCollection<List<ElemT>> expand(PCollection<List<ElemT>> input) {
+      return PCollection.<List<ElemT>>createPrimitiveOutputInternal(
+              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
+          .setCoder(input.getCoder());
+    }
+
+    public PCollectionView<ViewT> getView() {
+      return view;
+    }
+  }
+
+  public static class Factory<ElemT, ViewT>
+      implements PTransformOverrideFactory<
+          PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>> {
+    public Factory() {}
+
+    @Override
+    public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
+        AppliedPTransform<
+                PCollection<ElemT>, PCollection<ElemT>, CreatePCollectionView<ElemT, ViewT>>
+            transform) {
+      return PTransformReplacement.of(
+          (PCollection<ElemT>) Iterables.getOnlyElement(transform.getInputs().values()),
+          new CreateStreamingGearpumpView<ElemT, ViewT>(transform.getTransform().getView()));
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PValue> outputs, PCollection<ElemT> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java
new file mode 100644
index 0000000..ca98aac
--- /dev/null
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GearpumpPipelineTranslator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PValue;
+
+import org.apache.gearpump.util.Graph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link GearpumpPipelineTranslator} knows how to translate {@link Pipeline} objects
+ * into a Gearpump {@link Graph}.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      GearpumpPipelineTranslator.class);
+
+  /**
+   * A map from {@link PTransform} subclass to the corresponding
+   * {@link TransformTranslator} to use to translate that transform.
+   */
+  private static final Map<Class<? extends PTransform>, TransformTranslator>
+      transformTranslators = new HashMap<>();
+
+  private final TranslationContext translationContext;
+
+  static {
+    // register TransformTranslators
+    registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
+    registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
+    registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
+    registerTransformTranslator(Flatten.PCollections.class,
+        new FlattenPCollectionsTranslator());
+    registerTransformTranslator(ParDo.MultiOutput.class, new ParDoMultiOutputTranslator());
+    registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
+    registerTransformTranslator(CreateStreamingGearpumpView.CreateGearpumpPCollectionView.class,
+        new CreateGearpumpPCollectionViewTranslator());
+  }
+
+  public GearpumpPipelineTranslator(TranslationContext translationContext) {
+    this.translationContext = translationContext;
+  }
+
+  public void translate(Pipeline pipeline) {
+    List<PTransformOverride> overrides =
+        ImmutableList.<PTransformOverride>builder()
+            .add(PTransformOverride.of(
+                PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
+                new CreateStreamingGearpumpView.Factory()))
+            .build();
+
+    pipeline.replaceAll(overrides);
+    pipeline.traverseTopologically(this);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    LOG.debug("entering composite transform {}", node.getTransform());
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
+    LOG.debug("leaving composite transform {}", node.getTransform());
+  }
+
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    LOG.debug("visiting transform {}", node.getTransform());
+    PTransform transform = node.getTransform();
+    TransformTranslator translator = getTransformTranslator(transform.getClass());
+    if (null == translator) {
+      throw new IllegalStateException(
+          "no translator registered for " + transform);
+    }
+    translationContext.setCurrentTransform(node, getPipeline());
+    translator.translate(transform, translationContext);
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {
+    LOG.debug("visiting value {}", value);
+  }
+
+  /**
+   * Registers {@code transformTranslator} as the translator for instances of
+   * {@code transformClass}; throws {@link IllegalArgumentException} if that class
+   * already has a translator registered.
+   */
+  private static <TransformT extends PTransform> void registerTransformTranslator(
+      Class<TransformT> transformClass,
+      TransformTranslator<? extends TransformT> transformTranslator) {
+    if (transformTranslators.put(transformClass, transformTranslator) != null) {
+      throw new IllegalArgumentException(
+          "defining multiple translators for " + transformClass);
+    }
+  }
+
+  /**
+   * Returns the {@link TransformTranslator} to use for instances of the
+   * specified PTransform class, or {@code null} if none is registered.
+   */
+  private <TransformT extends PTransform>
+  TransformTranslator<TransformT> getTransformTranslator(Class<TransformT> transformClass) {
+    return transformTranslators.get(transformClass);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index 64a1e0d..42b7a53 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -78,7 +79,7 @@ public class TranslationContext {
   }
 
   public PValue getInput() {
-    return Iterables.getOnlyElement(getInputs().values());
+    return Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
   }
 
   public Map<TupleTag<?>, PValue> getOutputs() {

http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
index e2777df..b20896a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/functions/DoFnFunction.java
@@ -135,7 +135,6 @@ public class DoFnFunction<InputT, OutputT> extends
       }
     }
 
-
     for (PCollectionView<?> sideInput: sideInputs) {
       for (WindowedValue<InputT> value : pushedBackValues) {
         for (BoundedWindow win: value.getWindows()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
index 6637a9b..558eb0d 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSource.java
@@ -80,7 +80,7 @@ public abstract class GearpumpSource<T> implements DataSource {
         org.joda.time.Instant timestamp = reader.getCurrentTimestamp();
         message = new DefaultMessage(
             WindowedValue.timestampedValueInGlobalWindow(data, timestamp),
-            TranslatorUtils.jodaTimeToJava8Time(timestamp));
+            timestamp.getMillis());
       }
       available = reader.advance();
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index 83fc6e6..b1cd61c 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -52,6 +52,7 @@ public class TranslatorUtils {
   }
 
   public static Window boundedWindowToGearpumpWindow(BoundedWindow window) {
+    // Gearpump window upper bound is exclusive, hence maxTimestamp().plus(1L) below
     Instant end = TranslatorUtils.jodaTimeToJava8Time(window.maxTimestamp().plus(1L));
     if (window instanceof IntervalWindow) {
       IntervalWindow intervalWindow = (IntervalWindow) window;

http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
index b23b0c6..511eed1 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.runners.gearpump.GearpumpPipelineTranslator;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
@@ -38,8 +37,8 @@ public class CreateGearpumpPCollectionViewTranslatorTest {
     CreateGearpumpPCollectionViewTranslator translator =
         new CreateGearpumpPCollectionViewTranslator();
 
-    GearpumpPipelineTranslator.CreateGearpumpPCollectionView pCollectionView =
-        mock(GearpumpPipelineTranslator.CreateGearpumpPCollectionView.class);
+    CreateStreamingGearpumpView.CreateGearpumpPCollectionView pCollectionView =
+        mock(CreateStreamingGearpumpView.CreateGearpumpPCollectionView.class);
 
     JavaStream javaStream = mock(JavaStream.class);
     TranslationContext translationContext = mock(TranslationContext.class);
@@ -49,7 +48,7 @@ public class CreateGearpumpPCollectionViewTranslatorTest {
     when(translationContext.getInputStream(mockInput)).thenReturn(javaStream);
 
     PCollectionView view = mock(PCollectionView.class);
-    when(translationContext.getOutput()).thenReturn(view);
+    when(pCollectionView.getView()).thenReturn(view);
 
     translator.translate(pCollectionView, translationContext);
     verify(translationContext, times(1)).setOutputStream(view, javaStream);

http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
index 4490737..cc4284f 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.gearpump.DefaultMessage;
@@ -42,10 +43,11 @@ import org.junit.Test;
 public class GearpumpSourceTest {
   private static final List<TimestampedValue<String>> TEST_VALUES =
       Lists.newArrayList(
-          TimestampedValue.of("a", new org.joda.time.Instant(Long.MIN_VALUE)),
+          TimestampedValue.of("a", BoundedWindow.TIMESTAMP_MIN_VALUE),
           TimestampedValue.of("b", new org.joda.time.Instant(0)),
           TimestampedValue.of("c", new org.joda.time.Instant(53)),
-          TimestampedValue.of("d", new org.joda.time.Instant(Long.MAX_VALUE - 1)));
+          TimestampedValue.of("d", BoundedWindow.TIMESTAMP_MAX_VALUE)
+      );
 
   private static class SourceForTest<T> extends GearpumpSource<T> {
     private ValuesSource<T> valuesSource;
@@ -72,10 +74,16 @@ public class GearpumpSourceTest {
         new SourceForTest<>(options, valuesSource);
     sourceForTest.open(null, Instant.EPOCH);
 
-    for (TimestampedValue<String> value : TEST_VALUES) {
+    for (int i = 0; i < TEST_VALUES.size(); i++) {
+      TimestampedValue<String> value = TEST_VALUES.get(i);
+
       // Check the watermark first since the Source will advance when it's opened
-      Instant expectedWaterMark = TranslatorUtils.jodaTimeToJava8Time(value.getTimestamp());
-      Assert.assertEquals(expectedWaterMark, sourceForTest.getWatermark());
+      if (i < TEST_VALUES.size() - 1) {
+        Instant expectedWaterMark = TranslatorUtils.jodaTimeToJava8Time(value.getTimestamp());
+        Assert.assertEquals(expectedWaterMark, sourceForTest.getWatermark());
+      } else {
+        Assert.assertEquals(Watermark.MAX(), sourceForTest.getWatermark());
+      }
 
       Message expectedMsg =
           new DefaultMessage(

http://git-wip-us.apache.org/repos/asf/beam/blob/a7b5d981/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
index 524887d..6ebe59b 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtilsTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.Lists;
 
-import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
 
@@ -68,7 +67,7 @@ public class TranslatorUtilsTest {
             Instant.ofEpochMilli(Long.MAX_VALUE))));
     BoundedWindow globalWindow = GlobalWindow.INSTANCE;
     assertThat(TranslatorUtils.boundedWindowToGearpumpWindow(globalWindow),
-        equalTo(Window.apply(Instant.ofEpochMilli(Long.MIN_VALUE / 1000),
-            Instant.ofEpochMilli(Long.MAX_VALUE / 1000).minus(Duration.ofDays(1)).plusMillis(1))));
+        equalTo(Window.apply(Instant.ofEpochMilli(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()),
+            Instant.ofEpochMilli(globalWindow.maxTimestamp().getMillis() + 1))));
   }
 }