You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:17 UTC
[17/53] [abbrv] beam git commit: jstorm-runner: remove
ValidatesRunner tests and dead code from jstorm module.
jstorm-runner: remove ValidatesRunner tests and dead code from jstorm module.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4ff42cbc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4ff42cbc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4ff42cbc
Branch: refs/heads/jstorm-runner
Commit: 4ff42cbc65452ae6259d90f07f2f80423eeb69df
Parents: aa251a4
Author: Pei He <pe...@apache.org>
Authored: Thu Jul 13 18:38:49 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:56 2017 +0800
----------------------------------------------------------------------
.../jstorm/translation/TranslatorRegistry.java | 18 -
.../translator/CombineGloballyTranslator.java | 25 -
.../translator/CombinePerKeyTranslator.java | 25 -
.../translator/ReshuffleTranslator.java | 24 -
.../translator/WindowBoundTranslator.java | 47 --
.../util/DefaultSideInputReader.java | 45 --
.../translator/CoGroupByKeyTest.java | 301 ---------
.../translation/translator/GroupByKeyTest.java | 155 -----
.../translation/translator/ParDoTest.java | 624 -------------------
9 files changed, 1264 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
index bce5b3e..316186e 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
@@ -49,30 +49,12 @@ public class TranslatorRegistry {
static {
TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator());
TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
- // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
- // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
-
TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator());
TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator());
-
- //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>());
TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
-
TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
-
TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
-
TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator());
-
- /**
- * Currently, empty translation is required for combine and reshuffle.
- * Because, the transforms will be mapped to GroupByKey and Pardo finally.
- * So we only need to translator the finally transforms.
- * If any improvement is required, the composite transforms will be translated in the future.
- */
- // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
- // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator());
- // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator());
}
public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
deleted file mode 100644
index fe5fca9..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.sdk.transforms.Combine;
-
-public class CombineGloballyTranslator<InputT, OutputT>
- extends TransformTranslator.Default<Combine.Globally<InputT, OutputT>> {
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
deleted file mode 100644
index c382fb7..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.sdk.transforms.Combine;
-
-public class CombinePerKeyTranslator<K, InputT, OutputT>
- extends TransformTranslator.Default<Combine.PerKey<K, InputT, OutputT>> {
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
deleted file mode 100644
index c450a22..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.sdk.transforms.Reshuffle;
-
-public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K, V>> {
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
deleted file mode 100644
index c863c9e..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Translates a Window.Bound node into a Storm WindowedBolt
- *
- * @param <T>
- */
-public class WindowBoundTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
- private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class);
-
- // Do nothing here currently. The assign of window strategy is included in AssignTranslator.
- @Override
- public void translateNode(Window.Assign<T> transform, TranslationContext context) {
- if (transform.getWindowFn() instanceof FixedWindows) {
- context.getUserGraphContext().setWindowed();
- } else if (transform.getWindowFn() instanceof SlidingWindows) {
- context.getUserGraphContext().setWindowed();
- } else {
- throw new UnsupportedOperationException(
- "Not supported window type currently: " + transform.getWindowFn());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
deleted file mode 100644
index 750095e..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.util;
-
-import java.io.Serializable;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * No-op SideInputReader implementation.
- */
-public class DefaultSideInputReader implements SideInputReader, Serializable {
- @Nullable
- @Override
- public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
- return null;
- }
-
- @Override
- public <T> boolean contains(PCollectionView<T> pCollectionView) {
- return false;
- }
-
- @Override
- public boolean isEmpty() {
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java
deleted file mode 100644
index 809436e..0000000
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java
+++ /dev/null
@@ -1,301 +0,0 @@
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.jstorm.TestJStormRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-@RunWith(JUnit4.class)
-public class CoGroupByKeyTest implements Serializable {
- /**
- * Converts the given list into a PCollection belonging to the provided
- * Pipeline in such a way that coder inference needs to be performed.
- */
- private PCollection<KV<Integer, String>> createInput(String name,
- Pipeline p, List<KV<Integer, String>> list) {
- return createInput(name, p, list, new ArrayList<Long>());
- }
-
- /**
- * Converts the given list with timestamps into a PCollection.
- */
- private PCollection<KV<Integer, String>> createInput(String name,
- Pipeline p, List<KV<Integer, String>> list, List<Long> timestamps) {
- PCollection<KV<Integer, String>> input;
- if (timestamps.isEmpty()) {
- input = p.apply("Create" + name, Create.of(list)
- .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
- } else {
- input = p.apply("Create" + name, Create.timestamped(list, timestamps)
- .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
- }
- return input.apply(
- "Identity" + name,
- ParDo.of(
- new DoFn<KV<Integer, String>, KV<Integer, String>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element());
- }
- }));
- }
-
- /**
- * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the result
- * of a {@link CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>},
- * where each {@link PCollection} has no duplicate keys and the key sets of
- * each {@link PCollection} are intersecting but neither is a subset of the other.
- */
- private PCollection<KV<Integer, CoGbkResult>> buildGetOnlyGbk(
- Pipeline p,
- TupleTag<String> tag1,
- TupleTag<String> tag2) {
- List<KV<Integer, String>> list1 =
- Arrays.asList(
- KV.of(1, "collection1-1"),
- KV.of(2, "collection1-2"));
- List<KV<Integer, String>> list2 =
- Arrays.asList(
- KV.of(2, "collection2-2"),
- KV.of(3, "collection2-3"));
- PCollection<KV<Integer, String>> collection1 = createInput("CreateList1", p, list1);
- PCollection<KV<Integer, String>> collection2 = createInput("CreateList2", p, list2);
- PCollection<KV<Integer, CoGbkResult>> coGbkResults =
- KeyedPCollectionTuple.of(tag1, collection1)
- .and(tag2, collection2)
- .apply(CoGroupByKey.<Integer>create());
- return coGbkResults;
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testCoGroupByKeyGetOnly() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- options.setLocalMode(true);
-
- Pipeline p = Pipeline.create(options);
-
- final TupleTag<String> tag1 = new TupleTag<>();
- final TupleTag<String> tag2 = new TupleTag<>();
-
- PCollection<KV<Integer, CoGbkResult>> coGbkResults =
- buildGetOnlyGbk(p, tag1, tag2);
-
- PAssert.thatMap(coGbkResults).satisfies(
- new SerializableFunction<Map<Integer, CoGbkResult>, Void>() {
- @Override
- public Void apply(Map<Integer, CoGbkResult> results) {
- assertEquals("collection1-1", results.get(1).getOnly(tag1));
- assertEquals("collection1-2", results.get(2).getOnly(tag1));
- assertEquals("collection2-2", results.get(2).getOnly(tag2));
- assertEquals("collection2-3", results.get(3).getOnly(tag2));
- return null;
- }
- });
-
- p.run();
- }
-
- /**
- * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the
- * results of the {@code CoGroupByKey} over three
- * {@code PCollection<KV<Integer, String>>}, each of which correlates
- * a customer id to purchases, addresses, or names, respectively.
- */
- private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbk(
- Pipeline p,
- TupleTag<String> purchasesTag,
- TupleTag<String> addressesTag,
- TupleTag<String> namesTag) {
- List<KV<Integer, String>> idToPurchases =
- Arrays.asList(
- KV.of(2, "Boat"),
- KV.of(1, "Shoes"),
- KV.of(3, "Car"),
- KV.of(1, "Book"),
- KV.of(10, "Pens"),
- KV.of(8, "House"),
- KV.of(4, "Suit"),
- KV.of(11, "House"),
- KV.of(14, "Shoes"),
- KV.of(2, "Suit"),
- KV.of(8, "Suit Case"),
- KV.of(3, "House"));
-
- List<KV<Integer, String>> idToAddress =
- Arrays.asList(
- KV.of(2, "53 S. 3rd"),
- KV.of(10, "383 Jackson Street"),
- KV.of(20, "3 W. Arizona"),
- KV.of(3, "29 School Rd"),
- KV.of(8, "6 Watling Rd"));
-
- List<KV<Integer, String>> idToName =
- Arrays.asList(
- KV.of(1, "John Smith"),
- KV.of(2, "Sally James"),
- KV.of(8, "Jeffery Spalding"),
- KV.of(20, "Joan Lichtfield"));
-
- PCollection<KV<Integer, String>> purchasesTable =
- createInput("CreateIdToPurchases", p, idToPurchases);
-
- PCollection<KV<Integer, String>> addressTable =
- createInput("CreateIdToAddress", p, idToAddress);
-
- PCollection<KV<Integer, String>> nameTable =
- createInput("CreateIdToName", p, idToName);
-
- PCollection<KV<Integer, CoGbkResult>> coGbkResults =
- KeyedPCollectionTuple.of(namesTag, nameTable)
- .and(addressesTag, addressTable)
- .and(purchasesTag, purchasesTable)
- .apply(CoGroupByKey.<Integer>create());
- return coGbkResults;
- }
-
- /**
- * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the
- * results of the {@code CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>},
- * each of which correlates a customer id to clicks, purchases, respectively.
- */
- private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbkWithWindowing(
- Pipeline p,
- TupleTag<String> clicksTag,
- TupleTag<String> purchasesTag) {
- List<KV<Integer, String>> idToClick =
- Arrays.asList(
- KV.of(1, "Click t0"),
- KV.of(2, "Click t2"),
- KV.of(1, "Click t4"),
- KV.of(1, "Click t6"),
- KV.of(2, "Click t8"));
-
- List<KV<Integer, String>> idToPurchases =
- Arrays.asList(
- KV.of(1, "Boat t1"),
- KV.of(1, "Shoesi t2"),
- KV.of(1, "Pens t3"),
- KV.of(2, "House t4"),
- KV.of(2, "Suit t5"),
- KV.of(1, "Car t6"),
- KV.of(1, "Book t7"),
- KV.of(2, "House t8"),
- KV.of(2, "Shoes t9"),
- KV.of(2, "House t10"));
-
- PCollection<KV<Integer, String>> clicksTable =
- createInput("CreateClicks",
- p,
- idToClick,
- Arrays.asList(0L, 2L, 4L, 6L, 8L))
- .apply("WindowClicks", Window.<KV<Integer, String>>into(
- FixedWindows.of(new Duration(4)))
- .withTimestampCombiner(TimestampCombiner.EARLIEST));
-
- PCollection<KV<Integer, String>> purchasesTable =
- createInput("CreatePurchases",
- p,
- idToPurchases,
- Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
- .apply("WindowPurchases", Window.<KV<Integer, String>>into(
- FixedWindows.of(new Duration(4)))
- .withTimestampCombiner(TimestampCombiner.EARLIEST));
-
- PCollection<KV<Integer, CoGbkResult>> coGbkResults =
- KeyedPCollectionTuple.of(clicksTag, clicksTable)
- .and(purchasesTag, purchasesTable)
- .apply(CoGroupByKey.<Integer>create());
- return coGbkResults;
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testCoGroupByKey() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- options.setLocalMode(true);
-
- Pipeline p = Pipeline.create(options);
-
- final TupleTag<String> namesTag = new TupleTag<>();
- final TupleTag<String> addressesTag = new TupleTag<>();
- final TupleTag<String> purchasesTag = new TupleTag<>();
-
-
- PCollection<KV<Integer, CoGbkResult>> coGbkResults =
- buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag);
-
- PAssert.thatMap(coGbkResults).satisfies(
- new SerializableFunction<Map<Integer, CoGbkResult>, Void>() {
- @Override
- public Void apply(Map<Integer, CoGbkResult> results) {
- CoGbkResult result1 = results.get(1);
- assertEquals("John Smith", result1.getOnly(namesTag));
- assertThat(result1.getAll(purchasesTag), containsInAnyOrder("Shoes", "Book"));
-
- CoGbkResult result2 = results.get(2);
- assertEquals("Sally James", result2.getOnly(namesTag));
- assertEquals("53 S. 3rd", result2.getOnly(addressesTag));
- assertThat(result2.getAll(purchasesTag), containsInAnyOrder("Suit", "Boat"));
-
- CoGbkResult result3 = results.get(3);
- assertEquals("29 School Rd", result3.getOnly(addressesTag), "29 School Rd");
- assertThat(result3.getAll(purchasesTag), containsInAnyOrder("Car", "House"));
-
- CoGbkResult result8 = results.get(8);
- assertEquals("Jeffery Spalding", result8.getOnly(namesTag));
- assertEquals("6 Watling Rd", result8.getOnly(addressesTag));
- assertThat(result8.getAll(purchasesTag), containsInAnyOrder("House", "Suit Case"));
-
- CoGbkResult result20 = results.get(20);
- assertEquals("Joan Lichtfield", result20.getOnly(namesTag));
- assertEquals("3 W. Arizona", result20.getOnly(addressesTag));
-
- assertEquals("383 Jackson Street", results.get(10).getOnly(addressesTag));
-
- assertThat(results.get(4).getAll(purchasesTag), containsInAnyOrder("Suit"));
- assertThat(results.get(10).getAll(purchasesTag), containsInAnyOrder("Pens"));
- assertThat(results.get(11).getAll(purchasesTag), containsInAnyOrder("House"));
- assertThat(results.get(14).getAll(purchasesTag), containsInAnyOrder("Shoes"));
-
- return null;
- }
- });
-
- p.run();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java
deleted file mode 100644
index 9a8b43a..0000000
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.jstorm.TestJStormRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
-/**
- * Tests for {@link GroupByKey} with {@link StormRunner}.
- */
-@RunWith(JUnit4.class)
-public class GroupByKeyTest {
-
- static final String[] WORDS_ARRAY = new String[] {
- "hi", "there", "hi", "hi", "sue", "bob",
- "hi", "sue", "", "", "ZOW", "bob", "" };
-
- static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
-
- @Test
- public void testGroupByKey() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- options.setLocalMode(true);
-
- Pipeline p = Pipeline.create(options);
-
- List<KV<String, Integer>> ungroupedPairs = Arrays.asList(
- KV.of("k1", 3),
- KV.of("k5", Integer.MAX_VALUE),
- KV.of("k5", Integer.MIN_VALUE),
- KV.of("k2", 66),
- KV.of("k1", 4),
- KV.of("k2", -33),
- KV.of("k3", 0));
-
- PCollection<KV<String, Integer>> input =
- p.apply(Create.of(ungroupedPairs)
- .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
-
- PCollection<KV<String, Iterable<Integer>>> output =
- input.apply(GroupByKey.<String, Integer>create());
-
- PAssert.that(output)
- .satisfies(new AssertThatHasExpectedContentsForTestGroupByKey());
-
- p.run();
- }
-
- @Test
- public void testCountGloballyBasic() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- options.setLocalMode(true);
-
- Pipeline p = Pipeline.create(options);
- PCollection<String> input = p.apply(Create.of(WORDS));
-
- PCollection<Long> output =
- input.apply(Count.<String>globally());
-
- PAssert.that(output)
- .containsInAnyOrder(13L);
- p.run();
- }
-
- static class AssertThatHasExpectedContentsForTestGroupByKey
- implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>,
- Void> {
- @Override
- public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) {
- assertThat(actual, containsInAnyOrder(
- KvMatcher.isKv(is("k1"), containsInAnyOrder(3, 4)),
- KvMatcher.isKv(is("k5"), containsInAnyOrder(Integer.MAX_VALUE,
- Integer.MIN_VALUE)),
- KvMatcher.isKv(is("k2"), containsInAnyOrder(66, -33)),
- KvMatcher.isKv(is("k3"), containsInAnyOrder(0))));
- return null;
- }
- }
-
- /**
- * Matcher for KVs.
- */
- public static class KvMatcher<K, V>
- extends TypeSafeMatcher<KV<? extends K, ? extends V>> {
- final Matcher<? super K> keyMatcher;
- final Matcher<? super V> valueMatcher;
-
- public static <K, V> KvMatcher<K, V> isKv(Matcher<K> keyMatcher,
- Matcher<V> valueMatcher) {
- return new KvMatcher<>(keyMatcher, valueMatcher);
- }
-
- public KvMatcher(Matcher<? super K> keyMatcher,
- Matcher<? super V> valueMatcher) {
- this.keyMatcher = keyMatcher;
- this.valueMatcher = valueMatcher;
- }
-
- @Override
- public boolean matchesSafely(KV<? extends K, ? extends V> kv) {
- return keyMatcher.matches(kv.getKey())
- && valueMatcher.matches(kv.getValue());
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("a KV(").appendValue(keyMatcher)
- .appendText(", ").appendValue(valueMatcher)
- .appendText(")");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java
deleted file mode 100644
index c911364..0000000
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java
+++ /dev/null
@@ -1,624 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.jstorm.TestJStormRunner;
-import com.google.common.base.MoreObjects;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.*;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.state.*;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.UsesMapState;
-import org.apache.beam.sdk.testing.UsesStatefulParDo;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.*;
-import org.apache.beam.sdk.transforms.windowing.*;
-import org.apache.beam.sdk.values.*;
-import org.joda.time.Duration;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.*;
-
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-/**
- * Tests for {@link ParDo} with {@link StormRunner}.
- */
-@RunWith(JUnit4.class)
-public class ParDoTest implements Serializable {
-
- @Test
- public void testParDo() throws IOException {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- List<Integer> inputs = Arrays.asList(3, -42, 666);
-
- PCollection<String> output = pipeline
- .apply(Create.of(inputs))
- .apply(ParDo.of(new TestDoFn()));
-
- PAssert.that(output)
- .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
-
- pipeline.run();
- }
-
- @Test
- public void testParDoWithSideInputs() throws IOException {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- List<Integer> inputs = Arrays.asList(3, -42, 666);
-
- PCollectionView<Integer> sideInput1 = pipeline
- .apply("CreateSideInput1", Create.of(11))
- .apply("ViewSideInput1", View.<Integer>asSingleton());
- PCollectionView<Integer> sideInputUnread = pipeline
- .apply("CreateSideInputUnread", Create.of(-3333))
- .apply("ViewSideInputUnread", View.<Integer>asSingleton());
-
- PCollectionView<Integer> sideInput2 = pipeline
- .apply("CreateSideInput2", Create.of(222))
- .apply("ViewSideInput2", View.<Integer>asSingleton());
- PCollection<String> output = pipeline
- .apply(Create.of(inputs))
- .apply(ParDo.of(new TestDoFn(
- Arrays.asList(sideInput1, sideInput2),
- Arrays.<TupleTag<String>>asList()))
- .withSideInputs(sideInput1, sideInputUnread, sideInput2));
-
- PAssert.that(output)
- .satisfies(ParDoTest.HasExpectedOutput
- .forInput(inputs)
- .andSideInputs(11, 222));
-
- pipeline.run();
- }
-
- @Test
- public void testParDoWithTaggedOutput() {
- List<Integer> inputs = Arrays.asList(3, -42, 666);
-
- TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
- TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){};
- TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){};
- TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3"){};
- TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){};
-
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- PCollectionTuple outputs = pipeline
- .apply(Create.of(inputs))
- .apply(ParDo
- .of(new TestDoFn(
- Arrays.<PCollectionView<Integer>>asList(),
- Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3)))
- .withOutputTags(
- mainOutputTag,
- TupleTagList.of(additionalOutputTag3)
- .and(additionalOutputTag1)
- .and(additionalOutputTagUnwritten)
- .and(additionalOutputTag2)));
-
- PAssert.that(outputs.get(mainOutputTag))
- .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
-
- PAssert.that(outputs.get(additionalOutputTag1))
- .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
- .fromOutput(additionalOutputTag1));
- PAssert.that(outputs.get(additionalOutputTag2))
- .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
- .fromOutput(additionalOutputTag2));
- PAssert.that(outputs.get(additionalOutputTag3))
- .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
- .fromOutput(additionalOutputTag3));
- PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty();
-
- pipeline.run();
- }
-
- @Test
- public void testNoWindowFnDoesNotReassignWindows() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- final PCollection<Long> initialWindows =
- pipeline
- .apply(GenerateSequence.from(0).to(10))
- .apply("AssignWindows", Window.into(new WindowOddEvenBuckets()));
-
- // Sanity check the window assignment to demonstrate the baseline
- PAssert.that(initialWindows)
- .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
- .containsInAnyOrder(0L, 2L, 4L, 6L, 8L);
- PAssert.that(initialWindows)
- .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
- .containsInAnyOrder(1L, 3L, 5L, 7L, 9L);
-
- PCollection<Boolean> upOne =
- initialWindows.apply(
- "ModifyTypes",
- MapElements.<Long, Boolean>via(
- new SimpleFunction<Long, Boolean>() {
- @Override
- public Boolean apply(Long input) {
- return input % 2 == 0;
- }
- }));
- PAssert.that(upOne)
- .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
- .containsInAnyOrder(true, true, true, true, true);
- PAssert.that(upOne)
- .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
- .containsInAnyOrder(false, false, false, false, false);
-
- // The elements should be in the same windows, even though they would not be assigned to the
- // same windows with the updated timestamps. If we try to apply the original WindowFn, the type
- // will not be appropriate and the runner should crash, as a Boolean cannot be converted into
- // a long.
- PCollection<Boolean> updatedTrigger =
- upOne.apply(
- "UpdateWindowingStrategy",
- Window.<Boolean>configure().triggering(Never.ever())
- .withAllowedLateness(Duration.ZERO)
- .accumulatingFiredPanes());
- pipeline.run();
- }
-
- @Test
- public void testValueStateSameId() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- final String stateId = "foo";
-
- DoFn<KV<String, Integer>, KV<String, Integer>> fn =
- new DoFn<KV<String, Integer>, KV<String, Integer>>() {
-
- @StateId(stateId)
- private final StateSpec<ValueState<Integer>> intState =
- StateSpecs.value(VarIntCoder.of());
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
- Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
- c.output(KV.of("sizzle", currentValue));
- state.write(currentValue + 1);
- }
- };
-
- DoFn<KV<String, Integer>, Integer> fn2 =
- new DoFn<KV<String, Integer>, Integer>() {
-
- @StateId(stateId)
- private final StateSpec<ValueState<Integer>> intState =
- StateSpecs.value(VarIntCoder.of());
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
- Integer currentValue = MoreObjects.firstNonNull(state.read(), 13);
- c.output(currentValue);
- state.write(currentValue + 13);
- }
- };
-
- PCollection<KV<String, Integer>> intermediate =
- pipeline.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84)))
- .apply("First stateful ParDo", ParDo.of(fn));
-
- PCollection<Integer> output =
- intermediate.apply("Second stateful ParDo", ParDo.of(fn2));
-
- PAssert.that(intermediate)
- .containsInAnyOrder(KV.of("sizzle", 0), KV.of("sizzle", 1), KV.of("sizzle", 2));
- PAssert.that(output).containsInAnyOrder(13, 26, 39);
- pipeline.run();
- }
-
- @Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class})
- public void testValueStateTaggedOutput() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- final String stateId = "foo";
-
- final TupleTag<Integer> evenTag = new TupleTag<Integer>() {};
- final TupleTag<Integer> oddTag = new TupleTag<Integer>() {};
-
- DoFn<KV<String, Integer>, Integer> fn =
- new DoFn<KV<String, Integer>, Integer>() {
-
- @StateId(stateId)
- private final StateSpec<ValueState<Integer>> intState =
- StateSpecs.value(VarIntCoder.of());
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
- Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
- if (currentValue % 2 == 0) {
- c.output(currentValue);
- } else {
- c.output(oddTag, currentValue);
- }
- state.write(currentValue + 1);
- }
- };
-
- PCollectionTuple output =
- pipeline.apply(
- Create.of(
- KV.of("hello", 42),
- KV.of("hello", 97),
- KV.of("hello", 84),
- KV.of("goodbye", 33),
- KV.of("hello", 859),
- KV.of("goodbye", 83945)))
- .apply(ParDo.of(fn).withOutputTags(evenTag, TupleTagList.of(oddTag)));
-
- PCollection<Integer> evens = output.get(evenTag);
- PCollection<Integer> odds = output.get(oddTag);
-
- // There are 0 and 2 from "hello" and just 0 from "goodbye"
- PAssert.that(evens).containsInAnyOrder(0, 2, 0);
-
- // There are 1 and 3 from "hello" and just "1" from "goodbye"
- PAssert.that(odds).containsInAnyOrder(1, 3, 1);
- pipeline.run();
- }
-
- @Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class})
- public void testMapStateCoderInference() {
- StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
- options.setRunner(TestJStormRunner.class);
- Pipeline pipeline = Pipeline.create(options);
-
- final String stateId = "foo";
- final String countStateId = "count";
- Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
- pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
-
- DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn =
- new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
-
- @StateId(stateId)
- private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
-
- @StateId(countStateId)
- private final StateSpec<CombiningState<Integer, int[], Integer>>
- countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
- Sum.ofIntegers());
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
- @StateId(countStateId) CombiningState<Integer, int[], Integer>
- count) {
- KV<String, Integer> value = c.element().getValue();
- state.put(value.getKey(), new MyInteger(value.getValue()));
- count.add(1);
- if (count.read() >= 4) {
- Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read();
- for (Map.Entry<String, MyInteger> entry : iterate) {
- c.output(KV.of(entry.getKey(), entry.getValue()));
- }
- }
- }
- };
-
- PCollection<KV<String, MyInteger>> output =
- pipeline.apply(
- Create.of(
- KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("b", 42)),
- KV.of("hello", KV.of("b", 42)), KV.of("hello", KV.of("c", 12))))
- .apply(ParDo.of(fn)).setCoder(KvCoder.of(StringUtf8Coder.of(), myIntegerCoder));
-
- PAssert.that(output).containsInAnyOrder(KV.of("a", new MyInteger(97)),
- KV.of("b", new MyInteger(42)), KV.of("c", new MyInteger(12)));
- pipeline.run();
- }
-
-
- private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, IntervalWindow> {
- private static final IntervalWindow EVEN_WINDOW =
- new IntervalWindow(
- BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp());
- private static final IntervalWindow ODD_WINDOW =
- new IntervalWindow(
- BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp().minus(1));
-
- @Override
- public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
- if (c.element() % 2 == 0) {
- return Collections.singleton(EVEN_WINDOW);
- }
- return Collections.singleton(ODD_WINDOW);
- }
-
- @Override
- public boolean isCompatible(WindowFn<?, ?> other) {
- return other instanceof WindowOddEvenBuckets;
- }
-
- @Override
- public Coder<IntervalWindow> windowCoder() {
- return new IntervalWindow.IntervalWindowCoder();
- }
-
- @Override
- public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
- throw new UnsupportedOperationException(
- String.format("Can't use %s for side inputs", getClass().getSimpleName()));
- }
- }
-
-
- static class TestDoFn extends DoFn<Integer, String> {
- enum State {NOT_SET_UP, UNSTARTED, STARTED, PROCESSING, FINISHED}
-
- State state = State.NOT_SET_UP;
-
- final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
- final List<TupleTag<String>> additionalOutputTupleTags = new ArrayList<>();
-
- public TestDoFn() {
- }
-
- public TestDoFn(List<PCollectionView<Integer>> sideInputViews,
- List<TupleTag<String>> additionalOutputTupleTags) {
- this.sideInputViews.addAll(sideInputViews);
- this.additionalOutputTupleTags.addAll(additionalOutputTupleTags);
- }
-
- @Setup
- public void prepare() {
- assertEquals(State.NOT_SET_UP, state);
- state = State.UNSTARTED;
- }
-
- @StartBundle
- public void startBundle() {
- assertThat(state,
- anyOf(equalTo(State.UNSTARTED), equalTo(State.FINISHED)));
-
- state = State.STARTED;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- System.out.println("Recv elem: " + c.element());
- assertThat(state,
- anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
- state = State.PROCESSING;
- outputToAllWithSideInputs(c, "processing: " + c.element());
- }
-
- @FinishBundle
- public void finishBundle(FinishBundleContext c) {
- assertThat(state,
- anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
- state = State.FINISHED;
- c.output("finished", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
- for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) {
- c.output(
- additionalOutputTupleTag,
- additionalOutputTupleTag.getId() + ": " + "finished",
- BoundedWindow.TIMESTAMP_MIN_VALUE,
- GlobalWindow.INSTANCE);
- }
- }
-
- private void outputToAllWithSideInputs(ProcessContext c, String value) {
- if (!sideInputViews.isEmpty()) {
- List<Integer> sideInputValues = new ArrayList<>();
- for (PCollectionView<Integer> sideInputView : sideInputViews) {
- sideInputValues.add(c.sideInput(sideInputView));
- }
- value += ": " + sideInputValues;
- }
- c.output(value);
- for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) {
- c.output(additionalOutputTupleTag,
- additionalOutputTupleTag.getId() + ": " + value);
- }
- }
- }
-
- private static class MyInteger implements Comparable<MyInteger> {
- private final int value;
-
- MyInteger(int value) {
- this.value = value;
- }
-
- public int getValue() {
- return value;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (!(o instanceof MyInteger)) {
- return false;
- }
-
- MyInteger myInteger = (MyInteger) o;
-
- return value == myInteger.value;
-
- }
-
- @Override
- public int hashCode() {
- return value;
- }
-
- @Override
- public int compareTo(MyInteger o) {
- return Integer.compare(this.getValue(), o.getValue());
- }
-
- @Override
- public String toString() {
- return "MyInteger{" + "value=" + value + '}';
- }
- }
-
- private static class MyIntegerCoder extends AtomicCoder<MyInteger> {
- private static final MyIntegerCoder INSTANCE = new MyIntegerCoder();
-
- private final VarIntCoder delegate = VarIntCoder.of();
-
- public static MyIntegerCoder of() {
- return INSTANCE;
- }
-
- @Override
- public void encode(MyInteger value, OutputStream outStream)
- throws CoderException, IOException {
- delegate.encode(value.getValue(), outStream);
- }
-
- @Override
- public MyInteger decode(InputStream inStream) throws CoderException,
- IOException {
- return new MyInteger(delegate.decode(inStream));
- }
- }
-
- /** PAssert "matcher" for expected output. */
- static class HasExpectedOutput
- implements SerializableFunction<Iterable<String>, Void>, Serializable {
- private final List<Integer> inputs;
- private final List<Integer> sideInputs;
- private final String additionalOutput;
- private final boolean ordered;
-
- public static HasExpectedOutput forInput(List<Integer> inputs) {
- return new HasExpectedOutput(
- new ArrayList<Integer>(inputs),
- new ArrayList<Integer>(),
- "",
- false);
- }
-
- private HasExpectedOutput(List<Integer> inputs,
- List<Integer> sideInputs,
- String additionalOutput,
- boolean ordered) {
- this.inputs = inputs;
- this.sideInputs = sideInputs;
- this.additionalOutput = additionalOutput;
- this.ordered = ordered;
- }
-
- public HasExpectedOutput andSideInputs(Integer... sideInputValues) {
- List<Integer> sideInputs = new ArrayList<>();
- for (Integer sideInputValue : sideInputValues) {
- sideInputs.add(sideInputValue);
- }
- return new HasExpectedOutput(inputs, sideInputs, additionalOutput, ordered);
- }
-
- public HasExpectedOutput fromOutput(TupleTag<String> outputTag) {
- return fromOutput(outputTag.getId());
- }
- public HasExpectedOutput fromOutput(String outputId) {
- return new HasExpectedOutput(inputs, sideInputs, outputId, ordered);
- }
-
- public HasExpectedOutput inOrder() {
- return new HasExpectedOutput(inputs, sideInputs, additionalOutput, true);
- }
-
- @Override
- public Void apply(Iterable<String> outputs) {
- List<String> processeds = new ArrayList<>();
- List<String> finisheds = new ArrayList<>();
- for (String output : outputs) {
- if (output.contains("finished")) {
- finisheds.add(output);
- } else {
- processeds.add(output);
- }
- }
-
- String sideInputsSuffix;
- if (sideInputs.isEmpty()) {
- sideInputsSuffix = "";
- } else {
- sideInputsSuffix = ": " + sideInputs;
- }
-
- String additionalOutputPrefix;
- if (additionalOutput.isEmpty()) {
- additionalOutputPrefix = "";
- } else {
- additionalOutputPrefix = additionalOutput + ": ";
- }
-
- List<String> expectedProcesseds = new ArrayList<>();
- for (Integer input : inputs) {
- expectedProcesseds.add(
- additionalOutputPrefix + "processing: " + input + sideInputsSuffix);
- }
- String[] expectedProcessedsArray =
- expectedProcesseds.toArray(new String[expectedProcesseds.size()]);
- if (!ordered || expectedProcesseds.isEmpty()) {
- assertThat(processeds, containsInAnyOrder(expectedProcessedsArray));
- } else {
- assertThat(processeds, contains(expectedProcessedsArray));
- }
-
- for (String finished : finisheds) {
- assertEquals(additionalOutputPrefix + "finished", finished);
- }
-
- return null;
- }
- }
-}