You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:17 UTC

[17/53] [abbrv] beam git commit: jstorm-runner: remove ValidatesRunner tests and dead code from jstorm module.

jstorm-runner: remove ValidatesRunner tests and dead code from jstorm module.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4ff42cbc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4ff42cbc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4ff42cbc

Branch: refs/heads/jstorm-runner
Commit: 4ff42cbc65452ae6259d90f07f2f80423eeb69df
Parents: aa251a4
Author: Pei He <pe...@apache.org>
Authored: Thu Jul 13 18:38:49 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:56 2017 +0800

----------------------------------------------------------------------
 .../jstorm/translation/TranslatorRegistry.java  |  18 -
 .../translator/CombineGloballyTranslator.java   |  25 -
 .../translator/CombinePerKeyTranslator.java     |  25 -
 .../translator/ReshuffleTranslator.java         |  24 -
 .../translator/WindowBoundTranslator.java       |  47 --
 .../util/DefaultSideInputReader.java            |  45 --
 .../translator/CoGroupByKeyTest.java            | 301 ---------
 .../translation/translator/GroupByKeyTest.java  | 155 -----
 .../translation/translator/ParDoTest.java       | 624 -------------------
 9 files changed, 1264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
index bce5b3e..316186e 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
@@ -49,30 +49,12 @@ public class TranslatorRegistry {
   static {
     TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator());
     TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
-    // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
-    // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
-
     TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator());
     TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator());
-
-    //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>());
     TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
-
     TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
-
     TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
-
     TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator());
-
-    /**
-     * Currently, empty translation is required for combine and reshuffle.
-     * Because, the transforms will be mapped to GroupByKey and Pardo finally.
-     * So we only need to translator the finally transforms.
-     * If any improvement is required, the composite transforms will be translated in the future.
-     */
-    // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
-    // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator());
-    // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator());
   }
 
   public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
