You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:02 UTC

[02/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
new file mode 100644
index 0000000..481b7fb
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultStepContext.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.util;
+
+import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Default StepContext for running a DoFn. It exposes the state and timer internals supplied at construction or through the setters.
+ */
+public class DefaultStepContext implements ExecutionContext.StepContext {
+
+    private TimerInternals timerInternals;
+
+    private StateInternals stateInternals;
+
+    public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) {
+        this.timerInternals = checkNotNull(timerInternals, "timerInternals");
+        this.stateInternals = checkNotNull(stateInternals, "stateInternals");
+    }
+
+    @Override
+    public String getStepName() {
+        return null;
+    }
+
+    @Override
+    public String getTransformName() {
+        return null;
+    }
+
+    @Override
+    public void noteOutput(WindowedValue<?> windowedValue) {
+
+    }
+
+    @Override
+    public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
+
+    }
+
+    @Override
+    public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data,
+            Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws IOException {
+        throw new UnsupportedOperationException("Writing side-input data is not supported.");
+    }
+
+    @Override
+    public StateInternals stateInternals() {
+        return stateInternals;
+    }
+
+    @Override
+    public TimerInternals timerInternals() {
+        return timerInternals;
+    }
+
+    public void setStateInternals(StateInternals stateInternals) {
+        this.stateInternals = stateInternals;
+    }
+
+    public void setTimerInternals(TimerInternals timerInternals) {
+        this.timerInternals = timerInternals;
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
new file mode 100644
index 0000000..cbf815a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.util;
+
+import org.apache.beam.runners.jstorm.translation.runtime.Executor;
+
+import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
+import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor;
+import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+
+public class RunnerUtils {
+    /**
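+     * Converts a {@code WindowedValue<KV<K, V>>} into a {@code KeyedWorkItem<K, V>} holding that single element.
+     * @param elem the windowed key-value element to wrap
+     * @return a work item keyed by the element's key, containing the element's value with its original windowing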
+     */
+    public static <K, V> KeyedWorkItem<K, V> toKeyedWorkItem(WindowedValue<KV<K, V>> elem) {
+        WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem;
+        SingletonKeyedWorkItem<K, V> workItem = SingletonKeyedWorkItem.of(
+                kvElem.getValue().getKey(),
+                kvElem.withValue(kvElem.getValue().getValue()));
+        return workItem;
+    }
+
+    public static boolean isGroupByKeyExecutor (Executor executor) {
+        if (executor instanceof GroupByWindowExecutor) {
+            return true;
+        } else if (executor instanceof StatefulDoFnExecutor ||
+                executor instanceof MultiStatefulDoFnExecutor) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
new file mode 100644
index 0000000..391699b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.jstorm.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
+ */
+public class SerializedPipelineOptions implements Serializable {
+
+    private final byte[] serializedOptions;
+
+    /** Lazily initialized copy of deserialized options */
+    private transient PipelineOptions pipelineOptions;
+
+    public SerializedPipelineOptions(PipelineOptions options) {
+        checkNotNull(options, "PipelineOptions must not be null.");
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            new ObjectMapper().writeValue(baos, options);
+            this.serializedOptions = baos.toByteArray();
+        } catch (Exception e) {
+            throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+        }
+
+    }
+
+    public PipelineOptions getPipelineOptions() {
+        if (pipelineOptions == null) {
+            try {
+                pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+            } catch (IOException e) {
+                throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+            }
+        }
+
+        return pipelineOptions;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
new file mode 100644
index 0000000..dee5f1a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.util;
+
+import java.util.Collections;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
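+ * Singleton keyed work item: a {@link KeyedWorkItem} that carries exactly one element and no timers.
+ * @param <K> type of the key
+ * @param <ElemT> type of the element value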
+ */
+public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT> {
+
+  final K key;
+  final WindowedValue<ElemT> value;
+
+  private SingletonKeyedWorkItem(K key, WindowedValue<ElemT> value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  public static <K, ElemT> SingletonKeyedWorkItem<K, ElemT> of(K key, WindowedValue<ElemT> value) {
+      return new SingletonKeyedWorkItem<K, ElemT>(key, value);
+  }
+
+  @Override
+  public K key() {
+    return key;
+  }
+
+  public WindowedValue<ElemT> value() {
+    return value;
+  }
+
+  @Override
+  public Iterable<TimerInternals.TimerData> timersIterable() {
+    return Collections.EMPTY_LIST;
+  }
+
+  @Override
+  public Iterable<WindowedValue<ElemT>> elementsIterable() {
+    return Collections.singletonList(value);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternalsTest.java
deleted file mode 100644
index 0ecffff..0000000
--- a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/runtime/state/JStormStateInternalsTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.runtime.state;
-
-import avro.shaded.com.google.common.collect.Maps;
-import com.alibaba.jstorm.beam.translation.runtime.TimerServiceImpl;
-import com.alibaba.jstorm.cache.IKvStoreManager;
-import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory;
-import com.alibaba.jstorm.utils.KryoSerializer;
-
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.state.*;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Iterator;
-import java.util.Map;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.hasEntry;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-/**
- * Tests for {@link JStormStateInternals}.
- */
-@RunWith(JUnit4.class)
-public class JStormStateInternalsTest {
-
-    @Rule
-    public final TemporaryFolder tmp = new TemporaryFolder();
-
-    private JStormStateInternals<String> jstormStateInternals;
-
-    @Before
-    public void setup() throws Exception {
-        IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager(
-                Maps.newHashMap(),
-                "test",
-                tmp.toString(),
-                new KryoSerializer(Maps.newHashMap()));
-        jstormStateInternals = new JStormStateInternals("key-1", kvStoreManager, new TimerServiceImpl(), 0);
-    }
-
-    @Test
-    public void testValueState() throws Exception {
-        ValueState<Integer> valueState = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
-        valueState.write(Integer.MIN_VALUE);
-        assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
-        valueState.write(Integer.MAX_VALUE);
-        assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
-    }
-
-    @Test
-    public void testValueStateIdenticalId() throws Exception {
-        ValueState<Integer> valueState = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
-        ValueState<Integer> valueStateIdentical = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
-
-        valueState.write(Integer.MIN_VALUE);
-        assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
-        assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue());
-        valueState.write(Integer.MAX_VALUE);
-        assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
-        assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue());
-    }
-
-    @Test
-    public void testBagState() throws Exception {
-        BagState<Integer> bagStateA = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
-        BagState<Integer> bagStateB = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.bag("state-id-b", BigEndianIntegerCoder.of()));
-
-        bagStateA.add(1);
-        bagStateA.add(0);
-        bagStateA.add(Integer.MAX_VALUE);
-
-        bagStateB.add(0);
-        bagStateB.add(Integer.MIN_VALUE);
-
-        Iterable<Integer> bagA = bagStateA.read();
-        Iterable<Integer> bagB = bagStateB.read();
-        assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE));
-        assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE));
-
-        bagStateA.clear();
-        bagStateA.add(1);
-        bagStateB.add(0);
-        assertThat(bagStateA.read(), containsInAnyOrder(1));
-        assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE));
-    }
-
-    @Test
-    public void testCombiningState() throws Exception {
-        Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers();
-        Coder<int[]> accumCoder = combineFn.getAccumulatorCoder(
-            CoderRegistry.createDefault(), BigEndianIntegerCoder.of());
-
-        CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state(
-                StateNamespaces.global(),
-                StateTags.combiningValue(
-                        "state-id-a",
-                        accumCoder,
-                        combineFn));
-        assertEquals(Integer.MIN_VALUE, combiningState.read().longValue());
-        combiningState.add(10);
-        assertEquals(10, combiningState.read().longValue());
-        combiningState.add(1);
-        assertEquals(10, combiningState.read().longValue());
-        combiningState.add(Integer.MAX_VALUE);
-        assertEquals(Integer.MAX_VALUE, combiningState.read().longValue());
-    }
-
-    @Test
-    public void testWatermarkHoldState() throws Exception {
-        WatermarkHoldState watermarkHoldState = jstormStateInternals.state(
-                StateNamespaces.global(),
-                StateTags.watermarkStateInternal(
-                        "state-id-a",
-                        TimestampCombiner.EARLIEST));
-        watermarkHoldState.add(new Instant(1));
-        assertEquals(1, watermarkHoldState.read().getMillis());
-        watermarkHoldState.add(new Instant(Integer.MIN_VALUE));
-        assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
-        watermarkHoldState.add(new Instant(Integer.MAX_VALUE));
-        assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
-    }
-
-    @Test
-    public void testMapState() throws Exception {
-        MapState<Integer, Integer> mapStateA = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()));
-        mapStateA.put(1, 1);
-        mapStateA.put(2, 22);
-        mapStateA.put(1, 12);
-
-        Iterable<Integer> keys = mapStateA.keys().read();
-        Iterable<Integer> values = mapStateA.values().read();
-        assertThat(keys, containsInAnyOrder(1, 2));
-        assertThat(values, containsInAnyOrder(12, 22));
-
-        Iterable<Map.Entry<Integer, Integer>> entries =  mapStateA.entries().read();
-        Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator();
-        Map.Entry<Integer, Integer> entry = itr.next();
-        assertEquals((long) entry.getKey(), 1l);
-        assertEquals((long) entry.getValue(), 12l);
-        entry = itr.next();
-        assertEquals((long) entry.getKey(), 2l);
-        assertEquals((long) entry.getValue(), 22l);
-        assertEquals(false, itr.hasNext());
-
-        mapStateA.remove(1);
-        keys = mapStateA.keys().read();
-        values = mapStateA.values().read();
-        assertThat(keys, containsInAnyOrder(2));
-        assertThat(values, containsInAnyOrder(22));
-
-        entries =  mapStateA.entries().read();
-        itr = entries.iterator();
-        entry = itr.next();
-        assertEquals((long) entry.getKey(), 2l);
-        assertEquals((long) entry.getValue(), 22l);
-        assertEquals(false, itr.hasNext());
-    }
-
-    @Test
-    public void testMassiveDataOfBagState() {
-        BagState<Integer> bagStateA = jstormStateInternals.state(
-                StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
-
-        int count = 10000;
-        int n = 1;
-        while(n <= count) {
-            bagStateA.add(n);
-            n++;
-        }
-
-        int readCount = 0;
-        int readN = 0;
-        Iterator<Integer> itr = bagStateA.read().iterator();
-        while(itr.hasNext()) {
-            readN += itr.next();
-            readCount++;
-        }
-
-        assertEquals((long) readN, ((1 + count) * count) / 2);
-        assertEquals((long) readCount, count);
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/CoGroupByKeyTest.java b/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/CoGroupByKeyTest.java
deleted file mode 100644
index 4f69c93..0000000
--- a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/CoGroupByKeyTest.java
+++ /dev/null
@@ -1,302 +0,0 @@
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-import com.alibaba.jstorm.beam.TestJStormRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Duration;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-@RunWith(JUnit4.class)
-public class CoGroupByKeyTest implements Serializable {
-    /**
-     * Converts the given list into a PCollection belonging to the provided
-     * Pipeline in such a way that coder inference needs to be performed.
-     */
-    private PCollection<KV<Integer, String>> createInput(String name,
-                                                         Pipeline p, List<KV<Integer, String>> list) {
-        return createInput(name, p, list,  new ArrayList<Long>());
-    }
-
-    /**
-     * Converts the given list with timestamps into a PCollection.
-     */
-    private PCollection<KV<Integer, String>> createInput(String name,
-                                                         Pipeline p, List<KV<Integer, String>> list, List<Long> timestamps) {
-        PCollection<KV<Integer, String>> input;
-        if (timestamps.isEmpty()) {
-            input = p.apply("Create" + name, Create.of(list)
-                    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
-        } else {
-            input = p.apply("Create" + name, Create.timestamped(list, timestamps)
-                    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
-        }
-        return input.apply(
-                "Identity" + name,
-                ParDo.of(
-                        new DoFn<KV<Integer, String>, KV<Integer, String>>() {
-                            @ProcessElement
-                            public void processElement(ProcessContext c) {
-                                c.output(c.element());
-                            }
-                        }));
-    }
-
-    /**
-     * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the result
-     * of a {@link CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>},
-     * where each {@link PCollection} has no duplicate keys and the key sets of
-     * each {@link PCollection} are intersecting but neither is a subset of the other.
-     */
-    private PCollection<KV<Integer, CoGbkResult>> buildGetOnlyGbk(
-            Pipeline p,
-            TupleTag<String> tag1,
-            TupleTag<String> tag2) {
-        List<KV<Integer, String>> list1 =
-                Arrays.asList(
-                        KV.of(1, "collection1-1"),
-                        KV.of(2, "collection1-2"));
-        List<KV<Integer, String>> list2 =
-                Arrays.asList(
-                        KV.of(2, "collection2-2"),
-                        KV.of(3, "collection2-3"));
-        PCollection<KV<Integer, String>> collection1 = createInput("CreateList1", p, list1);
-        PCollection<KV<Integer, String>> collection2 = createInput("CreateList2", p, list2);
-        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
-                KeyedPCollectionTuple.of(tag1, collection1)
-                        .and(tag2, collection2)
-                        .apply(CoGroupByKey.<Integer>create());
-        return coGbkResults;
-    }
-
-    @Test
-    @Category(ValidatesRunner.class)
-    public void testCoGroupByKeyGetOnly() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        options.setLocalMode(true);
-
-        Pipeline p = Pipeline.create(options);
-
-        final TupleTag<String> tag1 = new TupleTag<>();
-        final TupleTag<String> tag2 = new TupleTag<>();
-
-        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
-                buildGetOnlyGbk(p, tag1, tag2);
-
-        PAssert.thatMap(coGbkResults).satisfies(
-                new SerializableFunction<Map<Integer, CoGbkResult>, Void>() {
-                    @Override
-                    public Void apply(Map<Integer, CoGbkResult> results) {
-                        assertEquals("collection1-1", results.get(1).getOnly(tag1));
-                        assertEquals("collection1-2", results.get(2).getOnly(tag1));
-                        assertEquals("collection2-2", results.get(2).getOnly(tag2));
-                        assertEquals("collection2-3", results.get(3).getOnly(tag2));
-                        return null;
-                    }
-                });
-
-        p.run();
-    }
-
-    /**
-     * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the
-     * results of the {@code CoGroupByKey} over three
-     * {@code PCollection<KV<Integer, String>>}, each of which correlates
-     * a customer id to purchases, addresses, or names, respectively.
-     */
-    private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbk(
-            Pipeline p,
-            TupleTag<String> purchasesTag,
-            TupleTag<String> addressesTag,
-            TupleTag<String> namesTag) {
-        List<KV<Integer, String>> idToPurchases =
-                Arrays.asList(
-                        KV.of(2, "Boat"),
-                        KV.of(1, "Shoes"),
-                        KV.of(3, "Car"),
-                        KV.of(1, "Book"),
-                        KV.of(10, "Pens"),
-                        KV.of(8, "House"),
-                        KV.of(4, "Suit"),
-                        KV.of(11, "House"),
-                        KV.of(14, "Shoes"),
-                        KV.of(2, "Suit"),
-                        KV.of(8, "Suit Case"),
-                        KV.of(3, "House"));
-
-        List<KV<Integer, String>> idToAddress =
-                Arrays.asList(
-                        KV.of(2, "53 S. 3rd"),
-                        KV.of(10, "383 Jackson Street"),
-                        KV.of(20, "3 W. Arizona"),
-                        KV.of(3, "29 School Rd"),
-                        KV.of(8, "6 Watling Rd"));
-
-        List<KV<Integer, String>> idToName =
-                Arrays.asList(
-                        KV.of(1, "John Smith"),
-                        KV.of(2, "Sally James"),
-                        KV.of(8, "Jeffery Spalding"),
-                        KV.of(20, "Joan Lichtfield"));
-
-        PCollection<KV<Integer, String>> purchasesTable =
-                createInput("CreateIdToPurchases", p, idToPurchases);
-
-        PCollection<KV<Integer, String>> addressTable =
-                createInput("CreateIdToAddress", p, idToAddress);
-
-        PCollection<KV<Integer, String>> nameTable =
-                createInput("CreateIdToName", p, idToName);
-
-        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
-                KeyedPCollectionTuple.of(namesTag, nameTable)
-                        .and(addressesTag, addressTable)
-                        .and(purchasesTag, purchasesTable)
-                        .apply(CoGroupByKey.<Integer>create());
-        return coGbkResults;
-    }
-
-    /**
-     * Returns a {@code PCollection<KV<Integer, CoGbkResult>>} containing the
-     * results of the {@code CoGroupByKey} over 2 {@code PCollection<KV<Integer, String>>},
-     * each of which correlates a customer id to clicks, purchases, respectively.
-     */
-    private PCollection<KV<Integer, CoGbkResult>> buildPurchasesCoGbkWithWindowing(
-            Pipeline p,
-            TupleTag<String> clicksTag,
-            TupleTag<String> purchasesTag) {
-        List<KV<Integer, String>> idToClick =
-                Arrays.asList(
-                        KV.of(1, "Click t0"),
-                        KV.of(2, "Click t2"),
-                        KV.of(1, "Click t4"),
-                        KV.of(1, "Click t6"),
-                        KV.of(2, "Click t8"));
-
-        List<KV<Integer, String>> idToPurchases =
-                Arrays.asList(
-                        KV.of(1, "Boat t1"),
-                        KV.of(1, "Shoesi t2"),
-                        KV.of(1, "Pens t3"),
-                        KV.of(2, "House t4"),
-                        KV.of(2, "Suit t5"),
-                        KV.of(1, "Car t6"),
-                        KV.of(1, "Book t7"),
-                        KV.of(2, "House t8"),
-                        KV.of(2, "Shoes t9"),
-                        KV.of(2, "House t10"));
-
-        PCollection<KV<Integer, String>> clicksTable =
-                createInput("CreateClicks",
-                        p,
-                        idToClick,
-                        Arrays.asList(0L, 2L, 4L, 6L, 8L))
-                        .apply("WindowClicks", Window.<KV<Integer, String>>into(
-                                FixedWindows.of(new Duration(4)))
-                                .withTimestampCombiner(TimestampCombiner.EARLIEST));
-
-        PCollection<KV<Integer, String>> purchasesTable =
-                createInput("CreatePurchases",
-                        p,
-                        idToPurchases,
-                        Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
-                        .apply("WindowPurchases", Window.<KV<Integer, String>>into(
-                                FixedWindows.of(new Duration(4)))
-                                .withTimestampCombiner(TimestampCombiner.EARLIEST));
-
-        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
-                KeyedPCollectionTuple.of(clicksTag, clicksTable)
-                        .and(purchasesTag, purchasesTable)
-                        .apply(CoGroupByKey.<Integer>create());
-        return coGbkResults;
-    }
-
-    @Test
-    @Category(ValidatesRunner.class)
-    public void testCoGroupByKey() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        options.setLocalMode(true);
-
-        Pipeline p = Pipeline.create(options);
-
-        final TupleTag<String> namesTag = new TupleTag<>();
-        final TupleTag<String> addressesTag = new TupleTag<>();
-        final TupleTag<String> purchasesTag = new TupleTag<>();
-
-
-        PCollection<KV<Integer, CoGbkResult>> coGbkResults =
-                buildPurchasesCoGbk(p, purchasesTag, addressesTag, namesTag);
-
-        PAssert.thatMap(coGbkResults).satisfies(
-                new SerializableFunction<Map<Integer, CoGbkResult>, Void>() {
-                    @Override
-                    public Void apply(Map<Integer, CoGbkResult> results) {
-                        CoGbkResult result1 = results.get(1);
-                        assertEquals("John Smith", result1.getOnly(namesTag));
-                        assertThat(result1.getAll(purchasesTag), containsInAnyOrder("Shoes", "Book"));
-
-                        CoGbkResult result2 = results.get(2);
-                        assertEquals("Sally James", result2.getOnly(namesTag));
-                        assertEquals("53 S. 3rd", result2.getOnly(addressesTag));
-                        assertThat(result2.getAll(purchasesTag), containsInAnyOrder("Suit", "Boat"));
-
-                        CoGbkResult result3 = results.get(3);
-                        assertEquals("29 School Rd", result3.getOnly(addressesTag), "29 School Rd");
-                        assertThat(result3.getAll(purchasesTag), containsInAnyOrder("Car", "House"));
-
-                        CoGbkResult result8 = results.get(8);
-                        assertEquals("Jeffery Spalding", result8.getOnly(namesTag));
-                        assertEquals("6 Watling Rd", result8.getOnly(addressesTag));
-                        assertThat(result8.getAll(purchasesTag), containsInAnyOrder("House", "Suit Case"));
-
-                        CoGbkResult result20 = results.get(20);
-                        assertEquals("Joan Lichtfield", result20.getOnly(namesTag));
-                        assertEquals("3 W. Arizona", result20.getOnly(addressesTag));
-
-                        assertEquals("383 Jackson Street", results.get(10).getOnly(addressesTag));
-
-                        assertThat(results.get(4).getAll(purchasesTag), containsInAnyOrder("Suit"));
-                        assertThat(results.get(10).getAll(purchasesTag), containsInAnyOrder("Pens"));
-                        assertThat(results.get(11).getAll(purchasesTag), containsInAnyOrder("House"));
-                        assertThat(results.get(14).getAll(purchasesTag), containsInAnyOrder("Shoes"));
-
-                        return null;
-                    }
-                });
-
-        p.run();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTest.java b/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTest.java
deleted file mode 100644
index 5ec6636..0000000
--- a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/GroupByKeyTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-
-import com.alibaba.jstorm.beam.TestJStormRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.TypeSafeMatcher;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
-/**
- * Tests for {@link GroupByKey} with {@link com.alibaba.jstorm.beam.StormRunner}.
- */
-@RunWith(JUnit4.class)
-public class GroupByKeyTest {
-
-    static final String[] WORDS_ARRAY = new String[] {
-            "hi", "there", "hi", "hi", "sue", "bob",
-            "hi", "sue", "", "", "ZOW", "bob", "" };
-
-    static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
-
-    @Test
-    public void testGroupByKey() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        options.setLocalMode(true);
-
-        Pipeline p = Pipeline.create(options);
-
-        List<KV<String, Integer>> ungroupedPairs = Arrays.asList(
-                KV.of("k1", 3),
-                KV.of("k5", Integer.MAX_VALUE),
-                KV.of("k5", Integer.MIN_VALUE),
-                KV.of("k2", 66),
-                KV.of("k1", 4),
-                KV.of("k2", -33),
-                KV.of("k3", 0));
-
-        PCollection<KV<String, Integer>> input =
-                p.apply(Create.of(ungroupedPairs)
-                        .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
-
-        PCollection<KV<String, Iterable<Integer>>> output =
-                input.apply(GroupByKey.<String, Integer>create());
-
-        PAssert.that(output)
-                .satisfies(new AssertThatHasExpectedContentsForTestGroupByKey());
-
-        p.run();
-    }
-
-    @Test
-    public void testCountGloballyBasic() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        options.setLocalMode(true);
-
-        Pipeline p = Pipeline.create(options);
-        PCollection<String> input = p.apply(Create.of(WORDS));
-
-        PCollection<Long> output =
-                input.apply(Count.<String>globally());
-
-        PAssert.that(output)
-                .containsInAnyOrder(13L);
-        p.run();
-    }
-
-    static class AssertThatHasExpectedContentsForTestGroupByKey
-            implements SerializableFunction<Iterable<KV<String, Iterable<Integer>>>,
-            Void> {
-        @Override
-        public Void apply(Iterable<KV<String, Iterable<Integer>>> actual) {
-            assertThat(actual, containsInAnyOrder(
-                    KvMatcher.isKv(is("k1"), containsInAnyOrder(3, 4)),
-                    KvMatcher.isKv(is("k5"), containsInAnyOrder(Integer.MAX_VALUE,
-                            Integer.MIN_VALUE)),
-                    KvMatcher.isKv(is("k2"), containsInAnyOrder(66, -33)),
-                    KvMatcher.isKv(is("k3"), containsInAnyOrder(0))));
-            return null;
-        }
-    }
-
-    /**
-     * Matcher for KVs.
-     */
-    public static class KvMatcher<K, V>
-            extends TypeSafeMatcher<KV<? extends K, ? extends V>> {
-        final Matcher<? super K> keyMatcher;
-        final Matcher<? super V> valueMatcher;
-
-        public static <K, V> KvMatcher<K, V> isKv(Matcher<K> keyMatcher,
-                                                  Matcher<V> valueMatcher) {
-            return new KvMatcher<>(keyMatcher, valueMatcher);
-        }
-
-        public KvMatcher(Matcher<? super K> keyMatcher,
-                         Matcher<? super V> valueMatcher) {
-            this.keyMatcher = keyMatcher;
-            this.valueMatcher = valueMatcher;
-        }
-
-        @Override
-        public boolean matchesSafely(KV<? extends K, ? extends V> kv) {
-            return keyMatcher.matches(kv.getKey())
-                    && valueMatcher.matches(kv.getValue());
-        }
-
-        @Override
-        public void describeTo(Description description) {
-            description
-                    .appendText("a KV(").appendValue(keyMatcher)
-                    .appendText(", ").appendValue(valueMatcher)
-                    .appendText(")");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/ParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/ParDoTest.java b/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/ParDoTest.java
deleted file mode 100644
index da0aafe..0000000
--- a/runners/jstorm/src/test/java/com/alibaba/jstorm/beam/translation/translator/ParDoTest.java
+++ /dev/null
@@ -1,626 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.beam.translation.translator;
-
-import com.alibaba.jstorm.beam.StormPipelineOptions;
-
-import com.alibaba.jstorm.beam.TestJStormRunner;
-import com.google.common.base.MoreObjects;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.*;
-import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.state.*;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.UsesMapState;
-import org.apache.beam.sdk.testing.UsesStatefulParDo;
-import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.*;
-import org.apache.beam.sdk.transforms.windowing.*;
-import org.apache.beam.sdk.values.*;
-import org.joda.time.Duration;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.*;
-
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
-import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
-/**
- * Tests for {@link ParDo} with {@link com.alibaba.jstorm.beam.StormRunner}.
- */
-@RunWith(JUnit4.class)
-public class ParDoTest implements Serializable {
-
-    @Test
-    public void testParDo() throws IOException {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        List<Integer> inputs = Arrays.asList(3, -42, 666);
-
-        PCollection<String> output = pipeline
-                .apply(Create.of(inputs))
-                .apply(ParDo.of(new TestDoFn()));
-
-        PAssert.that(output)
-                .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
-
-        pipeline.run();
-    }
-
-    @Test
-    public void testParDoWithSideInputs() throws IOException {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        List<Integer> inputs = Arrays.asList(3, -42, 666);
-
-        PCollectionView<Integer> sideInput1 = pipeline
-                .apply("CreateSideInput1", Create.of(11))
-                .apply("ViewSideInput1", View.<Integer>asSingleton());
-        PCollectionView<Integer> sideInputUnread = pipeline
-                .apply("CreateSideInputUnread", Create.of(-3333))
-                .apply("ViewSideInputUnread", View.<Integer>asSingleton());
-
-        PCollectionView<Integer> sideInput2 = pipeline
-                .apply("CreateSideInput2", Create.of(222))
-                .apply("ViewSideInput2", View.<Integer>asSingleton());
-        PCollection<String> output = pipeline
-                .apply(Create.of(inputs))
-                .apply(ParDo.of(new TestDoFn(
-                                Arrays.asList(sideInput1, sideInput2),
-                                Arrays.<TupleTag<String>>asList()))
-                        .withSideInputs(sideInput1, sideInputUnread, sideInput2));
-
-        PAssert.that(output)
-                .satisfies(ParDoTest.HasExpectedOutput
-                        .forInput(inputs)
-                        .andSideInputs(11, 222));
-
-        pipeline.run();
-    }
-
-    @Test
-    public void testParDoWithTaggedOutput() {
-        List<Integer> inputs = Arrays.asList(3, -42, 666);
-
-        TupleTag<String> mainOutputTag = new TupleTag<String>("main"){};
-        TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){};
-        TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){};
-        TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3"){};
-        TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){};
-
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        PCollectionTuple outputs = pipeline
-            .apply(Create.of(inputs))
-            .apply(ParDo
-                .of(new TestDoFn(
-                    Arrays.<PCollectionView<Integer>>asList(),
-                    Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3)))
-                .withOutputTags(
-                    mainOutputTag,
-                    TupleTagList.of(additionalOutputTag3)
-                        .and(additionalOutputTag1)
-                        .and(additionalOutputTagUnwritten)
-                        .and(additionalOutputTag2)));
-
-        PAssert.that(outputs.get(mainOutputTag))
-            .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
-
-        PAssert.that(outputs.get(additionalOutputTag1))
-            .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
-                .fromOutput(additionalOutputTag1));
-        PAssert.that(outputs.get(additionalOutputTag2))
-            .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
-                .fromOutput(additionalOutputTag2));
-        PAssert.that(outputs.get(additionalOutputTag3))
-            .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)
-                .fromOutput(additionalOutputTag3));
-        PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty();
-
-        pipeline.run();
-    }
-
-    @Test
-    public void testNoWindowFnDoesNotReassignWindows() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        final PCollection<Long> initialWindows =
-                pipeline
-                    .apply(GenerateSequence.from(0).to(10))
-                    .apply("AssignWindows", Window.into(new WindowOddEvenBuckets()));
-
-        // Sanity check the window assignment to demonstrate the baseline
-        PAssert.that(initialWindows)
-                .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
-                .containsInAnyOrder(0L, 2L, 4L, 6L, 8L);
-        PAssert.that(initialWindows)
-                .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
-                .containsInAnyOrder(1L, 3L, 5L, 7L, 9L);
-
-        PCollection<Boolean> upOne =
-                initialWindows.apply(
-                        "ModifyTypes",
-                        MapElements.<Long, Boolean>via(
-                                new SimpleFunction<Long, Boolean>() {
-                                    @Override
-                                    public Boolean apply(Long input) {
-                                        return input % 2 == 0;
-                                    }
-                                }));
-        PAssert.that(upOne)
-                .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
-                .containsInAnyOrder(true, true, true, true, true);
-        PAssert.that(upOne)
-                .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
-                .containsInAnyOrder(false, false, false, false, false);
-
-        // The elements should be in the same windows, even though they would not be assigned to the
-        // same windows with the updated timestamps. If we try to apply the original WindowFn, the type
-        // will not be appropriate and the runner should crash, as a Boolean cannot be converted into
-        // a long.
-        PCollection<Boolean> updatedTrigger =
-                upOne.apply(
-                        "UpdateWindowingStrategy",
-                        Window.<Boolean>configure().triggering(Never.ever())
-                                .withAllowedLateness(Duration.ZERO)
-                                .accumulatingFiredPanes());
-        pipeline.run();
-    }
-
-    @Test
-    public void testValueStateSameId() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        final String stateId = "foo";
-
-        DoFn<KV<String, Integer>, KV<String, Integer>> fn =
-                new DoFn<KV<String, Integer>, KV<String, Integer>>() {
-
-                    @StateId(stateId)
-                    private final StateSpec<ValueState<Integer>> intState =
-                            StateSpecs.value(VarIntCoder.of());
-
-                    @ProcessElement
-                    public void processElement(
-                            ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
-                        Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
-                        c.output(KV.of("sizzle", currentValue));
-                        state.write(currentValue + 1);
-                    }
-                };
-
-        DoFn<KV<String, Integer>, Integer> fn2 =
-                new DoFn<KV<String, Integer>, Integer>() {
-
-                    @StateId(stateId)
-                    private final StateSpec<ValueState<Integer>> intState =
-                            StateSpecs.value(VarIntCoder.of());
-
-                    @ProcessElement
-                    public void processElement(
-                            ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
-                        Integer currentValue = MoreObjects.firstNonNull(state.read(), 13);
-                        c.output(currentValue);
-                        state.write(currentValue + 13);
-                    }
-                };
-
-        PCollection<KV<String, Integer>> intermediate =
-                pipeline.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84)))
-                        .apply("First stateful ParDo", ParDo.of(fn));
-
-        PCollection<Integer> output =
-                intermediate.apply("Second stateful ParDo", ParDo.of(fn2));
-
-        PAssert.that(intermediate)
-                .containsInAnyOrder(KV.of("sizzle", 0), KV.of("sizzle", 1), KV.of("sizzle", 2));
-        PAssert.that(output).containsInAnyOrder(13, 26, 39);
-        pipeline.run();
-    }
-
-    @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class})
-    public void testValueStateTaggedOutput() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        final String stateId = "foo";
-
-        final TupleTag<Integer> evenTag = new TupleTag<Integer>() {};
-        final TupleTag<Integer> oddTag = new TupleTag<Integer>() {};
-
-        DoFn<KV<String, Integer>, Integer> fn =
-                new DoFn<KV<String, Integer>, Integer>() {
-
-                    @StateId(stateId)
-                    private final StateSpec<ValueState<Integer>> intState =
-                            StateSpecs.value(VarIntCoder.of());
-
-                    @ProcessElement
-                    public void processElement(
-                            ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
-                        Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
-                        if (currentValue % 2 == 0) {
-                            c.output(currentValue);
-                        } else {
-                            c.output(oddTag, currentValue);
-                        }
-                        state.write(currentValue + 1);
-                    }
-                };
-
-        PCollectionTuple output =
-                pipeline.apply(
-                        Create.of(
-                                KV.of("hello", 42),
-                                KV.of("hello", 97),
-                                KV.of("hello", 84),
-                                KV.of("goodbye", 33),
-                                KV.of("hello", 859),
-                                KV.of("goodbye", 83945)))
-                        .apply(ParDo.of(fn).withOutputTags(evenTag, TupleTagList.of(oddTag)));
-
-        PCollection<Integer> evens = output.get(evenTag);
-        PCollection<Integer> odds = output.get(oddTag);
-
-        // There are 0 and 2 from "hello" and just 0 from "goodbye"
-        PAssert.that(evens).containsInAnyOrder(0, 2, 0);
-
-        // There are 1 and 3 from "hello" and just "1" from "goodbye"
-        PAssert.that(odds).containsInAnyOrder(1, 3, 1);
-        pipeline.run();
-    }
-
-    @Test
-    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesMapState.class})
-    public void testMapStateCoderInference() {
-        StormPipelineOptions options = PipelineOptionsFactory.as(StormPipelineOptions.class);
-        options.setRunner(TestJStormRunner.class);
-        Pipeline pipeline = Pipeline.create(options);
-
-        final String stateId = "foo";
-        final String countStateId = "count";
-        Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
-        pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
-
-        DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>> fn =
-                new DoFn<KV<String, KV<String, Integer>>, KV<String, MyInteger>>() {
-
-                    @StateId(stateId)
-                    private final StateSpec<MapState<String, MyInteger>> mapState = StateSpecs.map();
-
-                    @StateId(countStateId)
-                    private final StateSpec<CombiningState<Integer, int[], Integer>>
-                            countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(),
-                            Sum.ofIntegers());
-
-                    @ProcessElement
-                    public void processElement(
-                            ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state,
-                            @StateId(countStateId) CombiningState<Integer, int[], Integer>
-                                    count) {
-                        KV<String, Integer> value = c.element().getValue();
-                        state.put(value.getKey(), new MyInteger(value.getValue()));
-                        count.add(1);
-                        if (count.read() >= 4) {
-                            Iterable<Map.Entry<String, MyInteger>> iterate = state.entries().read();
-                            for (Map.Entry<String, MyInteger> entry : iterate) {
-                                c.output(KV.of(entry.getKey(), entry.getValue()));
-                            }
-                        }
-                    }
-                };
-
-        PCollection<KV<String, MyInteger>> output =
-                pipeline.apply(
-                        Create.of(
-                                KV.of("hello", KV.of("a", 97)), KV.of("hello", KV.of("b", 42)),
-                                KV.of("hello", KV.of("b", 42)), KV.of("hello", KV.of("c", 12))))
-                        .apply(ParDo.of(fn)).setCoder(KvCoder.of(StringUtf8Coder.of(), myIntegerCoder));
-
-        PAssert.that(output).containsInAnyOrder(KV.of("a", new MyInteger(97)),
-                KV.of("b", new MyInteger(42)), KV.of("c", new MyInteger(12)));
-        pipeline.run();
-    }
-
-
-    private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, IntervalWindow> {
-        private static final IntervalWindow EVEN_WINDOW =
-                new IntervalWindow(
-                        BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp());
-        private static final IntervalWindow ODD_WINDOW =
-                new IntervalWindow(
-                        BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp().minus(1));
-
-        @Override
-        public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
-            if (c.element() % 2 == 0) {
-                return Collections.singleton(EVEN_WINDOW);
-            }
-            return Collections.singleton(ODD_WINDOW);
-        }
-
-        @Override
-        public boolean isCompatible(WindowFn<?, ?> other) {
-            return other instanceof WindowOddEvenBuckets;
-        }
-
-        @Override
-        public Coder<IntervalWindow> windowCoder() {
-            return new IntervalWindow.IntervalWindowCoder();
-        }
-
-        @Override
-        public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
-            throw new UnsupportedOperationException(
-                    String.format("Can't use %s for side inputs", getClass().getSimpleName()));
-        }
-    }
-
-
-    static class TestDoFn extends DoFn<Integer, String> {
-        enum State {NOT_SET_UP, UNSTARTED, STARTED, PROCESSING, FINISHED}
-
-        State state = State.NOT_SET_UP;
-
-        final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
-        final List<TupleTag<String>> additionalOutputTupleTags = new ArrayList<>();
-
-        public TestDoFn() {
-        }
-
-        public TestDoFn(List<PCollectionView<Integer>> sideInputViews,
-                        List<TupleTag<String>> additionalOutputTupleTags) {
-            this.sideInputViews.addAll(sideInputViews);
-            this.additionalOutputTupleTags.addAll(additionalOutputTupleTags);
-        }
-
-        @Setup
-        public void prepare() {
-            assertEquals(State.NOT_SET_UP, state);
-            state = State.UNSTARTED;
-        }
-
-        @StartBundle
-        public void startBundle() {
-            assertThat(state,
-                anyOf(equalTo(State.UNSTARTED), equalTo(State.FINISHED)));
-
-            state = State.STARTED;
-        }
-
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-            System.out.println("Recv elem: " + c.element());
-            assertThat(state,
-                anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
-            state = State.PROCESSING;
-            outputToAllWithSideInputs(c, "processing: " + c.element());
-        }
-
-        @FinishBundle
-        public void finishBundle(FinishBundleContext c) {
-            assertThat(state,
-                anyOf(equalTo(State.STARTED), equalTo(State.PROCESSING)));
-            state = State.FINISHED;
-            c.output("finished", BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
-            for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) {
-                c.output(
-                    additionalOutputTupleTag,
-                    additionalOutputTupleTag.getId() + ": " + "finished",
-                    BoundedWindow.TIMESTAMP_MIN_VALUE,
-                    GlobalWindow.INSTANCE);
-            }
-        }
-
-        private void outputToAllWithSideInputs(ProcessContext c, String value) {
-            if (!sideInputViews.isEmpty()) {
-                List<Integer> sideInputValues = new ArrayList<>();
-                for (PCollectionView<Integer> sideInputView : sideInputViews) {
-                    sideInputValues.add(c.sideInput(sideInputView));
-                }
-                value += ": " + sideInputValues;
-            }
-            c.output(value);
-            for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) {
-                c.output(additionalOutputTupleTag,
-                    additionalOutputTupleTag.getId() + ": " + value);
-            }
-        }
-    }
-
-    private static class MyInteger implements Comparable<MyInteger> {
-        private final int value;
-
-        MyInteger(int value) {
-            this.value = value;
-        }
-
-        public int getValue() {
-            return value;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-
-            if (!(o instanceof MyInteger)) {
-                return false;
-            }
-
-            MyInteger myInteger = (MyInteger) o;
-
-            return value == myInteger.value;
-
-        }
-
-        @Override
-        public int hashCode() {
-            return value;
-        }
-
-        @Override
-        public int compareTo(MyInteger o) {
-            return Integer.compare(this.getValue(), o.getValue());
-        }
-
-        @Override
-        public String toString() {
-            return "MyInteger{" + "value=" + value + '}';
-        }
-    }
-
-    /**
-     * Coder for {@link MyInteger} that encodes and decodes the wrapped {@code int} through a
-     * {@link VarIntCoder}. Obtain the shared instance via {@link #of()}.
-     */
-    private static class MyIntegerCoder extends AtomicCoder<MyInteger> {
-        private static final MyIntegerCoder SINGLETON = new MyIntegerCoder();
-
-        private final VarIntCoder intCoder = VarIntCoder.of();
-
-        /** Returns the shared coder instance. */
-        public static MyIntegerCoder of() {
-            return SINGLETON;
-        }
-
-        /** Writes the wrapped value of {@code value} to {@code outStream}. */
-        @Override
-        public void encode(MyInteger value, OutputStream outStream)
-                throws CoderException, IOException {
-            Integer wrapped = value.getValue();
-            intCoder.encode(wrapped, outStream);
-        }
-
-        /** Reads one value from {@code inStream} and wraps it in a new {@link MyInteger}. */
-        @Override
-        public MyInteger decode(InputStream inStream) throws CoderException,
-                IOException {
-            Integer decoded = intCoder.decode(inStream);
-            return new MyInteger(decoded);
-        }
-    }
-
-    /** PAssert "matcher" for expected output. */
-    static class HasExpectedOutput
-        implements SerializableFunction<Iterable<String>, Void>, Serializable {
-        private final List<Integer> inputs;
-        private final List<Integer> sideInputs;
-        private final String additionalOutput;
-        private final boolean ordered;
-
-        public static HasExpectedOutput forInput(List<Integer> inputs) {
-            return new HasExpectedOutput(
-                new ArrayList<Integer>(inputs),
-                new ArrayList<Integer>(),
-                "",
-                false);
-        }
-
-        private HasExpectedOutput(List<Integer> inputs,
-                                  List<Integer> sideInputs,
-                                  String additionalOutput,
-                                  boolean ordered) {
-            this.inputs = inputs;
-            this.sideInputs = sideInputs;
-            this.additionalOutput = additionalOutput;
-            this.ordered = ordered;
-        }
-
-        public HasExpectedOutput andSideInputs(Integer... sideInputValues) {
-            List<Integer> sideInputs = new ArrayList<>();
-            for (Integer sideInputValue : sideInputValues) {
-                sideInputs.add(sideInputValue);
-            }
-            return new HasExpectedOutput(inputs, sideInputs, additionalOutput, ordered);
-        }
-
-        public HasExpectedOutput fromOutput(TupleTag<String> outputTag) {
-            return fromOutput(outputTag.getId());
-        }
-        public HasExpectedOutput fromOutput(String outputId) {
-            return new HasExpectedOutput(inputs, sideInputs, outputId, ordered);
-        }
-
-        public HasExpectedOutput inOrder() {
-            return new HasExpectedOutput(inputs, sideInputs, additionalOutput, true);
-        }
-
-        @Override
-        public Void apply(Iterable<String> outputs) {
-            List<String> processeds = new ArrayList<>();
-            List<String> finisheds = new ArrayList<>();
-            for (String output : outputs) {
-                if (output.contains("finished")) {
-                    finisheds.add(output);
-                } else {
-                    processeds.add(output);
-                }
-            }
-
-            String sideInputsSuffix;
-            if (sideInputs.isEmpty()) {
-                sideInputsSuffix = "";
-            } else {
-                sideInputsSuffix = ": " + sideInputs;
-            }
-
-            String additionalOutputPrefix;
-            if (additionalOutput.isEmpty()) {
-                additionalOutputPrefix = "";
-            } else {
-                additionalOutputPrefix = additionalOutput + ": ";
-            }
-
-            List<String> expectedProcesseds = new ArrayList<>();
-            for (Integer input : inputs) {
-                expectedProcesseds.add(
-                    additionalOutputPrefix + "processing: " + input + sideInputsSuffix);
-            }
-            String[] expectedProcessedsArray =
-                expectedProcesseds.toArray(new String[expectedProcesseds.size()]);
-            if (!ordered || expectedProcesseds.isEmpty()) {
-                assertThat(processeds, containsInAnyOrder(expectedProcessedsArray));
-            } else {
-                assertThat(processeds, contains(expectedProcessedsArray));
-            }
-
-            for (String finished : finisheds) {
-                assertEquals(additionalOutputPrefix + "finished", finished);
-            }
-
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
new file mode 100644
index 0000000..11c7c94
--- /dev/null
+++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import avro.shaded.com.google.common.collect.Maps;
+import org.apache.beam.runners.jstorm.translation.runtime.TimerServiceImpl;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory;
+import com.alibaba.jstorm.utils.KryoSerializer;
+
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.state.*;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link JStormStateInternals}.
+ *
+ * <p>Each test runs against a fresh {@link JStormStateInternals} for the key {@code "key-1"},
+ * created in {@link #setup()} on top of a store manager rooted in a JUnit-managed temporary
+ * folder.
+ */
+@RunWith(JUnit4.class)
+public class JStormStateInternalsTest {
+
+    @Rule
+    public final TemporaryFolder tmp = new TemporaryFolder();
+
+    private JStormStateInternals<String> jstormStateInternals;
+
+    /**
+     * Builds the state internals under test.
+     *
+     * @throws Exception if the store manager cannot be created
+     */
+    @Before
+    public void setup() throws Exception {
+        // TemporaryFolder does not override toString(), so the directory has to be taken from
+        // getRoot(); otherwise the store ends up outside the folder JUnit cleans up.
+        String storePath = tmp.getRoot().getAbsolutePath();
+        IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager(
+                Maps.newHashMap(),
+                "test",
+                storePath,
+                new KryoSerializer(Maps.newHashMap()));
+        jstormStateInternals =
+                new JStormStateInternals<>("key-1", kvStoreManager, new TimerServiceImpl(), 0);
+    }
+
+    /** A value state returns the most recently written value. */
+    @Test
+    public void testValueState() throws Exception {
+        ValueState<Integer> valueState = jstormStateInternals.state(
+                StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+        valueState.write(Integer.MIN_VALUE);
+        assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
+        valueState.write(Integer.MAX_VALUE);
+        assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
+    }
+
+    /** Two handles obtained for the same namespace and state id observe the same value. */
+    @Test
+    public void testValueStateIdenticalId() throws Exception {
+        ValueState<Integer> valueState = jstormStateInternals.state(
+                StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+        ValueState<Integer> valueStateIdentical = jstormStateInternals.state(
+                StateNamespaces.global(), StateTags.value("state-id-a", BigEndianIntegerCoder.of()));
+
+        valueState.write(Integer.MIN_VALUE);
+        assertEquals(Integer.MIN_VALUE, valueState.read().longValue());
+        assertEquals(Integer.MIN_VALUE, valueStateIdentical.read().longValue());
+        valueState.write(Integer.MAX_VALUE);
+        assertEquals(Integer.MAX_VALUE, valueState.read().longValue());
+        assertEquals(Integer.MAX_VALUE, valueStateIdentical.read().longValue());
+    }
+
+    /** Bags with different ids are independent; clear() only empties the bag it is called on. */
+    @Test
+    public void testBagState() throws Exception {
+        BagState<Integer> bagStateA = jstormStateInternals.state(
+                StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
+        BagState<Integer> bagStateB = jstormStateInternals.state(
+                StateNamespaces.global(), StateTags.bag("state-id-b", BigEndianIntegerCoder.of()));
+
+        bagStateA.add(1);
+        bagStateA.add(0);
+        bagStateA.add(Integer.MAX_VALUE);
+
+        bagStateB.add(0);
+        bagStateB.add(Integer.MIN_VALUE);
+
+        Iterable<Integer> bagA = bagStateA.read();
+        Iterable<Integer> bagB = bagStateB.read();
+        assertThat(bagA, containsInAnyOrder(1, 0, Integer.MAX_VALUE));
+        assertThat(bagB, containsInAnyOrder(0, Integer.MIN_VALUE));
+
+        bagStateA.clear();
+        bagStateA.add(1);
+        bagStateB.add(0);
+        assertThat(bagStateA.read(), containsInAnyOrder(1));
+        assertThat(bagStateB.read(), containsInAnyOrder(0, 0, Integer.MIN_VALUE));
+    }
+
+    /** A combining state backed by {@code Max.ofIntegers()} reports the running maximum. */
+    @Test
+    public void testCombiningState() throws Exception {
+        Combine.CombineFn<Integer, int[], Integer> combineFn = Max.ofIntegers();
+        Coder<int[]> accumCoder = combineFn.getAccumulatorCoder(
+            CoderRegistry.createDefault(), BigEndianIntegerCoder.of());
+
+        CombiningState<Integer, int[], Integer> combiningState = jstormStateInternals.state(
+                StateNamespaces.global(),
+                StateTags.combiningValue(
+                        "state-id-a",
+                        accumCoder,
+                        combineFn));
+        assertEquals(Integer.MIN_VALUE, combiningState.read().longValue());
+        combiningState.add(10);
+        assertEquals(10, combiningState.read().longValue());
+        combiningState.add(1);
+        assertEquals(10, combiningState.read().longValue());
+        combiningState.add(Integer.MAX_VALUE);
+        assertEquals(Integer.MAX_VALUE, combiningState.read().longValue());
+    }
+
+    /** With {@code TimestampCombiner.EARLIEST} the hold stays at the earliest added instant. */
+    @Test
+    public void testWatermarkHoldState() throws Exception {
+        WatermarkHoldState watermarkHoldState = jstormStateInternals.state(
+                StateNamespaces.global(),
+                StateTags.watermarkStateInternal(
+                        "state-id-a",
+                        TimestampCombiner.EARLIEST));
+        watermarkHoldState.add(new Instant(1));
+        assertEquals(1, watermarkHoldState.read().getMillis());
+        watermarkHoldState.add(new Instant(Integer.MIN_VALUE));
+        assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
+        watermarkHoldState.add(new Instant(Integer.MAX_VALUE));
+        assertEquals(Integer.MIN_VALUE, watermarkHoldState.read().getMillis());
+    }
+
+    /** put() overwrites an existing key, remove() drops it; keys/values/entries stay in sync. */
+    @Test
+    public void testMapState() throws Exception {
+        MapState<Integer, Integer> mapStateA = jstormStateInternals.state(
+                StateNamespaces.global(),
+                StateTags.map("state-id-a", BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()));
+        mapStateA.put(1, 1);
+        mapStateA.put(2, 22);
+        mapStateA.put(1, 12);
+
+        Iterable<Integer> keys = mapStateA.keys().read();
+        Iterable<Integer> values = mapStateA.values().read();
+        assertThat(keys, containsInAnyOrder(1, 2));
+        assertThat(values, containsInAnyOrder(12, 22));
+
+        // Entries are checked without assuming an iteration order, like keys and values above.
+        Map<Integer, Integer> entries = collectEntries(mapStateA.entries().read());
+        assertEquals(2, entries.size());
+        assertThat(entries, hasEntry(1, 12));
+        assertThat(entries, hasEntry(2, 22));
+
+        mapStateA.remove(1);
+        keys = mapStateA.keys().read();
+        values = mapStateA.values().read();
+        assertThat(keys, containsInAnyOrder(2));
+        assertThat(values, containsInAnyOrder(22));
+
+        entries = collectEntries(mapStateA.entries().read());
+        assertEquals(1, entries.size());
+        assertThat(entries, hasEntry(2, 22));
+    }
+
+    /** Adds 10000 elements to a bag and verifies that all of them are read back. */
+    @Test
+    public void testMassiveDataOfBagState() {
+        BagState<Integer> bagStateA = jstormStateInternals.state(
+                StateNamespaces.global(), StateTags.bag("state-id-a", BigEndianIntegerCoder.of()));
+
+        final int count = 10000;
+        for (int n = 1; n <= count; n++) {
+            bagStateA.add(n);
+        }
+
+        // Accumulate in long so the check stays valid if count is ever raised.
+        long readCount = 0;
+        long readSum = 0;
+        Iterator<Integer> itr = bagStateA.read().iterator();
+        while (itr.hasNext()) {
+            readSum += itr.next();
+            readCount++;
+        }
+
+        long expectedSum = ((long) count * (count + 1)) / 2;
+        assertEquals(expectedSum, readSum);
+        assertEquals(count, readCount);
+    }
+
+    /**
+     * Copies the given entries into a map, failing if a key occurs more than once.
+     *
+     * @param entries entries as read from a map state
+     * @return a map holding exactly the given entries
+     */
+    private static Map<Integer, Integer> collectEntries(
+            Iterable<Map.Entry<Integer, Integer>> entries) {
+        Map<Integer, Integer> collected = Maps.newHashMap();
+        for (Map.Entry<Integer, Integer> entry : entries) {
+            Integer previous = collected.put(entry.getKey(), entry.getValue());
+            if (previous != null) {
+                throw new AssertionError("duplicate entry for key " + entry.getKey());
+            }
+        }
+        return collected;
+    }
+}