You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:38 UTC

[13/50] [abbrv] beam git commit: [BEAM-972] Add more unit test to Gearpump runner

[BEAM-972] Add more unit test to Gearpump runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f3138dde
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f3138dde
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f3138dde

Branch: refs/heads/master
Commit: f3138dde12b0c6a5cfdb8fefab916a6060b7f5ea
Parents: f4f2333
Author: huafengw <fv...@gmail.com>
Authored: Wed Apr 12 19:11:09 2017 +0800
Committer: huafengw <fv...@gmail.com>
Committed: Thu Apr 13 10:06:55 2017 +0800

----------------------------------------------------------------------
 .../FlattenPCollectionsTranslator.java          |   2 +-
 .../translators/GroupByKeyTranslator.java       |  20 ++-
 .../translators/WindowAssignTranslator.java     |   5 +-
 ...teGearpumpPCollectionViewTranslatorTest.java |  57 +++++++
 .../CreatePCollectionViewTranslatorTest.java    |  55 +++++++
 .../FlattenPCollectionsTranslatorTest.java      | 137 +++++++++++++++++
 .../translators/GroupByKeyTranslatorTest.java   | 151 +++++++++++++++++++
 .../translators/ReadBoundedTranslatorTest.java  |  70 +++++++++
 .../ReadUnboundedTranslatorTest.java            |  70 +++++++++
 .../translators/WindowAssignTranslatorTest.java | 110 ++++++++++++++
 .../translators/io/GearpumpSourceTest.java      |  36 ++---
 .../gearpump/translators/io/ValueSoureTest.java |  15 +-
 12 files changed, 695 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
index 3a465cb..56f7d1a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
@@ -47,7 +47,6 @@ public class FlattenPCollectionsTranslator<T> implements
     Set<PCollection<T>> unique = new HashSet<>();
     for (TaggedPValue input: context.getInputs()) {
       PCollection<T> collection = (PCollection<T>) input.getValue();
-      unique.add(collection);
       JavaStream<T> inputStream = context.getInputStream(collection);
       if (null == merged) {
         merged = inputStream;
@@ -60,6 +59,7 @@ public class FlattenPCollectionsTranslator<T> implements
 
         merged = merged.merge(inputStream, transform.getName());
       }
+      unique.add(collection);
     }
 
     if (null == merged) {

http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 5dfd3e9..54c8737 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -83,7 +83,10 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     context.setOutputStream(context.getOutput(), outputStream);
   }
 
-  private static class GearpumpWindowFn<T, W extends BoundedWindow>
+  /**
+   * A transform used internally to translate Beam's Window to Gearpump's Window.
+   */
+  protected static class GearpumpWindowFn<T, W extends BoundedWindow>
       implements WindowFunction<WindowedValue<T>>, Serializable {
 
     private final boolean isNonMerging;
@@ -115,7 +118,10 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
   }
 
-  private static class GroupByFn<K, V> extends
+  /**
+   * A transform used internally to group KV message by its key.
+   */
+  protected static class GroupByFn<K, V> extends
       GroupByFunction<WindowedValue<KV<K, V>>, ByteBuffer> {
 
     private static final long serialVersionUID = -807905402490735530L;
@@ -135,7 +141,10 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
   }
 
-  private static class KeyedByTimestamp<K, V>
+  /**
+   * A transform used internally to transform WindowedValue to KV.
+   */
+  protected static class KeyedByTimestamp<K, V>
       extends MapFunction<WindowedValue<KV<K, V>>,
       KV<Instant, WindowedValue<KV<K, V>>>> {
 
@@ -154,7 +163,10 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
     }
   }
 
-  private static class Merge<K, V> extends
+  /**
+   * A transform used internally by Gearpump which encapsulates the merge logic.
+   */
+  protected static class Merge<K, V> extends
       FoldFunction<KV<Instant, WindowedValue<KV<K, V>>>,
       KV<Instant, WindowedValue<KV<K, List<V>>>>> {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
index 29d8f02..2d70b63 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
@@ -57,7 +57,10 @@ public class WindowAssignTranslator<T> implements TransformTranslator<Window.Ass
     context.setOutputStream(output, outputStream);
   }
 
