You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:03 UTC

[03/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
new file mode 100644
index 0000000..f101beb
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import com.alibaba.jstorm.cache.IKvStore;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class JStormMapState<K, V> implements MapState<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
+
+    private final K key;
+    private final StateNamespace namespace;
+    private IKvStore<K, V> kvStore;
+
+    public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
+        this.key = key;
+        this.namespace = namespace;
+        this.kvStore = kvStore;
+    }
+
+    @Override
+    public void put(K var1, V var2) {
+        try {
+            kvStore.put(var1, var2);
+        } catch (IOException e) {
+            reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e);
+        }
+    }
+
+    @Override
+    public ReadableState<V> putIfAbsent(K var1, V var2) {
+        ReadableState<V> ret = null;
+        try {
+            V value = kvStore.get(var1);
+            if (value == null) {
+                kvStore.put(var1, var2);
+                ret = new MapReadableState<>(null);
+            } else {
+                ret = new MapReadableState<>(value);
+            }
+        } catch (IOException e) {
+            reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
+        }
+        return ret;
+    }
+
+    @Override
+    public void remove(K var1) {
+        try {
+            kvStore.remove(var1);
+        } catch (IOException e) {
+            reportError(String.format("Failed to remove key=%s", var1), e);
+        }
+    }
+
+    @Override
+    public ReadableState<V> get(K var1) {
+        ReadableState<V> ret = new MapReadableState<>(null);
+        try {
+            ret = new MapReadableState(kvStore.get(var1));
+        } catch (IOException e) {
+            reportError(String.format("Failed to get value for key=%s", var1), e);
+        }
+        return ret;
+    }
+
+    @Override
+    public ReadableState<Iterable<K>> keys() {
+        ReadableState<Iterable<K>> ret = new MapReadableState<>(null);
+        try {
+            ret = new MapReadableState<>(kvStore.keys());
+        } catch (IOException e) {
+            reportError(String.format("Failed to get keys"), e);
+        }
+        return ret;
+    }
+
+    @Override
+    public ReadableState<Iterable<V>> values() {
+        ReadableState<Iterable<V>> ret = new MapReadableState<>(null);
+        try {
+            ret = new MapReadableState<>(kvStore.values());
+        } catch (IOException e) {
+            reportError(String.format("Failed to get values"), e);
+        }
+        return ret;
+    }
+
+    @Override
+    public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
+        ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null);
+        try {
+            ret = new MapReadableState<>(kvStore.entries());
+        } catch (IOException e) {
+            reportError(String.format("Failed to get values"), e);
+        }
+        return ret;
+    }
+
+    @Override
+    public void clear() {
+        try {
+            Iterable<K> keys = kvStore.keys();
+            kvStore.removeBatch(keys);
+        } catch (IOException e) {
+            reportError(String.format("Failed to clear map state"), e);
+        }
+    }
+
+    private void reportError(String errorInfo, IOException e) {
+        LOG.error(errorInfo, e);
+        throw new RuntimeException(errorInfo);
+    }
+
+    private class MapReadableState<T> implements ReadableState<T> {
+        private T value;
+
+        public MapReadableState(T value) {
+            this.value = value;
+        }
+
+        @Override
+        public T read() {
+            return value;
+        }
+
+        @Override
+        public ReadableState<T> readLater() {
+            return this;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
new file mode 100644
index 0000000..8a0cb73
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternals.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.SetState;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.StateBinder;
+import org.apache.beam.sdk.state.StateContext;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * JStorm implementation of {@link StateInternals}.
+ */
+public class JStormStateInternals<K> implements StateInternals {
+
+    private static final String STATE_INFO = "state-info:";
+
+    @Nullable
+    private final K key;
+    private final IKvStoreManager kvStoreManager;
+    private final TimerService timerService;
+    private final int executorId;
+
+    public JStormStateInternals(K key, IKvStoreManager kvStoreManager,
+                                TimerService timerService, int executorId) {
+        this.key = key;
+        this.kvStoreManager = checkNotNull(kvStoreManager, "kvStoreManager");
+        this.timerService = checkNotNull(timerService, "timerService");
+        this.executorId = executorId;
+    }
+
+    @Nullable
+    @Override
+    public K getKey() {
+        return key;
+    }
+
+    @Override
+    public <T extends State> T state(
+        StateNamespace namespace, StateTag<T> address, StateContext<?> c) {
+        // throw new UnsupportedOperationException("StateContext is not supported.");
+        /**
+         * TODOļ¼š
+         * Same implementation as state() which is without StateContext. This might be updated after
+         * we figure out if we really need StateContext for JStorm state internals.
+         */
+        return state(namespace, address);
+    }
+
+    @Override
+    public <T extends State> T state(final StateNamespace namespace, StateTag<T> address) {
+        return address.getSpec().bind(address.getId(), new StateBinder() {
+            @Override
+            public <T> ValueState<T> bindValue(String id, StateSpec<ValueState<T>> spec, Coder<T> coder) {
+                try {
+                    return new JStormValueState<>(
+                            getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)));
+                } catch (IOException e) {
+                    throw new RuntimeException();
+                }
+            }
+
+            @Override
+            public <T> BagState<T> bindBag(String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder) {
+                try {
+                    return new JStormBagState(
+                            getKey(), namespace, kvStoreManager.<ComposedKey, T>getOrCreate(getStoreId(id)),
+                            kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+                } catch (IOException e) {
+                    throw new RuntimeException();
+                }
+            }
+
+            @Override
+            public <T> SetState<T> bindSet(String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+                String id,
+                StateSpec<MapState<KeyT, ValueT>> spec,
+                Coder<KeyT> mapKeyCoder,
+                Coder<ValueT> mapValueCoder) {
+                try {
+                    return new JStormMapState<>(getKey(), namespace, kvStoreManager.<KeyT, ValueT>getOrCreate(getStoreId(id)));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            @Override
+            public <InputT, AccumT, OutputT> CombiningState bindCombining(
+                    String id,
+                    StateSpec<CombiningState<InputT, AccumT, OutputT>> spec,
+                    Coder<AccumT> accumCoder,
+                    Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+                try {
+                    BagState<AccumT> accumBagState = new JStormBagState(
+                            getKey(), namespace,
+                            kvStoreManager.<ComposedKey, AccumT>getOrCreate(getStoreId(id)),
+                            kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+                    return new JStormCombiningState<>(accumBagState, combineFn);
+                } catch (IOException e) {
+                    throw new RuntimeException();
+                }
+            }
+
+
+            @Override
+            public <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT>
+            bindCombiningWithContext(
+                String id,
+                StateSpec<CombiningState<InputT, AccumT, OutputT>> stateSpec, Coder<AccumT> coder,
+                CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFnWithContext) {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public WatermarkHoldState bindWatermark(
+                String id,
+                StateSpec<WatermarkHoldState> spec,
+                final TimestampCombiner timestampCombiner) {
+                try {
+                    BagState<Combine.Holder<Instant>> accumBagState = new JStormBagState(
+                            getKey(), namespace,
+                            kvStoreManager.<ComposedKey, Combine.Holder<Instant>>getOrCreate(getStoreId(id)),
+                            kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
+
+                    Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn =
+                            new BinaryCombineFn<Instant>() {
+                                @Override
+                                public Instant apply(Instant left, Instant right) {
+                                  return timestampCombiner.combine(left, right);
+                                }};
+                    return new JStormWatermarkHoldState(
+                            namespace,
+                            new JStormCombiningState<>(
+                                    accumBagState,
+                                    outputTimeCombineFn),
+                            timestampCombiner,
+                            timerService);
+                } catch (IOException e) {
+                    throw new RuntimeException();
+                }
+            }
+        });
+    }
+
+    private String getStoreId(String stateId) {
+        return String.format("%s-%s", stateId, executorId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
new file mode 100644
index 0000000..5ad3663
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormValueState.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStore;
+
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.ValueState;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+/**
+ * JStorm implementation of {@link ValueState}.
+ */
+public class JStormValueState<K, T> implements ValueState<T> {
+
+    @Nullable
+    private final K key;
+    private final StateNamespace namespace;
+    private final IKvStore<ComposedKey, T> kvState;
+
+    JStormValueState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState) {
+        this.key = key;
+        this.namespace = namespace;
+        this.kvState = kvState;
+    }
+
+    @Override
+    public void write(T t) {
+        try {
+            kvState.put(getComposedKey(), t);
+        } catch (IOException e) {
+            throw new RuntimeException(String.format(
+                    "Failed to write key: %s, namespace: %s, value: %s.", key, namespace, t));
+        }
+    }
+
+    @Override
+    public T read() {
+        try {
+            return kvState.get(getComposedKey());
+        } catch (IOException e) {
+            throw new RuntimeException(String.format(
+                    "Failed to read key: %s, namespace: %s.", key, namespace));
+        }
+    }
+
+    @Override
+    public ValueState<T> readLater() {
+        // TODO: support prefetch.
+        return this;
+    }
+
+    @Override
+    public void clear() {
+        try {
+            kvState.remove(getComposedKey());
+        } catch (IOException e) {
+            throw new RuntimeException(String.format(
+                    "Failed to clear key: %s, namespace: %s.", key, namespace));
+        }
+    }
+
+    private ComposedKey getComposedKey() {
+        return ComposedKey.of(key, namespace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
new file mode 100644
index 0000000..659d77c
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormWatermarkHoldState.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.GroupingState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.joda.time.Instant;
+
+/**
+ * JStorm implementation of {@link WatermarkHoldState}.
+ */
+public class JStormWatermarkHoldState implements WatermarkHoldState {
+
+    private final StateNamespace namespace;
+    private final GroupingState<Instant, Instant> watermarkHoldsState;
+    private final TimestampCombiner timestampCombiner;
+    private final TimerService timerService;
+
+    JStormWatermarkHoldState(
+            StateNamespace namespace,
+            GroupingState<Instant, Instant> watermarkHoldsState,
+            TimestampCombiner timestampCombiner,
+            TimerService timerService) {
+        this.namespace = checkNotNull(namespace, "namespace");
+        this.watermarkHoldsState = checkNotNull(watermarkHoldsState, "watermarkHoldsState");
+        this.timestampCombiner = checkNotNull(timestampCombiner, "timestampCombiner");
+        this.timerService = checkNotNull(timerService, "timerService");
+    }
+
+    @Override
+    public TimestampCombiner getTimestampCombiner() {
+        return timestampCombiner;
+    }
+
+    @Override
+    public void add(Instant instant) {
+        timerService.addWatermarkHold(namespace.stringKey(), instant);
+        watermarkHoldsState.add(instant);
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+        return watermarkHoldsState.isEmpty();
+    }
+
+    @Override
+    public Instant read() {
+        return watermarkHoldsState.read();
+    }
+
+    @Override
+    public WatermarkHoldState readLater() {
+        // TODO: support prefetch.
+        return this;
+    }
+
+    @Override
+    public void clear() {
+        timerService.clearWatermarkHold(namespace.stringKey());
+        watermarkHoldsState.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
new file mode 100644
index 0000000..4b5f83c
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/timer/JStormTimerInternals.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.timer;
+
+import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
+import org.apache.beam.runners.jstorm.translation.runtime.TimerService;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * JStorm implementation of {@link TimerInternals}.
+ */
+public class JStormTimerInternals<K> implements TimerInternals {
+
+    private final K key;
+    private final DoFnExecutor<?, ?> doFnExecutor;
+    private final TimerService timerService;
+
+
+    public JStormTimerInternals(@Nullable K key, DoFnExecutor<?, ?> doFnExecutor, TimerService timerService) {
+        this.key = key;
+        this.doFnExecutor = checkNotNull(doFnExecutor, "doFnExecutor");
+        this.timerService = checkNotNull(timerService, "timerService");
+    }
+
+    @Override
+    public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
+        setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+    }
+
+    @Override
+    @Deprecated
+    public void setTimer(TimerData timerData) {
+        timerService.setTimer(key, timerData, doFnExecutor);
+    }
+
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+        throw new UnsupportedOperationException(
+                "Canceling of a timer is not yet supported.");
+    }
+
+    @Override
+    @Deprecated
+    public void deleteTimer(StateNamespace namespace, String timerId) {
+        throw new UnsupportedOperationException(
+                "Canceling of a timer is not yet supported.");
+    }
+
+    @Override
+    @Deprecated
+    public void deleteTimer(TimerData timerData) {
+        throw new UnsupportedOperationException(
+                "Canceling of a timer is not yet supported.");
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+        return Instant.now();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+        return null;
+    }
+
+    @Override
+    public Instant currentInputWatermarkTime() {
+        return new Instant(timerService.currentInputWatermark());
+    }
+
+    @Override
+    @Nullable
+    public Instant currentOutputWatermarkTime() {
+        return new Instant(timerService.currentOutputWatermark());
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
new file mode 100644
index 0000000..9651fc2
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/BoundedSourceTranslator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
+
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Translates a {@link Read.Bounded} into a Storm spout.
+ *
+ * @param <T>
+ */
+public class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
+
+    @Override
+    public void translateNode(Read.Bounded<T> transform, TranslationContext context) {
+        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+
+        TupleTag<?> outputTag = userGraphContext.getOutputTag();
+        PValue outputValue = userGraphContext.getOutput();
+        UnboundedSourceSpout spout = new UnboundedSourceSpout(
+                description,
+                new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()),
+                userGraphContext.getOptions(), outputTag);
+
+        context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(outputTag, outputValue));
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
new file mode 100644
index 0000000..c4da58a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombineGloballyTranslator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.sdk.transforms.Combine;
+
+public class CombineGloballyTranslator<InputT, OutputT> extends TransformTranslator.Default<Combine.Globally<InputT, OutputT>> {
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
new file mode 100644
index 0000000..99cbff7
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/CombinePerKeyTranslator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.sdk.transforms.Combine;
+
+public class CombinePerKeyTranslator<K, InputT, OutputT> extends TransformTranslator.Default<Combine.PerKey<K, InputT, OutputT>> {
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
new file mode 100644
index 0000000..4558216
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import com.google.common.collect.Maps;
+import org.apache.beam.sdk.transforms.Flatten;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.FlattenExecutor;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.Map;
+
+public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> {
+
+    @Override
+    public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) {
+        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+        // Since a new tag is created in PCollectionList, retrieve the real tag here.
+        Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
+        for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) {
+            PCollection<V> pc = (PCollection<V>) entry.getValue();
+            inputs.putAll(pc.expand());
+        }
+        System.out.println("Real inputs: " + inputs);
+        System.out.println("FlattenList inputs: " + userGraphContext.getInputs());
+        String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
+        FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
+        context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
new file mode 100644
index 0000000..6b8297b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.translation.runtime.GroupByWindowExecutor;
+import com.google.common.collect.Lists;
+import org.apache.beam.sdk.transforms.GroupByKey;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.Collections;
+import java.util.List;
+
+public class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
+    // information of transform
+    protected PCollection<KV<K, V>> input;
+    protected PCollection<KV<K, Iterable<V>>> output;
+    protected List<TupleTag<?>> inputTags;
+    protected TupleTag<KV<K, Iterable<V>>> mainOutputTag;
+    protected List<TupleTag<?>> sideOutputTags;
+    protected List<PCollectionView<?>> sideInputs;
+    protected WindowingStrategy<?, ?> windowingStrategy;
+
+    @Override
+    public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
+        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+
+        input = (PCollection<KV<K, V>>) userGraphContext.getInput();
+        output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput();
+
+        inputTags = userGraphContext.getInputTags();
+        mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag();
+        sideOutputTags = Lists.newArrayList();
+
+        sideInputs = Collections.<PCollectionView<?>>emptyList();
+        windowingStrategy = input.getWindowingStrategy();
+
+        GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>(
+                userGraphContext.getStepName(),
+                description,
+                context,
+                context.getUserGraphContext().getOptions(),
+                windowingStrategy,
+                mainOutputTag,
+                sideOutputTags);
+        context.addTransformExecutor(groupByWindowExecutor);
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
new file mode 100644
index 0000000..c487578
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import avro.shaded.com.google.common.collect.Maps;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
+import org.apache.beam.runners.jstorm.translation.runtime.MultiOutputDoFnExecutor;
+import org.apache.beam.runners.jstorm.translation.runtime.MultiStatefulDoFnExecutor;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Translates a ParDo.BoundMulti to a Storm {@link DoFnExecutor}.
+ */
+public class ParDoBoundMultiTranslator<InputT, OutputT>
+        extends TransformTranslator.Default<ParDo.MultiOutput<InputT, OutputT>> {
+
+    @Override
+    public void translateNode(ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
+        final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+        final TupleTag<InputT> inputTag = (TupleTag<InputT>) userGraphContext.getInputTag();
+        PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
+
+        Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
+        Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
+        for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
+            Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
+            localToExternalTupleTagMap.put(entry.getKey(), itr.next());
+        }
+
+        TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
+        List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags();
+        sideOutputTags.remove(mainOutputTag);
+
+        Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
+        for (PCollectionView pCollectionView : transform.getSideInputs()) {
+            allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+        }
+        String description = describeTransform(
+                transform,
+                allInputs,
+                allOutputs);
+
+        ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
+        for (PCollectionView pCollectionView : transform.getSideInputs()) {
+            sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+        }
+
+        DoFnExecutor executor;
+        DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+        if (signature.stateDeclarations().size() > 0
+                || signature.timerDeclarations().size() > 0) {
+            executor = new MultiStatefulDoFnExecutor<>(
+                    userGraphContext.getStepName(),
+                    description,
+                    userGraphContext.getOptions(),
+                    (DoFn<KV, OutputT>) transform.getFn(),
+                    (Coder) WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+                    input.getWindowingStrategy(),
+                    (TupleTag<KV>) inputTag,
+                    transform.getSideInputs(),
+                    sideInputTagToView.build(),
+                    mainOutputTag,
+                    sideOutputTags,
+                    localToExternalTupleTagMap);
+        } else {
+            executor = new MultiOutputDoFnExecutor<>(
+                    userGraphContext.getStepName(),
+                    description,
+                    userGraphContext.getOptions(),
+                    transform.getFn(),
+                    WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+                    input.getWindowingStrategy(),
+                    inputTag,
+                    transform.getSideInputs(),
+                    sideInputTagToView.build(),
+                    mainOutputTag,
+                    sideOutputTags,
+                    localToExternalTupleTagMap);
+        }
+
+        context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
new file mode 100644
index 0000000..3a952a9
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import java.util.List;
+import java.util.Map;
+
+import avro.shaded.com.google.common.collect.Lists;
+import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.beam.runners.jstorm.translation.runtime.DoFnExecutor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.*;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Translates a ParDo.Bound to a Storm {@link DoFnExecutor}.
+ */
+public class ParDoBoundTranslator<InputT, OutputT>
+        extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
+
+    @Override
+    public void translateNode(ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
+        final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+        final TupleTag<?> inputTag = userGraphContext.getInputTag();
+        PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
+
+        TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
+        List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
+
+        Map<TupleTag<?>, PValue> allInputs = avro.shaded.com.google.common.collect.Maps.newHashMap(userGraphContext.getInputs());
+        for (PCollectionView pCollectionView : transform.getSideInputs()) {
+            allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+        }
+        String description = describeTransform(
+                transform,
+                allInputs,
+                userGraphContext.getOutputs());
+
+        ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
+        for (PCollectionView pCollectionView : transform.getSideInputs()) {
+            sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
+        }
+
+        DoFnExecutor executor;
+        DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+        if (signature.stateDeclarations().size() > 0
+                || signature.timerDeclarations().size() > 0) {
+            executor = new StatefulDoFnExecutor<>(
+                    userGraphContext.getStepName(),
+                    description,
+                    userGraphContext.getOptions(),
+                    (DoFn<KV, OutputT>) transform.getFn(),
+                    (Coder) WindowedValue.getFullCoder(
+                            input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+                    input.getWindowingStrategy(),
+                    (TupleTag<KV>) inputTag,
+                    transform.getSideInputs(),
+                    sideInputTagToView.build(),
+                    mainOutputTag,
+                    sideOutputTags);
+        } else {
+            executor = new DoFnExecutor<>(
+                    userGraphContext.getStepName(),
+                    description,
+                    userGraphContext.getOptions(),
+                    transform.getFn(),
+                    WindowedValue.getFullCoder(input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
+                    input.getWindowingStrategy(),
+                    (TupleTag<InputT>) inputTag,
+                    transform.getSideInputs(),
+                    sideInputTagToView.build(),
+                    mainOutputTag,
+                    sideOutputTags);
+        }
+
+        context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
new file mode 100644
index 0000000..1ef1ec3
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ReshuffleTranslator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.sdk.transforms.Reshuffle;
+
+public class ReshuffleTranslator<K, V> extends TransformTranslator.Default<Reshuffle<K,V>> {
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
new file mode 100644
index 0000000..9f69391
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import com.google.auto.value.AutoValue;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Class that defines the stream connection between upstream and downstream components.
+ */
+@AutoValue
+public abstract class Stream {
+
+    public abstract Producer getProducer();
+    public abstract Consumer getConsumer();
+
+    public static Stream of(Producer producer, Consumer consumer) {
+        return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream(producer, consumer);
+    }
+
+    @AutoValue
+    public abstract static class Producer {
+        public abstract String getComponentId();
+        public abstract String getStreamId();
+        public abstract String getStreamName();
+
+        public static Producer of(String componentId, String streamId, String streamName) {
+            return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Producer(
+                    componentId, streamId, streamName);
+        }
+    }
+
+    @AutoValue
+    public abstract static class Consumer {
+        public abstract String getComponentId();
+        public abstract Grouping getGrouping();
+
+        public static Consumer of(String componentId, Grouping grouping) {
+            return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Consumer(
+                    componentId, grouping);
+        }
+    }
+
+    @AutoValue
+    public abstract static class Grouping {
+        public abstract Type getType();
+
+        @Nullable
+        public abstract List<String> getFields();
+
+        public static Grouping of(Type type) {
+            checkArgument(!Type.FIELDS.equals(type), "Fields grouping should provide key fields.");
+            return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Grouping(
+                    type, null /* fields */);
+        }
+
+        public static Grouping byFields(List<String> fields) {
+            checkNotNull(fields, "fields");
+            checkArgument(!fields.isEmpty(), "No key fields were provided for field grouping!");
+            return new com.alibaba.jstorm.beam.translation.translator.AutoValue_Stream_Grouping(
+                    Type.FIELDS, fields);
+        }
+
+        /**
+         * Types of stream groupings Storm allows
+         */
+        public enum Type {
+            ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
new file mode 100644
index 0000000..bebdf7b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.Map;
+
+/**
+ * Interface for classes capable of tranforming Beam PTransforms into Storm primitives.
+ */
+public interface TransformTranslator<T extends PTransform<?, ?>> {
+
+    void translateNode(T transform, TranslationContext context);
+
+    /**
+     * Returns true if this translator can translate the given transform.
+     */
+    boolean canTranslate(T transform, TranslationContext context);
+
+    class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
+        @Override
+        public void translateNode(T1 transform, TranslationContext context) {
+
+        }
+
+        @Override
+        public boolean canTranslate(T1 transform, TranslationContext context) {
+            return true;
+        }
+
+        static String describeTransform(
+                PTransform<?, ?> transform,
+                Map<TupleTag<?>, PValue> inputs,
+                Map<TupleTag<?>, PValue> outputs) {
+            return String.format("%s --> %s --> %s",
+                    Joiner.on('+').join(FluentIterable.from(inputs.entrySet())
+                            .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
+                                @Override
+                                public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) {
+                                    return taggedPValue.getKey().getId();
+                                    // return taggedPValue.getValue().getName();
+                                }})),
+                    transform.getName(),
+                    Joiner.on('+').join(FluentIterable.from(outputs.entrySet())
+                            .transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
+                                @Override
+                                public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) {
+                                    return taggedPvalue.getKey().getId();
+                                    //return taggedPValue.getValue().getName();
+                                }})));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
new file mode 100644
index 0000000..ac7d7bd
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/UnboundedSourceTranslator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
+
+/**
+ * Translates a Read.Unbounded into a Storm spout.
+ * 
+ * @param <T>
+ */
+public class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbounded<T>> {
+    public void translateNode(Read.Unbounded<T> transform, TranslationContext context) {
+        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+
+        TupleTag<?> tag = userGraphContext.getOutputTag();
+        PValue output = userGraphContext.getOutput();
+
+        UnboundedSourceSpout spout = new UnboundedSourceSpout(
+                description,
+                transform.getSource(), userGraphContext.getOptions(), tag);
+        context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
new file mode 100644
index 0000000..0ebf837
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.runtime.ViewExecutor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link TransformTranslator} for executing {@link View Views} in JStorm runner.
+ */
+public class ViewTranslator extends TransformTranslator.Default<ViewTranslator.CreateJStormPCollectionView<?, ?>> {
+    @Override
+    public void translateNode(CreateJStormPCollectionView<?, ?> transform, TranslationContext context) {
+        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+        ViewExecutor viewExecutor = new ViewExecutor(description, userGraphContext.getOutputTag());
+        context.addTransformExecutor(viewExecutor);
+    }
+
+    /**
+     * Specialized implementation for
+     * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
+     * for the Flink runner in streaming mode.
+     */
+    public static class ViewAsMap<K, V>
+            extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+        @SuppressWarnings("unused") // used via reflection in JstormRunner#apply()
+        public ViewAsMap(View.AsMap<K, V> transform) {
+        }
+
+        @Override
+        public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
+            PCollectionView<Map<K, V>> view =
+                    PCollectionViews.mapView(
+                            input,
+                            input.getWindowingStrategy(),
+                            input.getCoder());
+
+            @SuppressWarnings({"rawtypes", "unchecked"})
+            KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+            try {
+                inputCoder.getKeyCoder().verifyDeterministic();
+            } catch (Coder.NonDeterministicException e) {
+                // TODO: log warning as other runners.
+            }
+
+            return input
+                    .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+                    .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+        }
+
+        @Override
+        protected String getKindString() {
+            return "StreamingViewAsMap";
+        }
+    }
+
+    /**
+     * Specialized expansion for {@link
+     * View.AsMultimap View.AsMultimap} for the
+     * Flink runner in streaming mode.
+     */
+    public static class ViewAsMultimap<K, V>
+            extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+        /**
+         * Builds an instance of this class from the overridden transform.
+         */
+        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+        public ViewAsMultimap(View.AsMultimap<K, V> transform) {
+        }
+
+        @Override
+        public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+            PCollectionView<Map<K, Iterable<V>>> view =
+                    PCollectionViews.multimapView(
+                            input,
+                            input.getWindowingStrategy(),
+                            input.getCoder());
+
+            @SuppressWarnings({"rawtypes", "unchecked"})
+            KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+            try {
+                inputCoder.getKeyCoder().verifyDeterministic();
+            } catch (Coder.NonDeterministicException e) {
+                // TODO: log warning as other runners.
+            }
+
+            return input
+                    .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+                    .apply(CreateJStormPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+        }
+
+        @Override
+        protected String getKindString() {
+            return "StreamingViewAsMultimap";
+        }
+    }
+
+    /**
+     * Specialized implementation for
+     * {@link View.AsList View.AsList} for the
+     * JStorm runner in streaming mode.
+     */
+    public static class ViewAsList<T>
+            extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+        /**
+         * Builds an instance of this class from the overridden transform.
+         */
+        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+        public ViewAsList(View.AsList<T> transform) {}
+
+        @Override
+        public PCollectionView<List<T>> expand(PCollection<T> input) {
+            PCollectionView<List<T>> view =
+                    PCollectionViews.listView(
+                            input,
+                            input.getWindowingStrategy(),
+                            input.getCoder());
+
+            return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+                    .apply(CreateJStormPCollectionView.<T, List<T>>of(view));
+        }
+
+        @Override
+        protected String getKindString() {
+            return "StreamingViewAsList";
+        }
+    }
+
+    /**
+     * Specialized implementation for
+     * {@link View.AsIterable View.AsIterable} for the
+     * JStorm runner in streaming mode.
+     */
+    public static class ViewAsIterable<T>
+            extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+        /**
+         * Builds an instance of this class from the overridden transform.
+         */
+        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+        public ViewAsIterable(View.AsIterable<T> transform) { }
+
+        @Override
+        public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
+            PCollectionView<Iterable<T>> view =
+                    PCollectionViews.iterableView(
+                            input,
+                            input.getWindowingStrategy(),
+                            input.getCoder());
+
+            return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+                    .apply(CreateJStormPCollectionView.<T, Iterable<T>>of(view));
+        }
+
+        @Override
+        protected String getKindString() {
+            return "StreamingViewAsIterable";
+        }
+    }
+
+    /**
+     * Specialized expansion for
+     * {@link View.AsSingleton View.AsSingleton} for the
+     * JStorm runner in streaming mode.
+     */
+    public static class ViewAsSingleton<T>
+            extends PTransform<PCollection<T>, PCollectionView<T>> {
+        private View.AsSingleton<T> transform;
+
+        /**
+         * Builds an instance of this class from the overridden transform.
+         */
+        @SuppressWarnings("unused") // used via reflection in JStormRunner#apply()
+        public ViewAsSingleton(View.AsSingleton<T> transform) {
+            this.transform = transform;
+        }
+
+        @Override
+        public PCollectionView<T> expand(PCollection<T> input) {
+            Combine.Globally<T, T> combine = Combine.globally(
+                    new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+            if (!transform.hasDefaultValue()) {
+                combine = combine.withoutDefaults();
+            }
+            return input.apply(combine.asSingletonView());
+        }
+
+        @Override
+        protected String getKindString() {
+            return "StreamingViewAsSingleton";
+        }
+
+        private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
+            private boolean hasDefaultValue;
+            private T defaultValue;
+
+            SingletonCombine(boolean hasDefaultValue, T defaultValue) {
+                this.hasDefaultValue = hasDefaultValue;
+                this.defaultValue = defaultValue;
+            }
+
+            @Override
+            public T apply(T left, T right) {
+                throw new IllegalArgumentException("PCollection with more than one element "
+                        + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+                        + "combine the PCollection into a single value");
+            }
+
+            @Override
+            public T identity() {
+                if (hasDefaultValue) {
+                    return defaultValue;
+                } else {
+                    throw new IllegalArgumentException(
+                            "Empty PCollection accessed as a singleton view. "
+                                    + "Consider setting withDefault to provide a default value");
+                }
+            }
+        }
+    }
+
+    public static class CombineGloballyAsSingletonView<InputT, OutputT>
+            extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+        Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+        /**
+         * Builds an instance of this class from the overridden transform.
+         */
+        @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply()
+        public CombineGloballyAsSingletonView(
+                Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+            this.transform = transform;
+        }
+
+        @Override
+        public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+            PCollection<OutputT> combined =
+                    input.apply(Combine.globally(transform.getCombineFn())
+                            .withoutDefaults()
+                            .withFanout(transform.getFanout()));
+
+            PCollectionView<OutputT> view = PCollectionViews.singletonView(
+                    combined,
+                    combined.getWindowingStrategy(),
+                    transform.getInsertDefault(),
+                    transform.getInsertDefault()
+                            ? transform.getCombineFn().defaultValue() : null,
+                    combined.getCoder());
+            return combined
+                    .apply(ParDo.of(new WrapAsList<OutputT>()))
+                    .apply(CreateJStormPCollectionView.<OutputT, OutputT>of(view));
+        }
+
+        @Override
+        protected String getKindString() {
+            return "StreamingCombineGloballyAsSingletonView";
+        }
+    }
+
+    private static class WrapAsList<T> extends DoFn<T, List<T>> {
+        @ProcessElement
+        public void processElement(ProcessContext c) {
+            c.output(Collections.singletonList(c.element()));
+        }
+    }
+
+    /**
+     * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+     * They require the input {@link PCollection} fits in memory.
+     * For a large {@link PCollection} this is expected to crash!
+     *
+     * @param <T> the type of elements to concatenate.
+     */
+    private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public List<T> createAccumulator() {
+            return new ArrayList<>();
+        }
+
+        @Override
+        public List<T> addInput(List<T> accumulator, T input) {
+            accumulator.add(input);
+            return accumulator;
+        }
+
+        @Override
+        public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+            List<T> result = createAccumulator();
+            for (List<T> accumulator : accumulators) {
+                result.addAll(accumulator);
+            }
+            return result;
+        }
+
+        @Override
+        public List<T> extractOutput(List<T> accumulator) {
+            return accumulator;
+        }
+
+        @Override
+        public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+            return ListCoder.of(inputCoder);
+        }
+
+        @Override
+        public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+            return ListCoder.of(inputCoder);
+        }
+    }
+
+    /**
+     * Creates a primitive {@link PCollectionView}.
+     *
+     * <p>For internal use only by runner implementors.
+     *
+     * @param <ElemT> The type of the elements of the input PCollection
+     * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+     */
+    public static class CreateJStormPCollectionView<ElemT, ViewT>
+            extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+        private PCollectionView<ViewT> view;
+
+        private CreateJStormPCollectionView(PCollectionView<ViewT> view) {
+            this.view = view;
+        }
+
+        public static <ElemT, ViewT> CreateJStormPCollectionView<ElemT, ViewT> of(
+                PCollectionView<ViewT> view) {
+            return new CreateJStormPCollectionView<>(view);
+        }
+
+        @Override
+        public PCollectionView<ViewT> expand(PCollection<List<ElemT>> input) {
+            return view;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
new file mode 100644
index 0000000..0bf9a49
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.translation.runtime.WindowAssignExecutor;
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+
+public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
+
+    @Override
+    public void translateNode(Window.Assign<T> transform, TranslationContext context) {
+        TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+        String description = describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+        context.getUserGraphContext().setWindowed();
+        WindowAssignExecutor executor = new WindowAssignExecutor(
+                description,
+                transform.getWindowFn(),
+                userGraphContext.getOutputTag());
+        context.addTransformExecutor(executor);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
new file mode 100644
index 0000000..b67aff9
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowBoundTranslator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.translator;
+
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Translates a Window.Bound node into a Storm WindowedBolt
+ * 
+ * @param <T>
+ */
+public class WindowBoundTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
+    private static final Logger LOG = LoggerFactory.getLogger(WindowBoundTranslator.class);
+
+    // Do nothing here currently. The assign of window strategy is included in AssignTranslator.
+    @Override
+    public void translateNode(Window.Assign<T> transform, TranslationContext context) {
+        if (transform.getWindowFn() instanceof FixedWindows) {
+            context.getUserGraphContext().setWindowed();
+        } else if (transform.getWindowFn() instanceof SlidingWindows) {
+            context.getUserGraphContext().setWindowed();
+        } else {
+            throw new UnsupportedOperationException("Not supported window type currently: " + transform.getWindowFn());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
new file mode 100644
index 0000000..07a3ad5
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.util;
+
+public class CommonInstance {
+    public static final String KEY = "Key";
+    public static final String VALUE = "Value";
+
+    public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
new file mode 100644
index 0000000..87562fd
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/DefaultSideInputReader.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.util;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
+/**
+ * No-op SideInputReader implementation.
+ */
+public class DefaultSideInputReader implements SideInputReader, Serializable {
+    @Nullable
+    @Override
+    public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
+        return null;
+    }
+
+    @Override
+    public <T> boolean contains(PCollectionView<T> pCollectionView) {
+        return false;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return true;
+    }
+}