deleted file mode 100644
index fe5fca9..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.sdk.transforms.Combine;
-
-public class CombineGloballyTranslator<InputT, OutputT>
-    extends TransformTranslator.Default<Combine.Globally<InputT, OutputT>> {
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
deleted file mode 100644
index c382fb7..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.sdk.transforms.Combine;
-
-public class CombinePerKeyTranslator<K, InputT, OutputT>
-    extends TransformTranslator.Default<Combine.PerKey<K, InputT, OutputT>> {
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
deleted file mode 100644
index c450a22..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.sdk.transforms.Reshuffle;
-
-public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K, V>> {
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
deleted file mode 100644
index c863c9e..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Translates a Window.Bound node into a Storm WindowedBolt
- *
- * @param <T>
- */
-public class WindowBoundTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
-  private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class);
-
-  // Do nothing here currently. The assign of window strategy is included in AssignTranslator.
-  @Override
-  public void translateNode(Window.Assign<T> transform, TranslationContext context) {
-    if (transform.getWindowFn() instanceof FixedWindows) {
-      context.getUserGraphContext().setWindowed();
-    } else if (transform.getWindowFn() instanceof SlidingWindows) {
-      context.getUserGraphContext().setWindowed();
-    } else {
-      throw new UnsupportedOperationException(
-          "Not supported window type currently: " + transform.getWindowFn());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
deleted file mode 100644
index 750095e..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.util;
-
-import java.io.Serializable;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.PCollectionView;
-
-/**
- * No-op SideInputReader implementation.
- */
-public class DefaultSideInputReader implements SideInputReader, Serializable {
-  @Nullable
-  @Override
-  public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
-    return null;
-  }
-
-  @Override
-  public <T> boolean contains(PCollectionView<T> pCollectionView) {
-    return false;
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java
deleted file mode 100644
index 809436e..0000000
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/CoGroupByKeyTest.java
+++ /dev/null
@@ -1,301 +0,0 @@
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.jstorm.TestJStormRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-@RunWith(JUnit4.class)
-public class CoGroupByKeyTest implements Serializable {
-    /**
-     * Converts the given list into a PCollection belonging to the provided
-     * Pipeline in such a way that coder inference needs to be performed.
-     */
-    private PCollection<KV<Integer, String>> createInput(String name,
-                                                         Pipeline p, List<KV<Integer, String>> list) {
-        return createInput(name, p, list,  new ArrayList<Long>());
-    }
-
-    /**
-     * Converts the given list with timestamps into a PCollection.
-     */
-    private PCollection<KV<Integer, String>> createInput(String name,
-                                                         Pipeline p, List<KV<Integer, String>> list, List<Long> timestamps) {
-        PCollection<KV<Integer, String>> input;
-        if (timestamps.isEmpty()) {
-            input = p.apply("Create" + name, Create.of(list)
-                    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
-        } else {
-            input = p.apply("Create" + name, Create.timestamped(list, timestamps)
-                    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
-        }
-        return input.apply(
-                "Identity" + name,
-                ParDo.of(
-                        new DoFn<KV<Integer, String>, KV<Integer, String>>() {
-                            @ProcessElement
-                            public void processElement(ProcessContext c) {
-                                c.output(c.element());
-                            }
-                        }));
-    }
-
-    /**
-     * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the result
-     * of a {@link CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>},
-     * where each {@link PCollection} has no duplicate keys and the key sets of
-     * each {@link PCollection} are intersecting but neither is a subset of the other.
-     */
-    private PCollection<KV<Integer, CoGbkResult>> buildGetOnlyGbk(
-            Pipeline p,
-            TupleTag<String> tag1,
-            TupleTag<String> tag2) {
-        List<KV<Integer, String>> list1 =
-                Arrays.asList(
-                        KV.of(1, "collection1-1"),
-                        KV.of(2, "collection1-2"));
-        List<KV<Integer, String>> list2 =
-                Arrays.asList(
-                        KV.of(2, "collection2-2"),
-                        KV.of(3, "collection2-3"));
-        PCollection<KV<Integer, String>> collection1 = createInput("CreateList1", p, list1);
-        PCollection<KV<Integer, String>> collection2 = createInput("CreateList2", p, list2);
-        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
-                KeyedPCollectionTuple.of(tag1, collection1)
-                        .and(tag2, collection2)
-                        .apply(CoGroupByKey.<Integer>create());
-        return coGbkResults;
-    }
-
-    @Test
-    @Category(ValidatesRunner.class)
-    public void testCoGroupByKeyGetOnly() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        options.setLocalMode(true);
-
-        Pipeline p = Pipeline.create(options);
-
-        final TupleTag<String> tag1 = new TupleTag<>();
-        final TupleTag<String> tag2 = new TupleTag<>();
-
-        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
-                buildGetOnlyGbk(p, tag1, tag2);
-
-        PAssert.thatMap(coGbkResults).satisfies(
-                new SerializableFunction<Map<Integer, CoGbkResult>, Void>() {
-                    @Override
-                    public Void apply(Map<Integer, CoGbkResult> results) {
-                        assertEquals("collection1-1", results.get(1).getOnly(tag1));
-                        assertEquals("collection1-2", results.get(2).getOnly(tag1));
-                        assertEquals("collection2-2", results.get(2).getOnly(tag2));
-                        assertEquals("collection2-3", results.get(3).getOnly(tag2));
-                        return null;
-                    }
-                });
-
-        p.run();
-    }
-
-    /**
-     * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the
-     * results of the {@code CoGroupByKey} over three
-     * {@code PCollection<KV<Integer, String>>}, each of which correlates
-     * a customer id to purchases, addresses, or names, respectively.
-     */
-    private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbk(
-            Pipeline p,
-            TupleTag<String> purchasesTag,
-            TupleTag<String> addressesTag,
-            TupleTag<String> namesTag) {
-        List<KV<Integer, String>> idToPurchases =
-                Arrays.asList(
-                        KV.of(2, "Boat"),
-                        KV.of(1, "Shoes"),
-                        KV.of(3, "Car"),
-                        KV.of(1, "Book"),
-                        KV.of(10, "Pens"),
-                        KV.of(8, "House"),
-                        KV.of(4, "Suit"),
-                        KV.of(11, "House"),
-                        KV.of(14, "Shoes"),
-                        KV.of(2, "Suit"),
-                        KV.of(8, "Suit Case"),
-                        KV.of(3, "House"));
-
-        List<KV<Integer, String>> idToAddress =
-                Arrays.asList(
-                        KV.of(2, "53 S. 3rd"),
-                        KV.of(10, "383 Jackson Street"),
-                        KV.of(20, "3 W. Arizona"),
-                        KV.of(3, "29 School Rd"),
-                        KV.of(8, "6 Watling Rd"));
-
-        List<KV<Integer, String>> idToName =
-                Arrays.asList(
-                        KV.of(1, "John Smith"),
-                        KV.of(2, "Sally James"),
-                        KV.of(8, "Jeffery Spalding"),
-                        KV.of(20, "Joan Lichtfield"));
-
-        PCollection<KV<Integer, String>> purchasesTable =
-                createInput("CreateIdToPurchases", p, idToPurchases);
-
-        PCollection<KV<Integer, String>> addressTable =
-                createInput("CreateIdToAddress", p, idToAddress);
-
-        PCollection<KV<Integer, String>> nameTable =
-                createInput("CreateIdToName", p, idToName);
-
-        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
-                KeyedPCollectionTuple.of(namesTag, nameTable)
-                        .and(addressesTag, addressTable)
-                        .and(purchasesTag, purchasesTable)
-                        .apply(CoGroupByKey.<Integer>create());
-        return coGbkResults;
-    }
-
-    /**
-     * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the
-     * results of the {@code CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>},
-     * each of which correlates a customer id to clicks, purchases, respectively.
-     */
-    private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbkWithWindowing(
-            Pipeline p,
-            TupleTag<String> clicksTag,
-            TupleTag<String> purchasesTag) {
-        List<KV<Integer, String>> idToClick =
-                Arrays.asList(
-                        KV.of(1, "Click t0"),
-                        KV.of(2, "Click t2"),
-                        KV.of(1, "Click t4"),
-                        KV.of(1, "Click t6"),
-                        KV.of(2, "Click t8"));
-
-        List<KV<Integer, String>> idToPurchases =
-                Arrays.asList(
-                        KV.of(1, "Boat t1"),
-                        KV.of(1, "Shoesi t2"),
-                        KV.of(1, "Pens t3"),
-                        KV.of(2, "House t4"),
-                        KV.of(2, "Suit t5"),
-                        KV.of(1, "Car t6"),
-                        KV.of(1, "Book t7"),
-                        KV.of(2, "House t8"),
-                        KV.of(2, "Shoes t9"),
-                        KV.of(2, "House t10"));
-
-        PCollection<KV<Integer, String>> clicksTable =
-                createInput("CreateClicks",
-                        p,
-                        idToClick,
-                        Arrays.asList(0L, 2L, 4L, 6L, 8L))
-                        .apply("WindowClicks", Window.<KV<Integer, String>>into(
-                                FixedWindows.of(new Duration(4)))
-                                .withTimestampCombiner(TimestampCombiner.EARLIEST));
-
-        PCollection<KV<Integer, String>> purchasesTable =
-                createInput("CreatePurchases",
-                        p,
-                        idToPurchases,
-                        Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
-                        .apply("WindowPurchases", Window.<KV<Integer, String>>into(
-                                FixedWindows.of(new Duration(4)))
-                                .withTimestampCombiner(TimestampCombiner.EARLIEST));
-
-        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
-                KeyedPCollectionTuple.of(clicksTag, clicksTable)
-                        .and(purchasesTag, purchasesTable)
-                        .apply(CoGroupByKey.<Integer>create());
-        return coGbkResults;
-    }
-
-    @Test
-    @Category(ValidatesRunner.class)
-    public void testCoGroupByKey() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        options.setLocalMode(true);
-
-        Pipeline p = Pipeline.create(options);
-
-        final TupleTag<String> namesTag = new TupleTag<>();
-        final TupleTag<String> addressesTag = new TupleTag<>();
-        final TupleTag<String> purchasesTag = new TupleTag<>();
-
-
-        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
-                buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag);
-
-        PAssert.thatMap(coGbkResults).satisfies(
-                new SerializableFunction<Map<Integer, CoGbkResult>, Void>() {
-                    @Override
-                    public Void apply(Map<Integer, CoGbkResult> results) {
-                        CoGbkResult result1 = results.get(1);
-                        assertEquals("John Smith", result1.getOnly(namesTag));
-                        assertThat(result1.getAll(purchasesTag), containsInAnyOrder("Shoes", "Book"));
-
-                        CoGbkResult result2 = results.get(2);
-                        assertEquals("Sally James", result2.getOnly(namesTag));
-                        assertEquals("53 S. 3rd", result2.getOnly(addressesTag));
-                        assertThat(result2.getAll(purchasesTag), containsInAnyOrder("Suit", "Boat"));
-
-                        CoGbkResult result3 = results.get(3);
-                        assertEquals("29 School Rd", result3.getOnly(addressesTag), "29 School Rd");
-                        assertThat(result3.getAll(purchasesTag), containsInAnyOrder("Car", "House"));
-
-                        CoGbkResult result8 = results.get(8);
-                        assertEquals("Jeffery Spalding", result8.getOnly(namesTag));
-                        assertEquals("6 Watling Rd", result8.getOnly(addressesTag));
-                        assertThat(result8.getAll(purchasesTag), containsInAnyOrder("House", "Suit Case"));
-
-                        CoGbkResult result20 = results.get(20);
-                        assertEquals("Joan Lichtfield", result20.getOnly(namesTag));
-                        assertEquals("3 W. Arizona", result20.getOnly(addressesTag));
-
-                        assertEquals("383 Jackson Street", results.get(10).getOnly(addressesTag));
-
-                        assertThat(results.get(4).getAll(purchasesTag), containsInAnyOrder("Suit"));
-                        assertThat(results.get(10).getAll(purchasesTag), containsInAnyOrder("Pens"));
-                        assertThat(results.get(11).getAll(purchasesTag), containsInAnyOrder("House"));
-                        assertThat(results.get(14).getAll(purchasesTag), containsInAnyOrder("Shoes"));
-
-                        return null;
-                    }
-                });
-
-        p.run();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java
deleted file mode 100644
index 9a8b43a..0000000
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.jstorm.TestJStormRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
-/**
- * Tests for {@link GroupByKey} with {@link StormRunner}.
- */
-@RunWith(JUnit4.class)
-public class GroupByKeyTest {
-
-    static final String[] WORDS_ARRAY = new String[] {
-            "hi", "there", "hi", "hi", "sue", "bob",
-            "hi", "sue", "", "", "ZOW", "bob", "" };
-
-    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
-
-    @Test
-    public void testGroupByKey() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        options.setLocalMode(true);
-
-        Pipeline p = Pipeline.create(options);
-
-        List<KV<String, Integer>> ungroupedPairs = Arrays.asList(
-                KV.of("k1", 3),
-                KV.of("k5", Integer.MAX_VALUE),
-                KV.of("k5", Integer.MIN_VALUE),
-                KV.of("k2", 66),
-                KV.of("k1", 4),
-                KV.of("k2", -33),
-                KV.of("k3", 0));
-
-        PCollection<KV<String, Integer>> input =
-                p.apply(Create.of(ungroupedPairs)
-                        .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
-
-        PCollection<KV<String, Iterable<Integer>>> output =
-                input.apply(GroupByKey.<String, Integer>create());
-
-        PAssert.that(output)
-                .satisfies(new AssertThatHasExpectedContentsForTestGroupByKey());
-
-        p.run();
-    }
-
-    @Test
-    public void testCountGloballyBasic() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        options.setLocalMode(true);
-
-        Pipeline p = Pipeline.create(options);
-        PCollection<String> input = p.apply(Create.of(WORDS));
-
-        PCollection<Long> output =
-                input.apply(Count.<String>globally());
-
-        PAssert.that(output)
-                .containsInAnyOrder(13L);
-        p.run();
-    }
-
-    static class AssertThatHasExpectedContentsForTestGroupByKey
-            implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>,
-            Void> {
-        @Override
-        public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) {
-            assertThat(actual, containsInAnyOrder(
-                    KvMatcher.isKv(is("k1"), containsInAnyOrder(3, 4)),
-                    KvMatcher.isKv(is("k5"), containsInAnyOrder(Integer.MAX_VALUE,
-                            Integer.MIN_VALUE)),
-                    KvMatcher.isKv(is("k2"), containsInAnyOrder(66, -33)),
-                    KvMatcher.isKv(is("k3"), containsInAnyOrder(0))));
-            return null;
-        }
-    }
-
-    /**
-     * Matcher for KVs.
-     */
-    public static class KvMatcher<K, V>
-            extends TypeSafeMatcher<KV<? extends K, ? extends V>> {
-        final Matcher<? super K> keyMatcher;
-        final Matcher<? super V> valueMatcher;
-
-        public static <K, V> KvMatcher<K, V> isKv(Matcher<K> keyMatcher,
-                                                  Matcher<V> valueMatcher) {
-            return new KvMatcher<>(keyMatcher, valueMatcher);
-        }
-
-        public KvMatcher(Matcher<? super K> keyMatcher,
-                         Matcher<? super V> valueMatcher) {
-            this.keyMatcher = keyMatcher;
-            this.valueMatcher = valueMatcher;
-        }
-
-        @Override
-        public boolean matchesSafely(KV<? extends K, ? extends V> kv) {
-            return keyMatcher.matches(kv.getKey())
-                    && valueMatcher.matches(kv.getValue());
-        }
-
-        @Override
-        public void describeTo(Description description) {
-            description
-                    .appendText("a KV(").appendValue(keyMatcher)
-                    .appendText(", ").appendValue(valueMatcher)
-                    .appendText(")");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/4ff42cbc/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java
deleted file mode 100644
index c911364..0000000
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/translator/ParDoTest.java
+++ /dev/null
@@ -1,624 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation.translator;
-
-import org.apache.beam.runners.jstorm.TestJStormRunner;
-import com.google.common.base.MoreObjects;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.*;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.state.*;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.UsesMapState;
-import org.apache.beam.sdk.testing.UsesStatefulParDo;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.*;
-import org.apache.beam.sdk.transforms.windowing.*;
-import org.apache.beam.sdk.values.*;
-import org.joda.time.Duration;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.*;
-
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-/**
- * Tests for {@link ParDo} with {@link StormRunner}.
- */
-@RunWith(JUnit4.class)
-public class ParDoTest implements Serializable {
-
-    @Test
-    public void testParDo() throws IOException {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        List<Integer> inputs = Arrays.asList(3, -42, 666);
-
-        PCollection<String> output = pipeline
-                .apply(Create.of(inputs))
-                .apply(ParDo.of(new TestDoFn()));
-
-        PAssert.that(output)
-                .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
-
-        pipeline.run();
-    }
-
-    @Test
-    public void testParDoWithSideInputs() throws IOException {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        List<Integer> inputs = Arrays.asList(3, -42, 666);
-
-        PCollectionView<Integer> sideInput1 = pipeline
-                .apply("CreateSideInput1", Create.of(11))
-                .apply("ViewSideInput1", View.<Integer>asSingleton());
-        PCollectionView<Integer> sideInputUnread = pipeline
-                .apply("CreateSideInputUnread", Create.of(-3333))
-                .apply("ViewSideInputUnread", View.<Integer>asSingleton());
-
-        PCollectionView<Integer> sideInput2 = pipeline
-                .apply("CreateSideInput2", Create.of(222))
-                .apply("ViewSideInput2", View.<Integer>asSingleton());
-        PCollection<String> output = pipeline
-                .apply(Create.of(inputs))
-                .apply(ParDo.of(new TestDoFn(
-                                Arrays.asList(sideInput1, sideInput2),
-                                Arrays.<TupleTag<String>>asList()))
-                        .withSideInputs(sideInput1, sideInputUnread, sideInput2));
-
-        PAssert.that(output)
-                .satisfies(ParDoTest.HasExpectedOutput
-                        .forInput(inputs)
-                        .andSideInputs(11, 222));
-
-        pipeline.run();
-    }
-
-    @Test
-    public void testParDoWithTaggedOutput() {
-        List<Integer> inputs = Arrays.asList(3, -42, 666);
-
-        TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
-        TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){};
-        TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){};
-        TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3"){};
-        TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){};
-
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        PCollectionTuple outputs = pipeline
-            .apply(Create.of(inputs))
-            .apply(ParDo
-                .of(new TestDoFn(
-                    Arrays.<PCollectionView<Integer>>asList(),
-                    Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3)))
-                .withOutputTags(
-                    mainOutputTag,
-                    TupleTagList.of(additionalOutputTag3)
-                        .and(additionalOutputTag1)
-                        .and(additionalOutputTagUnwritten)
-                        .and(additionalOutputTag2)));
-
-        PAssert.that(outputs.get(mainOutputTag))
-            .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
-
-        PAssert.that(outputs.get(additionalOutputTag1))
-            .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
-                .fromOutput(additionalOutputTag1));
-        PAssert.that(outputs.get(additionalOutputTag2))
-            .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
-                .fromOutput(additionalOutputTag2));
-        PAssert.that(outputs.get(additionalOutputTag3))
-            .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
-                .fromOutput(additionalOutputTag3));
-        PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty();
-
-        pipeline.run();
-    }
-
-    @Test
-    public void testNoWindowFnDoesNotReassignWindows() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        final PCollection<Long> initialWindows =
-                pipeline
-                    .apply(GenerateSequence.from(0).to(10))
-                    .apply("AssignWindows", Window.into(new WindowOddEvenBuckets()));
-
-        // Sanity check the window assignment to demonstrate the baseline
-        PAssert.that(initialWindows)
-                .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
-                .containsInAnyOrder(0L, 2L, 4L, 6L, 8L);
-        PAssert.that(initialWindows)
-                .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
-                .containsInAnyOrder(1L, 3L, 5L, 7L, 9L);
-
-        PCollection<Boolean> upOne =
-                initialWindows.apply(
-                        "ModifyTypes",
-                        MapElements.<Long, Boolean>via(
-                                new SimpleFunction<Long, Boolean>() {
-                                    @Override
-                                    public Boolean apply(Long input) {
-                                        return input % 2 == 0;
-                                    }
-                                }));
-        PAssert.that(upOne)
-                .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
-                .containsInAnyOrder(true, true, true, true, true);
-        PAssert.that(upOne)
-                .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
-                .containsInAnyOrder(false, false, false, false, false);
-
-        // The elements should be in the same windows, even though they would not be assigned to the
-        // same windows with the updated timestamps. If we try to apply the original WindowFn, the type
-        // will not be appropriate and the runner should crash, as a Boolean cannot be converted into
-        // a long.
-        PCollection<Boolean> updatedTrigger =
-                upOne.apply(
-                        "UpdateWindowingStrategy",
-                        Window.<Boolean>configure().triggering(Never.ever())
-                                .withAllowedLateness(Duration.ZERO)
-                                .accumulatingFiredPanes());
-        pipeline.run();
-    }
-
-    @Test
-    public void testValueStateSameId() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        final String stateId = "foo";
-
-        DoFn<KV<String, Integer>, KV<String, Integer>> fn =
-                new DoFn<KV<String, Integer>, KV<String, Integer>>() {
-
-                    @StateId(stateId)
-                    private final StateSpec<ValueState<Integer>> intState =
-                            StateSpecs.value(VarIntCoder.of());
-
-                    @ProcessElement
-                    public void processElement(
-                            ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
-                        Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
-                        c.output(KV.of("sizzle", currentValue));
-                        state.write(currentValue + 1);
-                    }
-                };
-
-        DoFn<KV<String, Integer>, Integer> fn2 =
-                new DoFn<KV<String, Integer>, Integer>() {
-
-                    @StateId(stateId)
-                    private final StateSpec<ValueState<Integer>> intState =
-                            StateSpecs.value(VarIntCoder.of());
-
-                    @ProcessElement
-                    public void processElement(
-                            ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
-                        Integer currentValue = MoreObjects.firstNonNull(state.read(), 13);
-                        c.output(currentValue);
-                        state.write(currentValue + 13);
-                    }
-                };
-
-        PCollection<KV<String, Integer>> intermediate =
-                pipeline.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84)))
-                        .apply("First stateful ParDo", ParDo.of(fn));
-
-        PCollection<Integer> output =
-                intermediate.apply("Second stateful ParDo", ParDo.of(fn2));
-
-        PAssert.that(intermediate)
-                .containsInAnyOrder(KV.of("sizzle", 0), KV.of("sizzle", 1), KV.of("sizzle", 2));
-        PAssert.that(output).containsInAnyOrder(13, 26, 39);
-        pipeline.run();
-    }
-
-    @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
-    public void testValueStateTaggedOutput() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        final String stateId = "foo";
-
-        final TupleTag<Integer> evenTag = new TupleTag<Integer>() {};
-        final TupleTag<Integer> oddTag = new TupleTag<Integer>() {};
-
-        DoFn<KV<String, Integer>, Integer> fn =
-                new DoFn<KV<String, Integer>, Integer>() {
-
-                    @StateId(stateId)
-                    private final StateSpec<ValueState<Integer>> intState =
-                            StateSpecs.value(VarIntCoder.of());
-
-                    @ProcessElement
-                    public void processElement(
-                            ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
-                        Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
-                        if (currentValue % 2 == 0) {
-                            c.output(currentValue);
-                        } else {
-                            c.output(oddTag, currentValue);
-                        }
-                        state.write(currentValue + 1);
-                    }
-                };
-
-        PCollectionTuple output =
-                pipeline.apply(
-                        Create.of(
-                                KV.of("hello", 42),
-                                KV.of("hello", 97),
-                                KV.of("hello", 84),
-                                KV.of("goodbye", 33),
-                                KV.of("hello", 859),
-                                KV.of("goodbye", 83945)))
-                        .apply(ParDo.of(fn).withOutputTags(evenTag, TupleTagList.of(oddTag)));
-
-        PCollection<Integer> evens = output.get(evenTag);
-        PCollection<Integer> odds = output.get(oddTag);
-
-        // There are 0 and 2 from "hello" and just 0 from "goodbye"
-        PAssert.that(evens).containsInAnyOrder(0, 2, 0);
-
-        // There are 1 and 3 from "hello" and just "1" from "goodbye"
-        PAssert.that(odds).containsInAnyOrder(1, 3, 1);
-        pipeline.run();
-    }
-
-    @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class})
-    public void testMapStateCoderInference() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        final String stateId = "foo";
-        final String countStateId = "count";
-        Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
-        pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
-
-        DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn =
-                new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
-
-                    @StateId(stateId)
-                    private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
-
-                    @StateId(countStateId)
-                    private final StateSpec<CombiningState<Integer, int[], Integer>>
-                            countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
-                            Sum.ofIntegers());
-
-                    @ProcessElement
-                    public void processElement(
-                            ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
-                            @StateId(countStateId) CombiningState<Integer, int[], Integer>
-                                    count) {
-                        KV<String, Integer> value = c.element().getValue();
-                        state.put(value.getKey(), new MyInteger(value.getValue()));
-                        count.add(1);
-                        if (count.read() >= 4) {
-                            Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read();
-                            for (Map.Entry<String, MyInteger> entry : iterate) {
-                                c.output(KV.of(entry.getKey(), entry.getValue()));
-                            }
-                        }
-                    }
-                };
-
-        PCollection<KV<String, MyInteger>> output =
-                pipeline.apply(
-                        Create.of(
-                                KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("b", 42)),
-                                KV.of("hello", KV.of("b", 42)), KV.of("hello", KV.of("c", 12))))
-                        .apply(ParDo.of(fn)).setCoder(KvCoder.of(StringUtf8Coder.of(), myIntegerCoder));
-
-        PAssert.that(output).containsInAnyOrder(KV.of("a", new MyInteger(97)),
-                KV.of("b", new MyInteger(42)), KV.of("c", new MyInteger(12)));
-        pipeline.run();
-    }
-
-
-    private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, IntervalWindow> {
-        private static final IntervalWindow EVEN_WINDOW =
-                new IntervalWindow(
-                        BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp());
-        private static final IntervalWindow ODD_WINDOW =
-                new IntervalWindow(
-                        BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp().minus(1));
-
-        @Override
-        public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
-            if (c.element() % 2 == 0) {
-                return Collections.singleton(EVEN_WINDOW);
-            }
-            return Collections.singleton(ODD_WINDOW);
-        }
-
-        @Override
-        public boolean isCompatible(WindowFn<?, ?> other) {
-            return other instanceof WindowOddEvenBuckets;
-        }
-
-        @Override
-        public Coder<IntervalWindow> windowCoder() {
-            return new IntervalWindow.IntervalWindowCoder();
-        }
-
-        @Override
-        public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
-            throw new UnsupportedOperationException(
-                    String.format("Can't use %s for side inputs", getClass().getSimpleName()));
-        }
-    }
-
-
-    static class TestDoFn extends DoFn<Integer, String> {
-        enum State {NOT_SET_UP, UNSTARTED, STARTED, PROCESSING, FINISHED}
-
-        State state = State.NOT_SET_UP;
-
-        final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
-        final List<TupleTag<String>> additionalOutputTupleTags = new ArrayList<>();
-
-        public TestDoFn() {
-        }
-
-        public TestDoFn(List<PCollectionView<Integer>> sideInputViews,
-                        List<TupleTag<String>> additionalOutputTupleTags) {
-            this.sideInputViews.addAll(sideInputViews);
-            this.additionalOutputTupleTags.addAll(additionalOutputTupleTags);
-        }
-
-        @Setup
-        public void prepare() {
-            assertEquals(State.NOT_SET_UP, state);
-            state = State.UNSTARTED;
-        }
-
-        @StartBundle
-        public void startBundle() {
-            assertThat(state,
-                anyOf(equalTo(State.UNSTARTED), equalTo(State.FINISHED)));
-
-            state = State.STARTED;
-        }
-
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-            System.out.println("Recv elem: " + c.element());
-            assertThat(state,
-                anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
-            state = State.PROCESSING;
-            outputToAllWithSideInputs(c, "processing: " + c.element());
-        }
-
-        @FinishBundle
-        public void finishBundle(FinishBundleContext c) {
-            assertThat(state,
-                anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
-            state = State.FINISHED;
-            c.output("finished", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
-            for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) {
-                c.output(
-                    additionalOutputTupleTag,
-                    additionalOutputTupleTag.getId() + ": " + "finished",
-                    BoundedWindow.TIMESTAMP_MIN_VALUE,
-                    GlobalWindow.INSTANCE);
-            }
-        }
-
-        private void outputToAllWithSideInputs(ProcessContext c, String value) {
-            if (!sideInputViews.isEmpty()) {
-                List<Integer> sideInputValues = new ArrayList<>();
-                for (PCollectionView<Integer> sideInputView : sideInputViews) {
-                    sideInputValues.add(c.sideInput(sideInputView));
-                }
-                value += ": " + sideInputValues;
-            }
-            c.output(value);
-            for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) {
-                c.output(additionalOutputTupleTag,
-                    additionalOutputTupleTag.getId() + ": " + value);
-            }
-        }
-    }
-
-    private static class MyInteger implements Comparable<MyInteger> {
-        private final int value;
-
-        MyInteger(int value) {
-            this.value = value;
-        }
-
-        public int getValue() {
-            return value;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-
-            if (!(o instanceof MyInteger)) {
-                return false;
-            }
-
-            MyInteger myInteger = (MyInteger) o;
-
-            return value == myInteger.value;
-
-        }
-
-        @Override
-        public int hashCode() {
-            return value;
-        }
-
-        @Override
-        public int compareTo(MyInteger o) {
-            return Integer.compare(this.getValue(), o.getValue());
-        }
-
-        @Override
-        public String toString() {
-            return "MyInteger{" + "value=" + value + '}';
-        }
-    }
-
-    private static class MyIntegerCoder extends AtomicCoder<MyInteger> {
-        private static final MyIntegerCoder INSTANCE = new MyIntegerCoder();
-
-        private final VarIntCoder delegate = VarIntCoder.of();
-
-        public static MyIntegerCoder of() {
-            return INSTANCE;
-        }
-
-        @Override
-        public void encode(MyInteger value, OutputStream outStream)
-                throws CoderException, IOException {
-            delegate.encode(value.getValue(), outStream);
-        }
-
-        @Override
-        public MyInteger decode(InputStream inStream) throws CoderException,
-                IOException {
-            return new MyInteger(delegate.decode(inStream));
-        }
-    }
-
-    /** PAssert "matcher" for expected output. */
-    static class HasExpectedOutput
-        implements SerializableFunction<Iterable<String>, Void>, Serializable {
-        private final List<Integer> inputs;
-        private final List<Integer> sideInputs;
-        private final String additionalOutput;
-        private final boolean ordered;
-
-        public static HasExpectedOutput forInput(List<Integer> inputs) {
-            return new HasExpectedOutput(
-                new ArrayList<Integer>(inputs),
-                new ArrayList<Integer>(),
-                "",
-                false);
-        }
-
-        private HasExpectedOutput(List<Integer> inputs,
-                                  List<Integer> sideInputs,
-                                  String additionalOutput,
-                                  boolean ordered) {
-            this.inputs = inputs;
-            this.sideInputs = sideInputs;
-            this.additionalOutput = additionalOutput;
-            this.ordered = ordered;
-        }
-
-        public HasExpectedOutput andSideInputs(Integer... sideInputValues) {
-            List<Integer> sideInputs = new ArrayList<>();
-            for (Integer sideInputValue : sideInputValues) {
-                sideInputs.add(sideInputValue);
-            }
-            return new HasExpectedOutput(inputs, sideInputs, additionalOutput, ordered);
-        }
-
-        public HasExpectedOutput fromOutput(TupleTag<String> outputTag) {
-            return fromOutput(outputTag.getId());
-        }
-        public HasExpectedOutput fromOutput(String outputId) {
-            return new HasExpectedOutput(inputs, sideInputs, outputId, ordered);
-        }
-
-        public HasExpectedOutput inOrder() {
-            return new HasExpectedOutput(inputs, sideInputs, additionalOutput, true);
-        }
-
-        @Override
-        public Void apply(Iterable<String> outputs) {
-            List<String> processeds = new ArrayList<>();
-            List<String> finisheds = new ArrayList<>();
-            for (String output : outputs) {
-                if (output.contains("finished")) {
-                    finisheds.add(output);
-                } else {
-                    processeds.add(output);
-                }
-            }
-
-            String sideInputsSuffix;
-            if (sideInputs.isEmpty()) {
-                sideInputsSuffix = "";
-            } else {
-                sideInputsSuffix = ": " + sideInputs;
-            }
-
-            String additionalOutputPrefix;
-            if (additionalOutput.isEmpty()) {
-                additionalOutputPrefix = "";
-            } else {
-                additionalOutputPrefix = additionalOutput + ": ";
-            }
-
-            List<String> expectedProcesseds = new ArrayList<>();
-            for (Integer input : inputs) {
-                expectedProcesseds.add(
-                    additionalOutputPrefix + "processing: " + input + sideInputsSuffix);
-            }
-            String[] expectedProcessedsArray =
-                expectedProcesseds.toArray(new String[expectedProcesseds.size()]);
-            if (!ordered || expectedProcesseds.isEmpty()) {
-                assertThat(processeds, containsInAnyOrder(expectedProcessedsArray));
-            } else {
-                assertThat(processeds, contains(expectedProcessedsArray));
-            }
-
-            for (String finished : finisheds) {
-                assertEquals(additionalOutputPrefix + "finished", finished);
-            }
-
-            return null;
-        }
-    }
-}