-  private static class AssignWindows<T> extends
+  /**
+   * A Function used internally by Gearpump to wrap the actual Beam's WindowFn.
+   */
+  protected static class AssignWindows<T> extends
       FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
 
     private static final long serialVersionUID = 7284565861938681360L;

http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
new file mode 100644
index 0000000..b23b0c6
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineTranslator;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.junit.Test;
+
+/** Tests for {@link CreateGearpumpPCollectionViewTranslator}. */
+public class CreateGearpumpPCollectionViewTranslatorTest {
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testTranslate() {
+    CreateGearpumpPCollectionViewTranslator translator =
+        new CreateGearpumpPCollectionViewTranslator();
+
+    GearpumpPipelineTranslator.CreateGearpumpPCollectionView pCollectionView =
+        mock(GearpumpPipelineTranslator.CreateGearpumpPCollectionView.class);
+
+    JavaStream javaStream = mock(JavaStream.class);
+    TranslationContext translationContext = mock(TranslationContext.class);
+
+    PValue mockInput = mock(PValue.class);
+    when(translationContext.getInput()).thenReturn(mockInput);
+    when(translationContext.getInputStream(mockInput)).thenReturn(javaStream);
+
+    PCollectionView view = mock(PCollectionView.class);
+    when(translationContext.getOutput()).thenReturn(view);
+
+    translator.translate(pCollectionView, translationContext);
+    verify(translationContext, times(1)).setOutputStream(view, javaStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java
new file mode 100644
index 0000000..42ff14e
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.junit.Test;
+
+/** Tests for {@link CreatePCollectionViewTranslator}. */
+public class CreatePCollectionViewTranslatorTest {
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testTranslate() {
+    CreatePCollectionViewTranslator translator = new CreatePCollectionViewTranslator();
+    View.CreatePCollectionView<String, Iterable<String>> createView =
+        mock(View.CreatePCollectionView.class);
+
+    JavaStream javaStream = mock(JavaStream.class);
+    TranslationContext translationContext = mock(TranslationContext.class);
+
+    PValue mockInput = mock(PValue.class);
+    when(translationContext.getInput()).thenReturn(mockInput);
+    when(translationContext.getInputStream(mockInput)).thenReturn(javaStream);
+
+    PCollectionView view = mock(PCollectionView.class);
+    when(translationContext.getOutput()).thenReturn(view);
+
+    translator.translate(createView, translationContext);
+    verify(translationContext, times(1)).setOutputStream(view, javaStream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
new file mode 100644
index 0000000..fa89d4a
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+/** Tests for {@link FlattenPCollectionsTranslator}. */
+public class FlattenPCollectionsTranslatorTest {
+
+  private FlattenPCollectionsTranslator translator = new FlattenPCollectionsTranslator();
+  private Flatten.PCollections transform = mock(Flatten.PCollections.class);
+
+  class UnboundedSourceWrapperMatcher extends ArgumentMatcher<DataSource> {
+    @Override
+    public boolean matches(Object o) {
+      return o instanceof UnboundedSourceWrapper;
+    }
+  }
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testTranslateWithEmptyCollection() {
+    PValue mockOutput = mock(PValue.class);
+    TranslationContext translationContext = mock(TranslationContext.class);
+
+    when(translationContext.getInputs()).thenReturn(Collections.EMPTY_LIST);
+    when(translationContext.getOutput()).thenReturn(mockOutput);
+
+    translator.translate(transform, translationContext);
+    verify(translationContext).getSourceStream(argThat(new UnboundedSourceWrapperMatcher()));
+  }
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testTranslateWithOneCollection() {
+    JavaStream javaStream = mock(JavaStream.class);
+    TranslationContext translationContext = mock(TranslationContext.class);
+
+    TaggedPValue mockInput = mock(TaggedPValue.class);
+    PCollection mockCollection = mock(PCollection.class);
+    when(mockInput.getValue()).thenReturn(mockCollection);
+
+    when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput));
+    when(translationContext.getInputStream(mockCollection)).thenReturn(javaStream);
+
+    PValue mockOutput = mock(PValue.class);
+    when(translationContext.getOutput()).thenReturn(mockOutput);
+
+    translator.translate(transform, translationContext);
+    verify(translationContext, times(1)).setOutputStream(mockOutput, javaStream);
+  }
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testWithMoreThanOneCollections() {
+    String transformName = "transform";
+    when(transform.getName()).thenReturn(transformName);
+
+    JavaStream javaStream1 = mock(JavaStream.class);
+    JavaStream javaStream2 = mock(JavaStream.class);
+    TranslationContext translationContext = mock(TranslationContext.class);
+
+    TaggedPValue mockInput1 = mock(TaggedPValue.class);
+    PCollection mockCollection1 = mock(PCollection.class);
+    when(mockInput1.getValue()).thenReturn(mockCollection1);
+
+    TaggedPValue mockInput2 = mock(TaggedPValue.class);
+    PCollection mockCollection2 = mock(PCollection.class);
+    when(mockInput2.getValue()).thenReturn(mockCollection2);
+
+    when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput1, mockInput2));
+    when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1);
+    when(translationContext.getInputStream(mockCollection2)).thenReturn(javaStream2);
+
+    translator.translate(transform, translationContext);
+    verify(javaStream1).merge(javaStream2, transformName);
+  }
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testWithDuplicatedCollections() {
+    String transformName = "transform";
+    when(transform.getName()).thenReturn(transformName);
+
+    JavaStream javaStream1 = mock(JavaStream.class);
+    TranslationContext translationContext = mock(TranslationContext.class);
+
+    PCollection mockCollection1 = mock(PCollection.class);
+    TaggedPValue mockInput1 = mock(TaggedPValue.class);
+    when(mockInput1.getValue()).thenReturn(mockCollection1);
+
+    TaggedPValue mockInput2 = mock(TaggedPValue.class);
+    when(mockInput2.getValue()).thenReturn(mockCollection1);
+
+    when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput1, mockInput2));
+    when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1);
+
+    translator.translate(transform, translationContext);
+    verify(javaStream1).map(any(MapFunction.class), eq("dummy"));
+    verify(javaStream1).merge(any(JavaStream.class), eq(transformName));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
new file mode 100644
index 0000000..9135022
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.time.Instant;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator.GearpumpWindowFn;
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.gearpump.streaming.dsl.window.api.WindowFunction;
+import org.apache.gearpump.streaming.dsl.window.impl.Window;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Tests for {@link GroupByKeyTranslator}. */
+@RunWith(Parameterized.class)
+public class GroupByKeyTranslatorTest {
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testGearpumpWindowFn() {
+    GearpumpWindowFn windowFn = new GearpumpWindowFn(true);
+    List<BoundedWindow> windows =
+        Lists.newArrayList(
+            new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10)),
+            new IntervalWindow(new org.joda.time.Instant(5), new org.joda.time.Instant(15)));
+
+    WindowFunction.Context<WindowedValue<String>> context =
+        new WindowFunction.Context<WindowedValue<String>>() {
+          @Override
+          public Instant timestamp() {
+            return Instant.EPOCH;
+          }
+
+          @Override
+          public WindowedValue<String> element() {
+            return WindowedValue.of(
+                "v1", new org.joda.time.Instant(6), windows, PaneInfo.NO_FIRING);
+          }
+        };
+
+    Window[] result = windowFn.apply(context);
+    List<Window> expected = Lists.newArrayList();
+    for (BoundedWindow w : windows) {
+      expected.add(TranslatorUtils.boundedWindowToGearpumpWindow(w));
+    }
+    assertThat(result, equalTo(expected.toArray()));
+  }
+
+  @Parameterized.Parameters(name = "{index}: {0}")
+  public static Iterable<OutputTimeFn<BoundedWindow>> data() {
+    return ImmutableList.of(
+        OutputTimeFns.outputAtEarliestInputTimestamp(),
+        OutputTimeFns.outputAtLatestInputTimestamp(),
+        OutputTimeFns.outputAtEndOfWindow());
+  }
+
+  @Parameterized.Parameter(0)
+  public OutputTimeFn<? super BoundedWindow> outputTimeFn;
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testKeyedByTimestamp() {
+    BoundedWindow window =
+        new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10));
+    GroupByKeyTranslator.KeyedByTimestamp keyedByTimestamp =
+        new GroupByKeyTranslator.KeyedByTimestamp(outputTimeFn);
+    WindowedValue<KV<String, String>> value =
+        WindowedValue.of(
+            KV.of("key", "val"), org.joda.time.Instant.now(), window, PaneInfo.NO_FIRING);
+    KV<org.joda.time.Instant, WindowedValue<KV<String, String>>> result =
+        keyedByTimestamp.map(value);
+    org.joda.time.Instant time =
+        outputTimeFn.assignOutputTime(
+            value.getTimestamp(), Iterables.getOnlyElement(value.getWindows()));
+    assertThat(result, equalTo(KV.of(time, value)));
+  }
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testMerge() {
+    WindowFn slidingWindows = Sessions.withGapDuration(Duration.millis(10));
+    GroupByKeyTranslator.Merge merge = new GroupByKeyTranslator.Merge(slidingWindows, outputTimeFn);
+    org.joda.time.Instant key1 = new org.joda.time.Instant(5);
+    WindowedValue<KV<String, String>> value1 =
+        WindowedValue.of(
+            KV.of("key1", "value1"),
+            key1,
+            new IntervalWindow(new org.joda.time.Instant(5), new org.joda.time.Instant(10)),
+            PaneInfo.NO_FIRING);
+
+    org.joda.time.Instant key2 = new org.joda.time.Instant(10);
+    WindowedValue<KV<String, String>> value2 =
+        WindowedValue.of(
+            KV.of("key2", "value2"),
+            key2,
+            new IntervalWindow(new org.joda.time.Instant(9), new org.joda.time.Instant(14)),
+            PaneInfo.NO_FIRING);
+
+    KV<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>> result1 =
+        merge.fold(KV.of(null, null), KV.of(key1, value1));
+    assertThat(result1.getKey(), equalTo(key1));
+    assertThat(result1.getValue().getValue().getValue(), equalTo(Lists.newArrayList("value1")));
+
+    KV<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>> result2 =
+        merge.fold(result1, KV.of(key2, value2));
+    assertThat(result2.getKey(), equalTo(outputTimeFn.combine(key1, key2)));
+    Collection<? extends BoundedWindow> resultWindows = result2.getValue().getWindows();
+    assertThat(resultWindows.size(), equalTo(1));
+    IntervalWindow expectedWindow =
+        new IntervalWindow(new org.joda.time.Instant(5), new org.joda.time.Instant(14));
+    assertThat(resultWindows.toArray()[0], equalTo(expectedWindow));
+    assertThat(
+        result2.getValue().getValue().getValue(), equalTo(Lists.newArrayList("value1", "value2")));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java
new file mode 100644
index 0000000..20ee1a2
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.translators.io.BoundedSourceWrapper;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+/** Tests for {@link ReadBoundedTranslator}. */
+public class ReadBoundedTranslatorTest {
+
+  class BoundedSourceWrapperMatcher extends ArgumentMatcher<DataSource> {
+    @Override
+    public boolean matches(Object o) {
+      return o instanceof BoundedSourceWrapper;
+    }
+  }
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testTranslate() {
+    ReadBoundedTranslator translator = new ReadBoundedTranslator();
+    GearpumpPipelineOptions options =
+        PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
+    Read.Bounded transform = mock(Read.Bounded.class);
+    BoundedSource source = mock(BoundedSource.class);
+    when(transform.getSource()).thenReturn(source);
+
+    TranslationContext translationContext = mock(TranslationContext.class);
+    when(translationContext.getPipelineOptions()).thenReturn(options);
+
+    JavaStream stream = mock(JavaStream.class);
+    PValue mockOutput = mock(PValue.class);
+    when(translationContext.getOutput()).thenReturn(mockOutput);
+    when(translationContext.getSourceStream(any(DataSource.class))).thenReturn(stream);
+
+    translator.translate(transform, translationContext);
+    verify(translationContext).getSourceStream(argThat(new BoundedSourceWrapperMatcher()));
+    verify(translationContext).setOutputStream(mockOutput, stream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java
new file mode 100644
index 0000000..f27b568
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+/** Tests for {@link ReadUnboundedTranslator}. */
+public class ReadUnboundedTranslatorTest {
+
+  class UnboundedSourceWrapperMatcher extends ArgumentMatcher<DataSource> {
+    @Override
+    public boolean matches(Object o) {
+      return o instanceof UnboundedSourceWrapper;
+    }
+  }
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testTranslate() {
+    ReadUnboundedTranslator translator = new ReadUnboundedTranslator();
+    GearpumpPipelineOptions options =
+        PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
+    Read.Unbounded transform = mock(Read.Unbounded.class);
+    UnboundedSource source = mock(UnboundedSource.class);
+    when(transform.getSource()).thenReturn(source);
+
+    TranslationContext translationContext = mock(TranslationContext.class);
+    when(translationContext.getPipelineOptions()).thenReturn(options);
+
+    JavaStream stream = mock(JavaStream.class);
+    PValue mockOutput = mock(PValue.class);
+    when(translationContext.getOutput()).thenReturn(mockOutput);
+    when(translationContext.getSourceStream(any(DataSource.class))).thenReturn(stream);
+
+    translator.translate(transform, translationContext);
+    verify(translationContext).getSourceStream(argThat(new UnboundedSourceWrapperMatcher()));
+    verify(translationContext).setOutputStream(mockOutput, stream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslatorTest.java
new file mode 100644
index 0000000..06ccaaf
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslatorTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Iterator;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Tests for {@link WindowAssignTranslator}. */
+public class WindowAssignTranslatorTest {
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testAssignWindowsWithSlidingWindow() {
+    WindowFn slidingWindows = SlidingWindows.of(Duration.millis(10)).every(Duration.millis(5));
+    WindowAssignTranslator.AssignWindows<String> assignWindows =
+        new WindowAssignTranslator.AssignWindows(slidingWindows);
+
+    String value = "v1";
+    Instant timestamp = new Instant(1);
+    WindowedValue<String> windowedValue =
+        WindowedValue.timestampedValueInGlobalWindow(value, timestamp);
+    ArrayList<WindowedValue<String>> expected = new ArrayList<>();
+    expected.add(
+        WindowedValue.of(
+            value,
+            timestamp,
+            new IntervalWindow(new Instant(0), new Instant(10)),
+            PaneInfo.NO_FIRING));
+    expected.add(
+        WindowedValue.of(
+            value,
+            timestamp,
+            new IntervalWindow(new Instant(-5), new Instant(5)),
+            PaneInfo.NO_FIRING));
+
+    Iterator<WindowedValue<String>> result = assignWindows.flatMap(windowedValue);
+    assertThat(expected, equalTo(Lists.newArrayList(result)));
+  }
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testAssignWindowsWithSessions() {
+    WindowFn slidingWindows = Sessions.withGapDuration(Duration.millis(10));
+    WindowAssignTranslator.AssignWindows<String> assignWindows =
+        new WindowAssignTranslator.AssignWindows(slidingWindows);
+
+    String value = "v1";
+    Instant timestamp = new Instant(1);
+    WindowedValue<String> windowedValue =
+        WindowedValue.timestampedValueInGlobalWindow(value, timestamp);
+    ArrayList<WindowedValue<String>> expected = new ArrayList<>();
+    expected.add(
+        WindowedValue.of(
+            value,
+            timestamp,
+            new IntervalWindow(new Instant(1), new Instant(11)),
+            PaneInfo.NO_FIRING));
+
+    Iterator<WindowedValue<String>> result = assignWindows.flatMap(windowedValue);
+    assertThat(expected, equalTo(Lists.newArrayList(result)));
+  }
+
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testAssignWindowsGlobal() {
+    WindowFn slidingWindows = new GlobalWindows();
+    WindowAssignTranslator.AssignWindows<String> assignWindows =
+        new WindowAssignTranslator.AssignWindows(slidingWindows);
+
+    String value = "v1";
+    Instant timestamp = new Instant(1);
+    WindowedValue<String> windowedValue =
+        WindowedValue.timestampedValueInGlobalWindow(value, timestamp);
+    ArrayList<WindowedValue<String>> expected = new ArrayList<>();
+    expected.add(WindowedValue.timestampedValueInGlobalWindow(value, timestamp));
+
+    Iterator<WindowedValue<String>> result = assignWindows.flatMap(windowedValue);
+    assertThat(expected, equalTo(Lists.newArrayList(result)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
index af5a1d2..b244484 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
@@ -37,16 +37,14 @@ import org.apache.gearpump.streaming.source.Watermark;
 import org.junit.Assert;
 import org.junit.Test;
 
-/**
- * Tests for {@link GearpumpSource}.
- */
+/** Tests for {@link GearpumpSource}. */
 public class GearpumpSourceTest {
-  private static final List<TimestampedValue<String>> TEST_VALUES = Lists.newArrayList(
-    TimestampedValue.of("a", new org.joda.time.Instant(Long.MIN_VALUE)),
-    TimestampedValue.of("b", new org.joda.time.Instant(0)),
-    TimestampedValue.of("c", new org.joda.time.Instant(53)),
-    TimestampedValue.of("d", new org.joda.time.Instant(Long.MAX_VALUE - 1))
-  );
+  private static final List<TimestampedValue<String>> TEST_VALUES =
+      Lists.newArrayList(
+          TimestampedValue.of("a", new org.joda.time.Instant(Long.MIN_VALUE)),
+          TimestampedValue.of("b", new org.joda.time.Instant(0)),
+          TimestampedValue.of("c", new org.joda.time.Instant(53)),
+          TimestampedValue.of("d", new org.joda.time.Instant(Long.MAX_VALUE - 1)));
 
   private static class SourceForTest<T> extends GearpumpSource<T> {
     private ValuesSource<T> valuesSource;
@@ -64,22 +62,24 @@ public class GearpumpSourceTest {
 
   @Test
   public void testGearpumpSource() {
-    GearpumpPipelineOptions options = PipelineOptionsFactory.create()
-      .as(GearpumpPipelineOptions.class);
-    ValuesSource<TimestampedValue<String>> valuesSource = new ValuesSource<>(TEST_VALUES,
-      TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()));
+    GearpumpPipelineOptions options =
+        PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
+    ValuesSource<TimestampedValue<String>> valuesSource =
+        new ValuesSource<>(
+            TEST_VALUES, TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()));
     SourceForTest<TimestampedValue<String>> sourceForTest =
-      new SourceForTest<>(options, valuesSource);
+        new SourceForTest<>(options, valuesSource);
     sourceForTest.open(null, Instant.EPOCH);
 
-    for (TimestampedValue<String> value: TEST_VALUES) {
+    for (TimestampedValue<String> value : TEST_VALUES) {
       // Check the watermark first since the Source will advance when it's opened
       Instant expectedWaterMark = TranslatorUtils.jodaTimeToJava8Time(value.getTimestamp());
       Assert.assertEquals(expectedWaterMark, sourceForTest.getWatermark());
 
-      Message expectedMsg = Message.apply(
-        WindowedValue.timestampedValueInGlobalWindow(value, value.getTimestamp()),
-        value.getTimestamp().getMillis());
+      Message expectedMsg =
+          Message.apply(
+              WindowedValue.timestampedValueInGlobalWindow(value, value.getTimestamp()),
+              value.getTimestamp().getMillis());
       Message message = sourceForTest.read();
       Assert.assertEquals(expectedMsg, message);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
index 8c50703..439e1b1 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
@@ -41,18 +41,16 @@ import org.apache.gearpump.util.Constants;
 import org.junit.Assert;
 import org.junit.Test;
 
-/**
- * Tests for {@link ValuesSource}.
- */
+/** Tests for {@link ValuesSource}. */
 public class ValueSoureTest {
 
   @Test
   public void testValueSource() {
-    GearpumpPipelineOptions options = PipelineOptionsFactory.create()
-      .as(GearpumpPipelineOptions.class);
+    GearpumpPipelineOptions options =
+        PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
     Config config = ClusterConfig.master(null);
-    config = config.withValue(Constants.APPLICATION_TOTAL_RETRIES(),
-      ConfigValueFactory.fromAnyRef(0));
+    config =
+        config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
     EmbeddedCluster cluster = new EmbeddedCluster(config);
     cluster.start();
 
@@ -62,8 +60,7 @@ public class ValueSoureTest {
     Pipeline p = Pipeline.create(options);
     List<String> values = Lists.newArrayList("1", "2", "3", "4", "5");
     ValuesSource<String> source = new ValuesSource<>(values, StringUtf8Coder.of());
-    p.apply(Read.from(source))
-      .apply(ParDo.of(new ResultCollector()));
+    p.apply(Read.from(source)).apply(ParDo.of(new ResultCollector()));
 
     p.run().waitUntilFinish();
     cluster.stop();