You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:38 UTC
[13/50] [abbrv] beam git commit: [BEAM-972] Add more unit tests to
Gearpump runner
[BEAM-972] Add more unit tests to Gearpump runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f3138dde
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f3138dde
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f3138dde
Branch: refs/heads/master
Commit: f3138dde12b0c6a5cfdb8fefab916a6060b7f5ea
Parents: f4f2333
Author: huafengw <fv...@gmail.com>
Authored: Wed Apr 12 19:11:09 2017 +0800
Committer: huafengw <fv...@gmail.com>
Committed: Thu Apr 13 10:06:55 2017 +0800
----------------------------------------------------------------------
.../FlattenPCollectionsTranslator.java | 2 +-
.../translators/GroupByKeyTranslator.java | 20 ++-
.../translators/WindowAssignTranslator.java | 5 +-
...teGearpumpPCollectionViewTranslatorTest.java | 57 +++++++
.../CreatePCollectionViewTranslatorTest.java | 55 +++++++
.../FlattenPCollectionsTranslatorTest.java | 137 +++++++++++++++++
.../translators/GroupByKeyTranslatorTest.java | 151 +++++++++++++++++++
.../translators/ReadBoundedTranslatorTest.java | 70 +++++++++
.../ReadUnboundedTranslatorTest.java | 70 +++++++++
.../translators/WindowAssignTranslatorTest.java | 110 ++++++++++++++
.../translators/io/GearpumpSourceTest.java | 36 ++---
.../gearpump/translators/io/ValueSoureTest.java | 15 +-
12 files changed, 695 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
index 3a465cb..56f7d1a 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslator.java
@@ -47,7 +47,6 @@ public class FlattenPCollectionsTranslator<T> implements
Set<PCollection<T>> unique = new HashSet<>();
for (TaggedPValue input: context.getInputs()) {
PCollection<T> collection = (PCollection<T>) input.getValue();
- unique.add(collection);
JavaStream<T> inputStream = context.getInputStream(collection);
if (null == merged) {
merged = inputStream;
@@ -60,6 +59,7 @@ public class FlattenPCollectionsTranslator<T> implements
merged = merged.merge(inputStream, transform.getName());
}
+ unique.add(collection);
}
if (null == merged) {
http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 5dfd3e9..54c8737 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -83,7 +83,10 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
context.setOutputStream(context.getOutput(), outputStream);
}
- private static class GearpumpWindowFn<T, W extends BoundedWindow>
+ /**
+ * A transform used internally to translate Beam's Window to Gearpump's Window.
+ */
+ protected static class GearpumpWindowFn<T, W extends BoundedWindow>
implements WindowFunction<WindowedValue<T>>, Serializable {
private final boolean isNonMerging;
@@ -115,7 +118,10 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
}
- private static class GroupByFn<K, V> extends
+ /**
+ * A transform used internally to group KV messages by their key.
+ */
+ protected static class GroupByFn<K, V> extends
GroupByFunction<WindowedValue<KV<K, V>>, ByteBuffer> {
private static final long serialVersionUID = -807905402490735530L;
@@ -135,7 +141,10 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
}
- private static class KeyedByTimestamp<K, V>
+ /**
+ * A transform used internally to key each WindowedValue by a timestamp, producing a KV.
+ */
+ protected static class KeyedByTimestamp<K, V>
extends MapFunction<WindowedValue<KV<K, V>>,
KV<Instant, WindowedValue<KV<K, V>>>> {
@@ -154,7 +163,10 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
}
}
- private static class Merge<K, V> extends
+ /**
+ * A transform used internally by Gearpump which encapsulates the merge logic.
+ */
+ protected static class Merge<K, V> extends
FoldFunction<KV<Instant, WindowedValue<KV<K, V>>>,
KV<Instant, WindowedValue<KV<K, List<V>>>>> {
http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
index 29d8f02..2d70b63 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslator.java
@@ -57,7 +57,10 @@ public class WindowAssignTranslator<T> implements TransformTranslator<Window.Ass
context.setOutputStream(output, outputStream);
}
- private static class AssignWindows<T> extends
+ /**
+ * A function used internally by Gearpump to wrap the actual Beam WindowFn.
+ */
+ protected static class AssignWindows<T> extends
FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
private static final long serialVersionUID = 7284565861938681360L;
http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
new file mode 100644
index 0000000..b23b0c6
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreateGearpumpPCollectionViewTranslatorTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineTranslator;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.junit.Test;
+
+/** Tests for {@link CreateGearpumpPCollectionViewTranslator}. */
+public class CreateGearpumpPCollectionViewTranslatorTest {
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testTranslate() {
+ CreateGearpumpPCollectionViewTranslator translator =
+ new CreateGearpumpPCollectionViewTranslator();
+
+ GearpumpPipelineTranslator.CreateGearpumpPCollectionView pCollectionView =
+ mock(GearpumpPipelineTranslator.CreateGearpumpPCollectionView.class);
+
+ JavaStream javaStream = mock(JavaStream.class);
+ TranslationContext translationContext = mock(TranslationContext.class);
+
+ PValue mockInput = mock(PValue.class);
+ when(translationContext.getInput()).thenReturn(mockInput);
+ when(translationContext.getInputStream(mockInput)).thenReturn(javaStream);
+
+ PCollectionView view = mock(PCollectionView.class);
+ when(translationContext.getOutput()).thenReturn(view);
+
+ translator.translate(pCollectionView, translationContext);
+ verify(translationContext, times(1)).setOutputStream(view, javaStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java
new file mode 100644
index 0000000..42ff14e
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.junit.Test;
+
+/** Tests for {@link CreatePCollectionViewTranslator}. */
+public class CreatePCollectionViewTranslatorTest {
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testTranslate() {
+ CreatePCollectionViewTranslator translator = new CreatePCollectionViewTranslator();
+ View.CreatePCollectionView<String, Iterable<String>> createView =
+ mock(View.CreatePCollectionView.class);
+
+ JavaStream javaStream = mock(JavaStream.class);
+ TranslationContext translationContext = mock(TranslationContext.class);
+
+ PValue mockInput = mock(PValue.class);
+ when(translationContext.getInput()).thenReturn(mockInput);
+ when(translationContext.getInputStream(mockInput)).thenReturn(javaStream);
+
+ PCollectionView view = mock(PCollectionView.class);
+ when(translationContext.getOutput()).thenReturn(view);
+
+ translator.translate(createView, translationContext);
+ verify(translationContext, times(1)).setOutputStream(view, javaStream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
new file mode 100644
index 0000000..fa89d4a
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/FlattenPCollectionsTranslatorTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+/** Tests for {@link FlattenPCollectionsTranslator}. */
+public class FlattenPCollectionsTranslatorTest {
+
+ private FlattenPCollectionsTranslator translator = new FlattenPCollectionsTranslator();
+ private Flatten.PCollections transform = mock(Flatten.PCollections.class);
+
+ class UnboundedSourceWrapperMatcher extends ArgumentMatcher<DataSource> {
+ @Override
+ public boolean matches(Object o) {
+ return o instanceof UnboundedSourceWrapper;
+ }
+ }
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testTranslateWithEmptyCollection() {
+ PValue mockOutput = mock(PValue.class);
+ TranslationContext translationContext = mock(TranslationContext.class);
+
+ when(translationContext.getInputs()).thenReturn(Collections.EMPTY_LIST);
+ when(translationContext.getOutput()).thenReturn(mockOutput);
+
+ translator.translate(transform, translationContext);
+ verify(translationContext).getSourceStream(argThat(new UnboundedSourceWrapperMatcher()));
+ }
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testTranslateWithOneCollection() {
+ JavaStream javaStream = mock(JavaStream.class);
+ TranslationContext translationContext = mock(TranslationContext.class);
+
+ TaggedPValue mockInput = mock(TaggedPValue.class);
+ PCollection mockCollection = mock(PCollection.class);
+ when(mockInput.getValue()).thenReturn(mockCollection);
+
+ when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput));
+ when(translationContext.getInputStream(mockCollection)).thenReturn(javaStream);
+
+ PValue mockOutput = mock(PValue.class);
+ when(translationContext.getOutput()).thenReturn(mockOutput);
+
+ translator.translate(transform, translationContext);
+ verify(translationContext, times(1)).setOutputStream(mockOutput, javaStream);
+ }
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testWithMoreThanOneCollections() {
+ String transformName = "transform";
+ when(transform.getName()).thenReturn(transformName);
+
+ JavaStream javaStream1 = mock(JavaStream.class);
+ JavaStream javaStream2 = mock(JavaStream.class);
+ TranslationContext translationContext = mock(TranslationContext.class);
+
+ TaggedPValue mockInput1 = mock(TaggedPValue.class);
+ PCollection mockCollection1 = mock(PCollection.class);
+ when(mockInput1.getValue()).thenReturn(mockCollection1);
+
+ TaggedPValue mockInput2 = mock(TaggedPValue.class);
+ PCollection mockCollection2 = mock(PCollection.class);
+ when(mockInput2.getValue()).thenReturn(mockCollection2);
+
+ when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput1, mockInput2));
+ when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1);
+ when(translationContext.getInputStream(mockCollection2)).thenReturn(javaStream2);
+
+ translator.translate(transform, translationContext);
+ verify(javaStream1).merge(javaStream2, transformName);
+ }
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testWithDuplicatedCollections() {
+ String transformName = "transform";
+ when(transform.getName()).thenReturn(transformName);
+
+ JavaStream javaStream1 = mock(JavaStream.class);
+ TranslationContext translationContext = mock(TranslationContext.class);
+
+ PCollection mockCollection1 = mock(PCollection.class);
+ TaggedPValue mockInput1 = mock(TaggedPValue.class);
+ when(mockInput1.getValue()).thenReturn(mockCollection1);
+
+ TaggedPValue mockInput2 = mock(TaggedPValue.class);
+ when(mockInput2.getValue()).thenReturn(mockCollection1);
+
+ when(translationContext.getInputs()).thenReturn(Lists.newArrayList(mockInput1, mockInput2));
+ when(translationContext.getInputStream(mockCollection1)).thenReturn(javaStream1);
+
+ translator.translate(transform, translationContext);
+ verify(javaStream1).map(any(MapFunction.class), eq("dummy"));
+ verify(javaStream1).merge(any(JavaStream.class), eq(transformName));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
new file mode 100644
index 0000000..9135022
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.time.Instant;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator.GearpumpWindowFn;
+import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.gearpump.streaming.dsl.window.api.WindowFunction;
+import org.apache.gearpump.streaming.dsl.window.impl.Window;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/** Tests for {@link GroupByKeyTranslator}. */
+@RunWith(Parameterized.class)
+public class GroupByKeyTranslatorTest {
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testGearpumpWindowFn() {
+ GearpumpWindowFn windowFn = new GearpumpWindowFn(true);
+ List<BoundedWindow> windows =
+ Lists.newArrayList(
+ new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10)),
+ new IntervalWindow(new org.joda.time.Instant(5), new org.joda.time.Instant(15)));
+
+ WindowFunction.Context<WindowedValue<String>> context =
+ new WindowFunction.Context<WindowedValue<String>>() {
+ @Override
+ public Instant timestamp() {
+ return Instant.EPOCH;
+ }
+
+ @Override
+ public WindowedValue<String> element() {
+ return WindowedValue.of(
+ "v1", new org.joda.time.Instant(6), windows, PaneInfo.NO_FIRING);
+ }
+ };
+
+ Window[] result = windowFn.apply(context);
+ List<Window> expected = Lists.newArrayList();
+ for (BoundedWindow w : windows) {
+ expected.add(TranslatorUtils.boundedWindowToGearpumpWindow(w));
+ }
+ assertThat(result, equalTo(expected.toArray()));
+ }
+
+ @Parameterized.Parameters(name = "{index}: {0}")
+ public static Iterable<OutputTimeFn<BoundedWindow>> data() {
+ return ImmutableList.of(
+ OutputTimeFns.outputAtEarliestInputTimestamp(),
+ OutputTimeFns.outputAtLatestInputTimestamp(),
+ OutputTimeFns.outputAtEndOfWindow());
+ }
+
+ @Parameterized.Parameter(0)
+ public OutputTimeFn<? super BoundedWindow> outputTimeFn;
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testKeyedByTimestamp() {
+ BoundedWindow window =
+ new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10));
+ GroupByKeyTranslator.KeyedByTimestamp keyedByTimestamp =
+ new GroupByKeyTranslator.KeyedByTimestamp(outputTimeFn);
+ WindowedValue<KV<String, String>> value =
+ WindowedValue.of(
+ KV.of("key", "val"), org.joda.time.Instant.now(), window, PaneInfo.NO_FIRING);
+ KV<org.joda.time.Instant, WindowedValue<KV<String, String>>> result =
+ keyedByTimestamp.map(value);
+ org.joda.time.Instant time =
+ outputTimeFn.assignOutputTime(
+ value.getTimestamp(), Iterables.getOnlyElement(value.getWindows()));
+ assertThat(result, equalTo(KV.of(time, value)));
+ }
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testMerge() {
+ WindowFn slidingWindows = Sessions.withGapDuration(Duration.millis(10));
+ GroupByKeyTranslator.Merge merge = new GroupByKeyTranslator.Merge(slidingWindows, outputTimeFn);
+ org.joda.time.Instant key1 = new org.joda.time.Instant(5);
+ WindowedValue<KV<String, String>> value1 =
+ WindowedValue.of(
+ KV.of("key1", "value1"),
+ key1,
+ new IntervalWindow(new org.joda.time.Instant(5), new org.joda.time.Instant(10)),
+ PaneInfo.NO_FIRING);
+
+ org.joda.time.Instant key2 = new org.joda.time.Instant(10);
+ WindowedValue<KV<String, String>> value2 =
+ WindowedValue.of(
+ KV.of("key2", "value2"),
+ key2,
+ new IntervalWindow(new org.joda.time.Instant(9), new org.joda.time.Instant(14)),
+ PaneInfo.NO_FIRING);
+
+ KV<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>> result1 =
+ merge.fold(KV.of(null, null), KV.of(key1, value1));
+ assertThat(result1.getKey(), equalTo(key1));
+ assertThat(result1.getValue().getValue().getValue(), equalTo(Lists.newArrayList("value1")));
+
+ KV<org.joda.time.Instant, WindowedValue<KV<String, List<String>>>> result2 =
+ merge.fold(result1, KV.of(key2, value2));
+ assertThat(result2.getKey(), equalTo(outputTimeFn.combine(key1, key2)));
+ Collection<? extends BoundedWindow> resultWindows = result2.getValue().getWindows();
+ assertThat(resultWindows.size(), equalTo(1));
+ IntervalWindow expectedWindow =
+ new IntervalWindow(new org.joda.time.Instant(5), new org.joda.time.Instant(14));
+ assertThat(resultWindows.toArray()[0], equalTo(expectedWindow));
+ assertThat(
+ result2.getValue().getValue().getValue(), equalTo(Lists.newArrayList("value1", "value2")));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java
new file mode 100644
index 0000000..20ee1a2
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadBoundedTranslatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.translators.io.BoundedSourceWrapper;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+/** Tests for {@link ReadBoundedTranslator}. */
+public class ReadBoundedTranslatorTest {
+
+ class BoundedSourceWrapperMatcher extends ArgumentMatcher<DataSource> {
+ @Override
+ public boolean matches(Object o) {
+ return o instanceof BoundedSourceWrapper;
+ }
+ }
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testTranslate() {
+ ReadBoundedTranslator translator = new ReadBoundedTranslator();
+ GearpumpPipelineOptions options =
+ PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
+ Read.Bounded transform = mock(Read.Bounded.class);
+ BoundedSource source = mock(BoundedSource.class);
+ when(transform.getSource()).thenReturn(source);
+
+ TranslationContext translationContext = mock(TranslationContext.class);
+ when(translationContext.getPipelineOptions()).thenReturn(options);
+
+ JavaStream stream = mock(JavaStream.class);
+ PValue mockOutput = mock(PValue.class);
+ when(translationContext.getOutput()).thenReturn(mockOutput);
+ when(translationContext.getSourceStream(any(DataSource.class))).thenReturn(stream);
+
+ translator.translate(transform, translationContext);
+ verify(translationContext).getSourceStream(argThat(new BoundedSourceWrapperMatcher()));
+ verify(translationContext).setOutputStream(mockOutput, stream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java
new file mode 100644
index 0000000..f27b568
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/ReadUnboundedTranslatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.runners.gearpump.translators.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+/** Tests for {@link ReadUnboundedTranslator}. */
+public class ReadUnboundedTranslatorTest {
+
+ class UnboundedSourceWrapperMatcher extends ArgumentMatcher<DataSource> {
+ @Override
+ public boolean matches(Object o) {
+ return o instanceof UnboundedSourceWrapper;
+ }
+ }
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testTranslate() {
+ ReadUnboundedTranslator translator = new ReadUnboundedTranslator();
+ GearpumpPipelineOptions options =
+ PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
+ Read.Unbounded transform = mock(Read.Unbounded.class);
+ UnboundedSource source = mock(UnboundedSource.class);
+ when(transform.getSource()).thenReturn(source);
+
+ TranslationContext translationContext = mock(TranslationContext.class);
+ when(translationContext.getPipelineOptions()).thenReturn(options);
+
+ JavaStream stream = mock(JavaStream.class);
+ PValue mockOutput = mock(PValue.class);
+ when(translationContext.getOutput()).thenReturn(mockOutput);
+ when(translationContext.getSourceStream(any(DataSource.class))).thenReturn(stream);
+
+ translator.translate(transform, translationContext);
+ verify(translationContext).getSourceStream(argThat(new UnboundedSourceWrapperMatcher()));
+ verify(translationContext).setOutputStream(mockOutput, stream);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslatorTest.java
new file mode 100644
index 0000000..06ccaaf
--- /dev/null
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/WindowAssignTranslatorTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.gearpump.translators;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Iterator;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Tests for the {@code AssignWindows} function of {@link WindowAssignTranslator}. */
+public class WindowAssignTranslatorTest {
+
+  /** Size 10, period 5: an element at timestamp 1 lands in both [0, 10) and [-5, 5). */
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testAssignWindowsWithSlidingWindow() {
+    WindowFn slidingWindows = SlidingWindows.of(Duration.millis(10)).every(Duration.millis(5));
+    WindowAssignTranslator.AssignWindows<String> assignWindows =
+        new WindowAssignTranslator.AssignWindows(slidingWindows);
+
+    String value = "v1";
+    Instant timestamp = new Instant(1);
+    WindowedValue<String> input = WindowedValue.timestampedValueInGlobalWindow(value, timestamp);
+    ArrayList<WindowedValue<String>> expected = new ArrayList<>();
+    expected.add(
+        WindowedValue.of(
+            value,
+            timestamp,
+            new IntervalWindow(new Instant(0), new Instant(10)),
+            PaneInfo.NO_FIRING));
+    expected.add(
+        WindowedValue.of(
+            value,
+            timestamp,
+            new IntervalWindow(new Instant(-5), new Instant(5)),
+            PaneInfo.NO_FIRING));
+
+    Iterator<WindowedValue<String>> result = assignWindows.flatMap(input);
+    assertThat(Lists.newArrayList(result), equalTo(expected));
+  }
+
+  /** A 10 ms session gap puts an element at timestamp 1 into the window [1, 11). */
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testAssignWindowsWithSessions() {
+    WindowFn sessions = Sessions.withGapDuration(Duration.millis(10));
+    WindowAssignTranslator.AssignWindows<String> assignWindows =
+        new WindowAssignTranslator.AssignWindows(sessions);
+
+    String value = "v1";
+    Instant timestamp = new Instant(1);
+    WindowedValue<String> input = WindowedValue.timestampedValueInGlobalWindow(value, timestamp);
+    ArrayList<WindowedValue<String>> expected = new ArrayList<>();
+    expected.add(
+        WindowedValue.of(
+            value,
+            timestamp,
+            new IntervalWindow(new Instant(1), new Instant(11)),
+            PaneInfo.NO_FIRING));
+
+    Iterator<WindowedValue<String>> result = assignWindows.flatMap(input);
+    assertThat(Lists.newArrayList(result), equalTo(expected));
+  }
+
+  /** Under {@link GlobalWindows} the element comes out unchanged, still in the global window. */
+  @Test
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testAssignWindowsGlobal() {
+    WindowFn globalWindows = new GlobalWindows();
+    WindowAssignTranslator.AssignWindows<String> assignWindows =
+        new WindowAssignTranslator.AssignWindows(globalWindows);
+
+    String value = "v1";
+    Instant timestamp = new Instant(1);
+    WindowedValue<String> input = WindowedValue.timestampedValueInGlobalWindow(value, timestamp);
+    ArrayList<WindowedValue<String>> expected = new ArrayList<>();
+    expected.add(WindowedValue.timestampedValueInGlobalWindow(value, timestamp));
+
+    Iterator<WindowedValue<String>> result = assignWindows.flatMap(input);
+    assertThat(Lists.newArrayList(result), equalTo(expected));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
index af5a1d2..b244484 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/GearpumpSourceTest.java
@@ -37,16 +37,14 @@ import org.apache.gearpump.streaming.source.Watermark;
import org.junit.Assert;
import org.junit.Test;
-/**
- * Tests for {@link GearpumpSource}.
- */
+/** Tests for {@link GearpumpSource}. */
public class GearpumpSourceTest {
- private static final List<TimestampedValue<String>> TEST_VALUES = Lists.newArrayList(
- TimestampedValue.of("a", new org.joda.time.Instant(Long.MIN_VALUE)),
- TimestampedValue.of("b", new org.joda.time.Instant(0)),
- TimestampedValue.of("c", new org.joda.time.Instant(53)),
- TimestampedValue.of("d", new org.joda.time.Instant(Long.MAX_VALUE - 1))
- );
+ private static final List<TimestampedValue<String>> TEST_VALUES =
+ Lists.newArrayList(
+ TimestampedValue.of("a", new org.joda.time.Instant(Long.MIN_VALUE)),
+ TimestampedValue.of("b", new org.joda.time.Instant(0)),
+ TimestampedValue.of("c", new org.joda.time.Instant(53)),
+ TimestampedValue.of("d", new org.joda.time.Instant(Long.MAX_VALUE - 1)));
private static class SourceForTest<T> extends GearpumpSource<T> {
private ValuesSource<T> valuesSource;
@@ -64,22 +62,24 @@ public class GearpumpSourceTest {
@Test
public void testGearpumpSource() {
- GearpumpPipelineOptions options = PipelineOptionsFactory.create()
- .as(GearpumpPipelineOptions.class);
- ValuesSource<TimestampedValue<String>> valuesSource = new ValuesSource<>(TEST_VALUES,
- TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()));
+ GearpumpPipelineOptions options =
+ PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
+ ValuesSource<TimestampedValue<String>> valuesSource =
+ new ValuesSource<>(
+ TEST_VALUES, TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()));
SourceForTest<TimestampedValue<String>> sourceForTest =
- new SourceForTest<>(options, valuesSource);
+ new SourceForTest<>(options, valuesSource);
sourceForTest.open(null, Instant.EPOCH);
- for (TimestampedValue<String> value: TEST_VALUES) {
+ for (TimestampedValue<String> value : TEST_VALUES) {
// Check the watermark first since the Source will advance when it's opened
Instant expectedWaterMark = TranslatorUtils.jodaTimeToJava8Time(value.getTimestamp());
Assert.assertEquals(expectedWaterMark, sourceForTest.getWatermark());
- Message expectedMsg = Message.apply(
- WindowedValue.timestampedValueInGlobalWindow(value, value.getTimestamp()),
- value.getTimestamp().getMillis());
+ Message expectedMsg =
+ Message.apply(
+ WindowedValue.timestampedValueInGlobalWindow(value, value.getTimestamp()),
+ value.getTimestamp().getMillis());
Message message = sourceForTest.read();
Assert.assertEquals(expectedMsg, message);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f3138dde/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
index 8c50703..439e1b1 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/io/ValueSoureTest.java
@@ -41,18 +41,16 @@ import org.apache.gearpump.util.Constants;
import org.junit.Assert;
import org.junit.Test;
-/**
- * Tests for {@link ValuesSource}.
- */
+/** Tests for {@link ValuesSource}. */
public class ValueSoureTest {
@Test
public void testValueSource() {
- GearpumpPipelineOptions options = PipelineOptionsFactory.create()
- .as(GearpumpPipelineOptions.class);
+ GearpumpPipelineOptions options =
+ PipelineOptionsFactory.create().as(GearpumpPipelineOptions.class);
Config config = ClusterConfig.master(null);
- config = config.withValue(Constants.APPLICATION_TOTAL_RETRIES(),
- ConfigValueFactory.fromAnyRef(0));
+ config =
+ config.withValue(Constants.APPLICATION_TOTAL_RETRIES(), ConfigValueFactory.fromAnyRef(0));
EmbeddedCluster cluster = new EmbeddedCluster(config);
cluster.start();
@@ -62,8 +60,7 @@ public class ValueSoureTest {
Pipeline p = Pipeline.create(options);
List<String> values = Lists.newArrayList("1", "2", "3", "4", "5");
ValuesSource<String> source = new ValuesSource<>(values, StringUtf8Coder.of());
- p.apply(Read.from(source))
- .apply(ParDo.of(new ResultCollector()));
+ p.apply(Read.from(source)).apply(ParDo.of(new ResultCollector()));
p.run().waitUntilFinish();
cluster.stop();