You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:04 UTC

[04/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
new file mode 100644
index 0000000..1de881f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import backtype.storm.task.TopologyContext;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.google.auto.value.AutoValue;
+
+/**
+ * Immutable holder for the runtime objects handed to executors when they are initialized.
+ *
+ * <p>Instances are obtained through {@link #of}; the concrete class is generated by AutoValue.
+ */
+@AutoValue
+public abstract class ExecutorContext {
+
+    /** Returns the topology context this instance was created with. */
+    public abstract TopologyContext getTopologyContext();
+
+    /** Returns the bolt this instance was created with. */
+    public abstract ExecutorsBolt getExecutorsBolt();
+
+    /** Returns the key-value store manager this instance was created with. */
+    public abstract IKvStoreManager getKvStoreManager();
+
+    /**
+     * Creates a context wrapping the given objects.
+     *
+     * @param topologyContext value returned by {@link #getTopologyContext()}
+     * @param bolt value returned by {@link #getExecutorsBolt()}
+     * @param kvStoreManager value returned by {@link #getKvStoreManager()}
+     * @return a new context
+     */
+    public static ExecutorContext of(
+            TopologyContext topologyContext,
+            ExecutorsBolt bolt,
+            IKvStoreManager kvStoreManager) {
+        return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
new file mode 100644
index 0000000..9df1e17
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import java.io.IOException;
+import java.util.*;
+
+import avro.shaded.com.google.common.base.Joiner;
+import avro.shaded.com.google.common.collect.Sets;
+import backtype.storm.tuple.ITupleExt;
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.KvStoreManagerFactory;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.utils.KryoSerializer;
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Maps;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Bolt that hosts a group of {@link Executor}s and routes elements between them.
+ *
+ * <p>Tuples arriving on the watermark stream advance the {@link TimerService}; tuples arriving
+ * on any other stream are deserialized into {@link WindowedValue}s and handed to the executor
+ * registered for the tag named after that stream. Elements whose tag is registered as an
+ * external output tag are serialized and emitted through the {@link OutputCollector}.
+ */
+public class ExecutorsBolt extends AdaptorBasicBolt {
+    private static final long serialVersionUID = -7751043327801735211L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
+
+    // created in prepare(); null until then
+    protected ExecutorContext executorContext;
+
+    // created in prepare() via initTimerService(), or injected through setTimerService()
+    protected TimerService timerService;
+
+    // map from input tag to executor inside bolt
+    protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
+    // set of all output tags registered through addOutputTags()
+    protected final Set<TupleTag> outputTags = Sets.newHashSet();
+    // set of output tags whose elements are emitted outside the bolt
+    protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
+    // DoFn executors among the registered executors; filled in prepare()
+    protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet();
+    // next id handed out by registerExecutor()
+    protected int internalDoFnExecutorId = 1;
+    protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
+
+    protected OutputCollector collector;
+
+    protected boolean isStatefulBolt = false;
+
+    protected KryoSerializer<WindowedValue> serializer;
+
+    public ExecutorsBolt() {
+
+    }
+
+    /** Selects which kind of key-value store manager {@link #prepare} creates. */
+    public void setStatefulBolt(boolean isStateful) {
+        isStatefulBolt = isStateful;
+    }
+
+    /**
+     * Registers the executor that consumes elements of the given input tag.
+     *
+     * @throws NullPointerException if either argument is null
+     */
+    public void addExecutor(TupleTag inputTag, Executor executor) {
+        inputTagToExecutor.put(
+                checkNotNull(inputTag, "inputTag"),
+                checkNotNull(executor, "executor"));
+    }
+
+    /** Returns the live (mutable) map from input tag to executor. */
+    public Map<TupleTag, Executor> getExecutors() {
+        return inputTagToExecutor;
+    }
+
+    /**
+     * Assigns the next internal id to the executor if it is a {@link DoFnExecutor}; other
+     * executors are ignored.
+     */
+    public void registerExecutor(Executor executor) {
+        if (executor instanceof DoFnExecutor) {
+            DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
+            idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor);
+            doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId);
+            internalDoFnExecutorId++;
+        }
+    }
+
+    /** Returns the live (mutable) map from internal id to {@link DoFnExecutor}. */
+    public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() {
+        return idToDoFnExecutor;
+    }
+
+    public void addOutputTags(TupleTag tag) {
+        outputTags.add(tag);
+    }
+
+    /** Marks the tag as one whose elements must be emitted outside this bolt. */
+    public void addExternalOutputTag(TupleTag<?> tag) {
+        externalOutputTags.add(tag);
+    }
+
+    public Set<TupleTag> getOutputTags() {
+        return outputTags;
+    }
+
+    /** Returns the context created by {@link #prepare}, or null if it has not run yet. */
+    public ExecutorContext getExecutorContext() {
+        return executorContext;
+    }
+
+    /**
+     * Creates the key-value store manager, the executor context and the timer service for this
+     * task, then initializes every registered executor.
+     *
+     * @throws RuntimeException wrapping any {@link IOException} raised during preparation
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        LOG.info("Start to prepare for task-{}", context.getThisTaskId());
+        try {
+            this.collector = collector;
+
+            // init kv store manager
+            String storeName = String.format("task-%d", context.getThisTaskId());
+            String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
+            IKvStoreManager kvStoreManager = isStatefulBolt ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, stateStorePath, isStatefulBolt) :
+                    KvStoreManagerFactory.getKvStoreManager(stormConf, storeName, stateStorePath, isStatefulBolt);
+            this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
+
+            // init time service
+            timerService = initTimerService();
+
+            // init all internal executors; the set copy makes sure an executor registered
+            // under several tags is initialized only once
+            for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
+                executor.init(executorContext);
+                if (executor instanceof DoFnExecutor) {
+                    doFnExecutors.add((DoFnExecutor) executor);
+                }
+            }
+
+            this.serializer = new KryoSerializer<WindowedValue>(stormConf);
+
+            LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values());
+            LOG.info("inputTagToExecutor={}", inputTagToExecutor);
+            LOG.info("outputTags={}", outputTags);
+            LOG.info("externalOutputTags={}", externalOutputTags);
+            LOG.info("doFnExecutors={}", doFnExecutors);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to prepare executors bolt", e);
+        }
+    }
+
+    /**
+     * Builds a timer service initialized with the ids of all upstream tasks that do not belong
+     * to a system component. Requires {@link #executorContext} to be set.
+     */
+    public TimerService initTimerService() {
+        TopologyContext context = executorContext.getTopologyContext();
+        List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet())
+                .transformAndConcat(
+                        new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() {
+                            @Override
+                            public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) {
+                                if (Common.isSystemComponent(value.getKey())) {
+                                    // typed empty list instead of the raw Collections.EMPTY_LIST
+                                    return Collections.<Integer>emptyList();
+                                } else {
+                                    return value.getValue();
+                                }
+                            }
+                        })
+                .toList();
+        TimerService ret = new TimerServiceImpl(executorContext);
+        ret.init(tasks);
+        return ret;
+    }
+
+    /**
+     * Processes one batch tuple: watermark batches update the timer service, data batches are
+     * processed element by element inside a single start/finish bundle pair.
+     */
+    @Override
+    public void execute(Tuple input) {
+        // process a batch
+        String streamId = input.getSourceStreamId();
+        ITupleExt tuple = (ITupleExt) input;
+        Iterator<List<Object>> valueIterator = tuple.batchValues().iterator();
+        if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) {
+            while (valueIterator.hasNext()) {
+                processWatermark((Long) valueIterator.next().get(0), input.getSourceTask());
+            }
+        } else {
+            doFnStartBundle();
+            while (valueIterator.hasNext()) {
+                processElement(valueIterator.next(), streamId);
+            }
+            doFnFinishBundle();
+        }
+    }
+
+    /**
+     * Records a watermark received from an upstream task, fires timers when the timer service
+     * reports a non-zero new watermark, and forwards the current output watermark downstream
+     * if this bolt has external outputs.
+     */
+    private void processWatermark(long watermarkTs, int sourceTask) {
+        long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs);
+        LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}",
+                (new Instant(watermarkTs)).toDateTime(), sourceTask, (new Instant(newWaterMark)).toDateTime());
+        if (newWaterMark != 0) {
+            // Some buffer windows are going to be triggered.
+            doFnStartBundle();
+            timerService.fireTimers(newWaterMark);
+
+            // SideInput: If receiving water mark with max timestamp, It means no more data is supposed
+            // to be received from now on. So we are going to process all push back data.
+            if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+                for (DoFnExecutor doFnExecutor : doFnExecutors) {
+                    doFnExecutor.processAllPushBackElements();
+                }
+            }
+
+            doFnFinishBundle();
+        }
+
+        long currentWaterMark = timerService.currentOutputWatermark();
+        if (!externalOutputTags.isEmpty()) {
+            // flush before emitting the watermark
+            collector.flush();
+            collector.emit(
+                    CommonInstance.BEAM_WATERMARK_STREAM_ID,
+                    new Values(currentWaterMark));
+            LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime());
+        }
+    }
+
+    /** Deserializes one tuple value list and dispatches it under the tag named by the stream id. */
+    private void processElement(List<Object> values, String streamId) {
+        TupleTag inputTag = new TupleTag(streamId);
+        WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values);
+        processExecutorElem(inputTag, windowedValue);
+    }
+
+    /**
+     * Hands an element to the executor registered for {@code inputTag} (if any) and, when the
+     * tag is an external output tag, also emits it outside the bolt.
+     *
+     * @param inputTag tag the element belongs to
+     * @param elem element to dispatch; a null element is logged and dropped
+     */
+    public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
+        // The null check must come before any dereference of elem; otherwise a null element
+        // fails with a NullPointerException instead of being reported and skipped.
+        if (elem == null) {
+            LOG.info("Received null elem for tag={}", inputTag);
+            return;
+        }
+
+        LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
+        Executor executor = inputTagToExecutor.get(inputTag);
+        if (executor != null) {
+            executor.process(inputTag, elem);
+        }
+        if (externalOutputTags.contains(inputTag)) {
+            emitOutsideBolt(inputTag, elem);
+        }
+    }
+
+    /**
+     * Cleans up every registered executor and closes the key-value store manager. The store is
+     * closed even if an executor fails to clean up, and the call is safe when {@link #prepare}
+     * never completed.
+     */
+    @Override
+    public void cleanup() {
+        try {
+            for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
+                executor.cleanup();
+            }
+        } finally {
+            if (executorContext != null) {
+                executorContext.getKvStoreManager().close();
+            }
+        }
+    }
+
+    /** Returns null: this bolt declares no component-specific configuration. */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    public TimerService timerService() {
+        return timerService;
+    }
+
+    public void setTimerService(TimerService service) {
+        timerService = service;
+    }
+
+    /**
+     * Rebuilds a {@link WindowedValue} from tuple values. A list with more than one entry is
+     * read as (key, serialized value) and recombined into a windowed {@link KV}; otherwise the
+     * single entry is the serialized windowed value itself.
+     */
+    private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) {
+        WindowedValue wv = null;
+        if (values.size() > 1) {
+            Object key = values.get(0);
+            WindowedValue value = serializer.deserialize((byte[]) values.get(1));
+            wv = value.withValue(KV.of(key, value.getValue()));
+        } else {
+            wv = serializer.deserialize((byte[])values.get(0));
+        }
+        return wv;
+    }
+
+    /**
+     * Serializes the value and emits it on the stream named after the tag. For keyed streams
+     * the key is emitted as a separate tuple field next to the serialized value.
+     */
+    protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) {
+        LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue());
+        if (keyedEmit(outputTag.getId())) {
+            KV kv = (KV) outputValue.getValue();
+            byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue()));
+            // Convert WindowedValue<KV> to <K, WindowedValue<V>>
+            if (kv.getKey() == null) {
+                // If key is null, emit "null" string here. Because, null value will be ignored in JStorm.
+                collector.emit(outputTag.getId(), new Values("null", immutableOutputValue));
+            } else {
+                collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue));
+            }
+        } else {
+            byte[] immutableOutputValue = serializer.serialize(outputValue);
+            collector.emit(outputTag.getId(), new Values(immutableOutputValue));
+        }
+    }
+
+    /** Starts a bundle on every DoFn executor collected in {@link #prepare}. */
+    private void doFnStartBundle() {
+        for (DoFnExecutor doFnExecutor : doFnExecutors) {
+            doFnExecutor.startBundle();
+        }
+    }
+
+    /** Finishes the bundle on every DoFn executor collected in {@link #prepare}. */
+    private void doFnFinishBundle() {
+        for (DoFnExecutor doFnExecutor : doFnExecutors) {
+            doFnExecutor.finishBundle();
+        }
+    }
+
+    /** Lists the internal executors and the external output tag ids, one entry per line. */
+    @Override
+    public String toString() {
+        List<String> ret = new ArrayList<>();
+        ret.add("internalExecutors");
+        for (Executor executor : inputTagToExecutor.values()) {
+            ret.add(executor.toString());
+        }
+        ret.add("externalOutputTags");
+        for (TupleTag output : externalOutputTags) {
+            ret.add(output.getId());
+        }
+        return Joiner.on('\n').join(ret).concat("\n");
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
new file mode 100644
index 0000000..1ef28c9
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link Executor} that re-dispatches every element it receives under a single output tag.
+ *
+ * <p>The tag an element arrived with is ignored; the element is passed back to the hosting
+ * {@link ExecutorsBolt} under the output tag given at construction time.
+ */
+public class FlattenExecutor<InputT> implements Executor {
+
+    private final String description;
+    private TupleTag mainOutputTag;
+    private ExecutorContext context;
+    private ExecutorsBolt executorsBolt;
+
+    /**
+     * @param description text returned by {@link #toString()}; must not be null
+     * @param mainTupleTag tag under which received elements are re-dispatched
+     */
+    public FlattenExecutor(String description, TupleTag mainTupleTag) {
+        this.mainOutputTag = mainTupleTag;
+        this.description = checkNotNull(description, "description");
+    }
+
+    /** Remembers the context and the bolt it exposes. */
+    @Override
+    public void init(ExecutorContext context) {
+        this.executorsBolt = context.getExecutorsBolt();
+        this.context = context;
+    }
+
+    /** Forwards {@code elem} to the bolt under the main output tag; {@code tag} is not used. */
+    @Override
+    public void process(TupleTag tag, WindowedValue elem) {
+        final TupleTag forwardTag = mainOutputTag;
+        executorsBolt.processExecutorElem(forwardTag, elem);
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void cleanup() {
+    }
+
+    @Override
+    public String toString() {
+        return description;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
new file mode 100644
index 0000000..419a4a0
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
+import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext;
+import org.apache.beam.runners.jstorm.util.RunnerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * {@link DoFnExecutor} whose DoFn is a {@link GroupAlsoByWindowViaWindowSetNewDoFn} built at
+ * runtime. Incoming {@link KV} elements are converted to {@link KeyedWorkItem}s before being
+ * handed to the runner, and fired timers are delivered to the runner as timer work items.
+ */
+public class GroupByWindowExecutor<K, V> extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
+    private static final long serialVersionUID = -7563050475488610553L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
+
+    /** Output manager that passes every produced value back to the hosting bolt. */
+    private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
+
+        @Override
+        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+            executorsBolt.processExecutorElem(tag, output);
+        }
+    }
+
+    private KvCoder<K, V> inputKvCoder;
+    private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
+
+    /**
+     * Creates the executor and captures the key/value coder of the input collection currently
+     * exposed by the translation context.
+     */
+    public GroupByWindowExecutor(
+            String stepName,
+            String description,
+            TranslationContext context,
+            StormPipelineOptions pipelineOptions,
+            WindowingStrategy<?, ?> windowingStrategy,
+            TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
+        // The doFn will be created when runtime. Just pass "null" here
+        super(stepName, description, pipelineOptions, null, null, windowingStrategy, null, null, null, mainTupleTag, sideOutputTags);
+
+        this.outputManager = new GroupByWindowOutputManager();
+        UserGraphContext graphContext = context.getUserGraphContext();
+        PCollection<KV<K, V>> inputCollection = (PCollection<KV<K, V>>) graphContext.getInput();
+        this.inputKvCoder = (KvCoder<K, V>) inputCollection.getCoder();
+    }
+
+    /** Builds the group-also-by-window DoFn together with its per-key state and timer factories. */
+    private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
+        final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() {
+            @Override
+            public StateInternals stateInternalsForKey(K key) {
+                return new JStormStateInternals<K>(key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
+            }
+        };
+        TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() {
+            @Override
+            public TimerInternals timerInternalsForKey(K key) {
+                return new JStormTimerInternals<>(key, GroupByWindowExecutor.this, executorContext.getExecutorsBolt().timerService());
+            }
+        };
+
+        reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
+        DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindowFn =
+                GroupAlsoByWindowViaWindowSetNewDoFn.create(
+                        windowingStrategy,
+                        stateFactory,
+                        timerFactory,
+                        NullSideInputReader.empty(),
+                        (SystemReduceFn) reduceFn,
+                        outputManager,
+                        mainTupleTag);
+        return groupAlsoByWindowFn;
+    }
+
+    /**
+     * Creates the DoFn and wraps it in a simple runner, a late-data-dropping runner and finally
+     * a metrics-reporting runner.
+     */
+    @Override
+    protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
+        doFn = getGroupByWindowDoFn();
+
+        DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> baseRunner =
+                DoFnRunners.<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>simpleRunner(
+                        pipelineOptions,
+                        doFn,
+                        NullSideInputReader.empty(),
+                        outputManager,
+                        mainTupleTag,
+                        sideOutputTags,
+                        stepContext,
+                        windowingStrategy);
+
+        DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> lateDroppingRunner =
+                DoFnRunners.lateDataDroppingRunner(baseRunner, stepContext, windowingStrategy);
+
+        return new DoFnRunnerWithMetrics<>(
+            stepName, lateDroppingRunner, MetricsReporter.create(metricClient));
+    }
+
+    @Override
+    public void process(TupleTag tag, WindowedValue elem) {
+        // For GroupByKey, a KV type elem is received. It has to be converted into a
+        // KeyedWorkItem first, which is the type expected by LateDataDroppingDoFnRunner.
+        WindowedValue<KV<K, V>> kvElem = (WindowedValue<KV<K, V>>) elem;
+        KeyedWorkItem<K, V> workItem = RunnerUtils.toKeyedWorkItem(kvElem);
+        runner.processElement(elem.withValue(workItem));
+    }
+
+    /**
+     * Delivers a fired timer to the runner as a timers-only work item for {@code key}.
+     *
+     * @throws IllegalArgumentException if the timer's namespace is not a window namespace
+     */
+    @Override
+    public void onTimer(Object key, TimerInternals.TimerData timerData) {
+        StateNamespace timerNamespace = timerData.getNamespace();
+        checkArgument(timerNamespace instanceof StateNamespaces.WindowNamespace);
+
+        KeyedWorkItem<K, V> timerWorkItem =
+                KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData));
+        runner.processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
+    }
+
+    @Override
+    public String toString() {
+        return super.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java
new file mode 100644
index 0000000..a022440
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MetricsReporter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+
+import com.alibaba.jstorm.common.metric.AsmCounter;
+import com.alibaba.jstorm.metric.MetricClient;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+
+/**
+ * Class that holds a {@link MetricsContainerStepMap}, and reports metrics to JStorm engine.
+ */
+public class MetricsReporter {
+
+  private static final String METRIC_KEY_SEPARATOR = "__";
+  private static final String COUNTER_PREFIX = "__counter";
+
+  private final MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap();
+  // Last attempted value already pushed to JStorm, keyed by metric name. Used to turn the
+  // cumulative Beam counter value into the increment that still has to be reported.
+  private final Map<String, Long> reportedCounters = Maps.newHashMap();
+  private final MetricClient metricClient;
+
+  /**
+   * Creates a reporter that publishes through the given client.
+   *
+   * @throws NullPointerException if {@code metricClient} is null
+   */
+  public static MetricsReporter create(MetricClient metricClient) {
+    return new MetricsReporter(metricClient);
+  }
+
+  private MetricsReporter(MetricClient metricClient) {
+    this.metricClient = checkNotNull(metricClient, "metricClient");
+  }
+
+  /** Returns the metrics container for the given step from the held step map. */
+  public MetricsContainer getMetricsContainer(String stepName) {
+    return metricsContainers.getContainer(stepName);
+  }
+
+  /** Queries all attempted metrics held by this reporter and publishes the counters. */
+  public void updateMetrics() {
+    MetricResults metricResults = asAttemptedOnlyMetricResults(metricsContainers);
+    MetricQueryResults metricQueryResults =
+        metricResults.queryMetrics(MetricsFilter.builder().build());
+    updateCounters(metricQueryResults.counters());
+  }
+
+  /**
+   * Publishes, for every counter whose attempted value grew since the last report, the
+   * difference between the new value and the value reported previously.
+   */
+  private void updateCounters(Iterable<MetricResult<Long>> counters) {
+    for (MetricResult<Long> metricResult : counters) {
+      String metricName = getMetricNameString(COUNTER_PREFIX, metricResult);
+      Long updateValue = metricResult.attempted();
+      Long oldValue = reportedCounters.get(metricName);
+
+      if (oldValue == null || oldValue < updateValue) {
+        AsmCounter counter = metricClient.registerCounter(metricName);
+        Long incValue = (oldValue == null ? updateValue : updateValue - oldValue);
+        counter.update(incValue);
+        // Remember what has been reported; without this the full cumulative value would be
+        // added again on every call and the published counter would be inflated.
+        reportedCounters.put(metricName, updateValue);
+      }
+    }
+  }
+
+  /** Builds the metric name as prefix, step, namespace and name joined by the key separator. */
+  private String getMetricNameString(String prefix, MetricResult<?> metricResult) {
+    return prefix
+        + METRIC_KEY_SEPARATOR + metricResult.step()
+        + METRIC_KEY_SEPARATOR + metricResult.name().namespace()
+        + METRIC_KEY_SEPARATOR + metricResult.name().name();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
new file mode 100644
index 0000000..28dc234
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link DoFnExecutor} whose output manager translates "local" tuple tags through
+ * {@code localTupleTagMap} before elements are handed to the bolt.
+ */
+public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
+    private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
+
+    /**
+     * Output manager for the multi-output scenario. The producer currently uses a "local" tuple
+     * tag while the downstream consumer uses a generated one, so a tag that has an entry in
+     * {@code localTupleTagMap} is replaced by its mapped ("external") tag before output; any
+     * other tag is passed through unchanged. See PCollectionTuple for details.
+     */
+    public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
+        @Override
+        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+            TupleTag<T> targetTag = tag;
+            if (localTupleTagMap.containsKey(tag)) {
+                targetTag = (TupleTag<T>) localTupleTagMap.get(tag);
+            }
+            executorsBolt.processExecutorElem(targetTag, output);
+        }
+    }
+
+    /** Mapping from producer-side ("local") tags to the tags used when emitting. */
+    protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
+
+    /**
+     * Forwards everything except {@code localTupleTagMap} to the {@link DoFnExecutor}
+     * constructor, then installs a {@link MultiOutputDoFnExecutorOutputManager} as the output
+     * manager.
+     */
+    public MultiOutputDoFnExecutor(
+            String stepName,
+            String description,
+            StormPipelineOptions pipelineOptions,
+            DoFn<InputT, OutputT> doFn,
+            Coder<WindowedValue<InputT>> inputCoder,
+            WindowingStrategy<?, ?> windowingStrategy,
+            TupleTag<InputT> mainInputTag,
+            Collection<PCollectionView<?>> sideInputs,
+            Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+            TupleTag<OutputT> mainTupleTag,
+            List<TupleTag<?>> sideOutputTags,
+            Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
+        super(
+                stepName,
+                description,
+                pipelineOptions,
+                doFn,
+                inputCoder,
+                windowingStrategy,
+                mainInputTag,
+                sideInputs,
+                sideInputTagToView,
+                mainTupleTag,
+                sideOutputTags);
+        this.localTupleTagMap = localTupleTagMap;
+        this.outputManager = new MultiOutputDoFnExecutorOutputManager();
+        LOG.info("localTupleTagMap: {}", localTupleTagMap);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
new file mode 100644
index 0000000..a58a818
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
+import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Multi-output executor for keyed ({@link KV}) input that installs per-key timer and state
+ * internals on the step context before a main-input element or a timer is processed.
+ */
+public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
+
+    /** Passes all arguments straight through to {@link MultiOutputDoFnExecutor}. */
+    public MultiStatefulDoFnExecutor(
+            String stepName,
+            String description,
+            StormPipelineOptions pipelineOptions,
+            DoFn<KV, OutputT> doFn,
+            Coder<WindowedValue<KV>> inputCoder,
+            WindowingStrategy<?, ?> windowingStrategy,
+            TupleTag<KV> mainInputTag,
+            Collection<PCollectionView<?>> sideInputs,
+            Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+            TupleTag<OutputT> mainTupleTag,
+            List<TupleTag<?>> sideOutputTags,
+            Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
+        super(
+                stepName,
+                description,
+                pipelineOptions,
+                doFn,
+                inputCoder,
+                windowingStrategy,
+                mainInputTag,
+                sideInputs,
+                sideInputTagToView,
+                mainTupleTag,
+                sideOutputTags,
+                localTupleTagMap);
+    }
+
+    /**
+     * Routes an element: anything not tagged with the main input tag is treated as side input;
+     * a main-input element first gets timer and state internals bound to its key.
+     */
+    @Override
+    public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+        if (!mainInputTag.equals(tag)) {
+            processSideInput(tag, elem);
+            return;
+        }
+
+        Object key = ((WindowedValue<KV>) elem).getValue().getKey();
+        stepContext.setTimerInternals(new JStormTimerInternals(
+                key, this, executorContext.getExecutorsBolt().timerService()));
+        stepContext.setStateInternals(new JStormStateInternals<>(
+                key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+        processMainInput(elem);
+    }
+
+    /** Binds state internals to the timer's key, then delegates to the parent implementation. */
+    @Override
+    public void onTimer(Object key, TimerInternals.TimerData timerData) {
+        stepContext.setStateInternals(
+                new JStormStateInternals<>(
+                        key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+        super.onTimer(key, timerData);
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
new file mode 100644
index 0000000..269f03c
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
+import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Executor for keyed ({@link KV}) input that installs per-key timer and state internals on the
+ * step context before a main-input element or a timer is processed.
+ */
+public class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {
+
+    /** Passes all arguments straight through to {@link DoFnExecutor}. */
+    public StatefulDoFnExecutor(
+            String stepName,
+            String description,
+            StormPipelineOptions pipelineOptions,
+            DoFn<KV, OutputT> doFn,
+            Coder<WindowedValue<KV>> inputCoder,
+            WindowingStrategy<?, ?> windowingStrategy,
+            TupleTag<KV> mainInputTag,
+            Collection<PCollectionView<?>> sideInputs,
+            Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+            TupleTag<OutputT> mainTupleTag,
+            List<TupleTag<?>> sideOutputTags) {
+        super(
+                stepName,
+                description,
+                pipelineOptions,
+                doFn,
+                inputCoder,
+                windowingStrategy,
+                mainInputTag,
+                sideInputs,
+                sideInputTagToView,
+                mainTupleTag,
+                sideOutputTags);
+    }
+
+    /**
+     * Routes an element: anything not tagged with the main input tag is treated as side input;
+     * a main-input element first gets timer and state internals bound to its key.
+     */
+    @Override
+    public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+        if (!mainInputTag.equals(tag)) {
+            processSideInput(tag, elem);
+            return;
+        }
+
+        Object key = ((WindowedValue<KV>) elem).getValue().getKey();
+        stepContext.setTimerInternals(new JStormTimerInternals(
+                key, this, executorContext.getExecutorsBolt().timerService()));
+        stepContext.setStateInternals(new JStormStateInternals<>(
+                key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+        processMainInput(elem);
+    }
+
+    /** Binds state internals to the timer's key, then delegates to the parent implementation. */
+    @Override
+    public void onTimer(Object key, TimerInternals.TimerData timerData) {
+        stepContext.setStateInternals(
+                new JStormStateInternals<>(
+                        key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+        super.onTimer(key, timerData);
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
new file mode 100644
index 0000000..47db018
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.runners.core.TimerInternals;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface that tracks input watermarks and manages timers in each bolt.
+ */
+public interface TimerService extends Serializable {
+
+    /**
+     * Prepares the service for the given upstream tasks. {@link TimerServiceImpl} starts every
+     * task at {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
+     *
+     * @param upStreamTasks upstream tasks whose input watermarks are tracked
+     */
+    void init(List<Integer> upStreamTasks);
+
+    /**
+     * Reports an input watermark for one upstream task.
+     *
+     * @param task the upstream task the watermark belongs to
+     * @param inputWatermark the watermark reported for that task, in milliseconds
+     * @return the new input watermark of this service if the update advanced it, otherwise 0
+     *         (as implemented by {@link TimerServiceImpl})
+     */
+    long updateInputWatermark(Integer task, long inputWatermark);
+
+    /**
+     * Returns the current input watermark in milliseconds. {@link TimerServiceImpl} reports the
+     * smallest watermark among the upstream tasks.
+     */
+    long currentInputWatermark();
+
+    /**
+     * Returns the current output watermark in milliseconds. {@link TimerServiceImpl} reports the
+     * input watermark, capped by the earliest watermark hold when any hold is present.
+     */
+    long currentOutputWatermark();
+
+    /**
+     * Removes the watermark hold registered for a namespace, if there is one.
+     *
+     * @param namespace namespace whose hold is removed
+     */
+    void clearWatermarkHold(String namespace);
+
+    /**
+     * Registers a watermark hold for a namespace. {@link TimerServiceImpl} keeps one hold per
+     * namespace and only replaces it when the new hold is earlier.
+     *
+     * @param namespace namespace the hold belongs to
+     * @param watermarkHold the hold to register
+     */
+    void addWatermarkHold(String namespace, Instant watermarkHold);
+
+    /**
+     * Registers a timer for a key on behalf of an executor. {@link TimerServiceImpl} accepts
+     * event-time timers only.
+     *
+     * @param key key the timer belongs to
+     * @param timerData the timer to register
+     * @param doFnExecutor executor to call back when the timer fires
+     */
+    void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
+
+    /**
+     * Fires the registered timers that are due. {@link TimerServiceImpl} fires every timer whose
+     * timestamp is not after {@code newWatermark}.
+     *
+     * @param newWatermark watermark in milliseconds up to which timers fire
+     */
+    void fireTimers(long newWatermark);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
new file mode 100644
index 0000000..3b864d5
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import avro.shaded.com.google.common.collect.Maps;
+import avro.shaded.com.google.common.collect.Sets;
+import com.alibaba.jstorm.utils.Pair;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.joda.time.Instant;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Default implementation of {@link TimerService}.
+ */
+public class TimerServiceImpl implements TimerService {
+    private transient ExecutorContext executorContext;
+    private transient Map<Integer, DoFnExecutor> idToDoFnExecutor;
+
+    // Latest input watermark reported by each upstream task.
+    private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark = new ConcurrentHashMap<>();
+    // One entry per upstream task; the head is the smallest, i.e. the local input watermark.
+    private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
+    // All active holds; the head is the earliest one.
+    private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
+    private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
+    // NOTE(review): this queue is transient while timerDataToKeyedExecutors is not, so a
+    // serialized copy of this service keeps the map but not the queue -- confirm whether the
+    // queue has to be rebuilt when the service is restored.
+    private transient final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue = new PriorityQueue<>();
+    // For each pending timer, the (executor id, key) pairs to call back when it fires.
+    private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
+            timerDataToKeyedExecutors = Maps.newHashMap();
+
+    private boolean initialized = false;
+
+    public TimerServiceImpl() {
+    }
+
+    /**
+     * Creates a timer service that resolves executors through the bolt of the given context.
+     *
+     * @param executorContext context providing the bolt and its id-to-executor map
+     */
+    public TimerServiceImpl(ExecutorContext executorContext) {
+        this.executorContext = executorContext;
+        this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor();
+    }
+
+    /**
+     * Registers every upstream task with an initial input watermark of
+     * {@code BoundedWindow.TIMESTAMP_MIN_VALUE} and marks the service as initialized.
+     */
+    @Override
+    public void init(List<Integer> upStreamTasks) {
+        for (Integer task : upStreamTasks) {
+            upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+            inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+        }
+        initialized = true;
+    }
+
+    /**
+     * Records a new input watermark for an upstream task; watermarks that do not advance the
+     * task's previous value are ignored.
+     *
+     * @return the new local input watermark if it moved past the task's previous watermark,
+     *         otherwise 0
+     * @throws IllegalStateException if {@link #init} has not been called
+     * @throws IllegalArgumentException if the task was not registered through {@link #init}
+     */
+    @Override
+    public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) {
+        checkState(initialized, "TimerService has not been initialized.");
+        Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task);
+        // Fail with a clear message instead of an unboxing NullPointerException below.
+        checkArgument(oldTaskInputWatermark != null, "Unknown upstream task: %s.", task);
+        // Make sure the input watermark doesn't go backward.
+        if (taskInputWatermark > oldTaskInputWatermark) {
+            upStreamTaskToInputWatermark.put(task, taskInputWatermark);
+            inputWatermarks.add(taskInputWatermark);
+            inputWatermarks.remove(oldTaskInputWatermark);
+
+            long newLocalInputWatermark = currentInputWatermark();
+            if (newLocalInputWatermark > oldTaskInputWatermark) {
+                return newLocalInputWatermark;
+            }
+        }
+        return 0;
+    }
+
+    /**
+     * Fires, in queue order, every pending event-time timer whose timestamp is not after
+     * {@code newWatermark}, calling {@code onTimer} on each executor registered for it.
+     */
+    @Override
+    public void fireTimers(long newWatermark) {
+        TimerInternals.TimerData timerData;
+        while ((timerData = eventTimeTimersQueue.peek()) != null
+                && timerData.getTimestamp().getMillis() <= newWatermark) {
+            // Dequeue before invoking callbacks: a callback may set new timers, which could
+            // change the queue head (so a later remove() would drop the wrong timer) or modify
+            // the set that is being iterated.
+            eventTimeTimersQueue.remove();
+            Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.remove(timerData);
+            if (keyedExecutors == null) {
+                continue;
+            }
+            for (Pair<Integer, Object> keyedExecutor : keyedExecutors) {
+                DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst());
+                executor.onTimer(keyedExecutor.getSecond(), timerData);
+            }
+        }
+    }
+
+    /**
+     * Returns the smallest input watermark among the upstream tasks, or
+     * {@code BoundedWindow.TIMESTAMP_MIN_VALUE} when the service is not initialized or tracks no
+     * upstream task.
+     */
+    @Override
+    public long currentInputWatermark() {
+        // peek() yields null for an empty queue; never unbox it.
+        Long minInputWatermark = initialized ? inputWatermarks.peek() : null;
+        return minInputWatermark != null
+                ? minInputWatermark
+                : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+    }
+
+    /** Returns the input watermark, capped by the earliest watermark hold if any hold exists. */
+    @Override
+    public long currentOutputWatermark() {
+        if (watermarkHolds.isEmpty()) {
+            return currentInputWatermark();
+        } else {
+            return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis());
+        }
+    }
+
+    /** Drops the hold registered for the namespace; does nothing when there is none. */
+    @Override
+    public void clearWatermarkHold(String namespace) {
+        Instant currentHold = namespaceToWatermarkHold.get(namespace);
+        if (currentHold != null) {
+            watermarkHolds.remove(currentHold);
+            namespaceToWatermarkHold.remove(namespace);
+        }
+    }
+
+    /**
+     * Registers a hold for the namespace. An existing hold is only replaced when the new one is
+     * earlier; otherwise the call has no effect.
+     */
+    @Override
+    public void addWatermarkHold(String namespace, Instant watermarkHold) {
+        Instant currentHold = namespaceToWatermarkHold.get(namespace);
+        if (currentHold == null) {
+            namespaceToWatermarkHold.put(namespace, watermarkHold);
+            watermarkHolds.add(watermarkHold);
+        } else if (watermarkHold.isBefore(currentHold)) {
+            namespaceToWatermarkHold.put(namespace, watermarkHold);
+            watermarkHolds.add(watermarkHold);
+            watermarkHolds.remove(currentHold);
+        }
+    }
+
+    /**
+     * Registers an event-time timer for the given key and executor. The timer is queued once;
+     * further registrations of an equal timer only add another (executor id, key) pair.
+     *
+     * @throws IllegalArgumentException if the timer is not in the event-time domain
+     */
+    @Override
+    public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) {
+        checkArgument(
+                TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
+                "Does not support domain: %s.", timerData.getDomain());
+        Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData);
+        if (keyedExecutors == null) {
+            keyedExecutors = Sets.newHashSet();
+            timerDataToKeyedExecutors.put(timerData, keyedExecutors);
+            eventTimeTimersQueue.add(timerData);
+        }
+        keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
new file mode 100644
index 0000000..0fb88ab
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Transactional wrapper around an {@link ExecutorsBolt}. Bolt callbacks are delegated to the
+ * wrapped bolt; the transaction callbacks checkpoint, back up and restore the bolt's kv stores
+ * and keep its timer service in a dedicated store.
+ */
+public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
+    private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
+
+    private static final String TIMER_SERVICE_STORE_ID = "timer_service_store";
+    private static final String TIMER_SERVICE_KEY = "timer_service_key";
+
+    private ExecutorsBolt executorsBolt;
+    private IKvStoreManager kvStoreManager;
+    private IKvStore<String, TimerService> timerServiceStore;
+
+    /**
+     * Wraps the given bolt and flags it as stateful.
+     *
+     * @param executorsBolt the bolt all processing is delegated to
+     */
+    public TxExecutorsBolt(ExecutorsBolt executorsBolt) {
+        this.executorsBolt = executorsBolt;
+        this.executorsBolt.setStatefulBolt(true);
+    }
+
+    /**
+     * Prepares the wrapped bolt, then obtains its kv store manager and the timer service store.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if preparation fails
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        try {
+            executorsBolt.prepare(stormConf, context, collector);
+            kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager();
+            timerServiceStore = kvStoreManager.getOrCreate(TIMER_SERVICE_STORE_ID);
+        } catch (IOException e) {
+            LOG.error("Failed to prepare stateful bolt", e);
+            // Keep the original exception as cause so the stack trace is not lost.
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        executorsBolt.execute(input);
+    }
+
+    @Override
+    public void cleanup() {
+        executorsBolt.cleanup();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        executorsBolt.declareOutputFields(declarer);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return executorsBolt.getComponentConfiguration();
+    }
+
+    @Override
+    public void initState(Object userState) {
+        LOG.info("Begin to init from state: {}", userState);
+        restore(userState);
+    }
+
+    /**
+     * Stores the bolt's current timer service and checkpoints the kv stores for the batch.
+     *
+     * @return always {@code null}
+     * @throws RuntimeException wrapping the {@link IOException} if the timer service cannot be
+     *         stored
+     */
+    @Override
+    public Object finishBatch(long batchId) {
+        try {
+            timerServiceStore.put(TIMER_SERVICE_KEY, executorsBolt.timerService());
+        } catch (IOException e) {
+            LOG.error("Failed to store current timer service status", e);
+            throw new RuntimeException(e.getMessage(), e);
+        }
+        kvStoreManager.checkpoint(batchId);
+        return null;
+    }
+
+    /** Backs up the kv stores for the batch and returns the result of that backup. */
+    @Override
+    public Object commit(long batchId, Object state) {
+        return kvStoreManager.backup(batchId);
+    }
+
+    @Override
+    public void rollBack(Object userState) {
+        LOG.info("Begin to rollback from state: {}", userState);
+        restore(userState);
+    }
+
+    /** Removes the kv store state kept for the acknowledged batch. */
+    @Override
+    public void ackCommit(long batchId, long timeStamp) {
+        kvStoreManager.remove(batchId);
+    }
+
+    /**
+     * Restores the kv stores from the given state and re-installs the timer service on the
+     * bolt, creating a fresh one when none was stored.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if restoring fails
+     */
+    private void restore(Object userState) {
+        try {
+            // restore all states
+            kvStoreManager.restore(userState);
+
+            // init timer service
+            timerServiceStore = kvStoreManager.getOrCreate(TIMER_SERVICE_STORE_ID);
+            TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KEY);
+            if (timerService == null) {
+                timerService = executorsBolt.initTimerService();
+            }
+            executorsBolt.setTimerService(timerService);
+        } catch (IOException e) {
+            LOG.error("Failed to restore state", e);
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
new file mode 100644
index 0000000..22dd07b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.KvStoreManagerFactory;
+import com.alibaba.jstorm.transactional.spout.ITransactionSpoutExecutor;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Transactional wrapper around an {@link UnboundedSourceSpout}. Spout callbacks are delegated
+ * to the wrapped spout; the transaction callbacks store the reader's checkpoint mark in a kv
+ * store and checkpoint, back up and restore that store.
+ */
+public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
+    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
+
+    private static final String SOURCE_STORE_ID = "SourceCheckpoint";
+    private static final String CHECKPOINT_MARK = "CheckpointMark";
+
+    private UnboundedSourceSpout sourceSpout;
+    private UnboundedSource.UnboundedReader reader;
+    private IKvStoreManager kvStoreManager;
+    private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore;
+
+    /**
+     * @param sourceSpout the spout all emission is delegated to
+     */
+    public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) {
+        this.sourceSpout = sourceSpout;
+    }
+
+    /**
+     * Restores the kv stores from the given state and re-creates the source reader from the
+     * stored checkpoint mark.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if restoring fails
+     */
+    private void restore(Object userState) {
+        try {
+            kvStoreManager.restore(userState);
+            sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID);
+            UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK);
+            sourceSpout.createSourceReader(checkpointMark);
+            reader = sourceSpout.getUnboundedSourceReader();
+        } catch (IOException e) {
+            LOG.error("Failed to init state", e);
+            // Keep the original exception as cause so the stack trace is not lost.
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void initState(Object userState) {
+        restore(userState);
+    }
+
+    /**
+     * Stores the reader's current checkpoint mark and checkpoints the kv stores.
+     *
+     * @return always {@code null}
+     * @throws RuntimeException wrapping the {@link IOException} if storing or checkpointing fails
+     */
+    @Override
+    public Object finishBatch(long checkpointId) {
+        try {
+            // Store check point mark from unbounded source reader
+            UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark();
+            sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark);
+
+            // checkpoint all kv stores in current manager
+            kvStoreManager.checkpoint(checkpointId);
+        } catch (IOException e) {
+            LOG.error(String.format("Failed to finish batch-%s", checkpointId), e);
+            throw new RuntimeException(e.getMessage(), e);
+        }
+        return null;
+    }
+
+    @Override
+    public Object commit(long batchId, Object state) {
+        // backup kv stores to remote state backend
+        return kvStoreManager.backup(batchId);
+    }
+
+    @Override
+    public void rollBack(Object userState) {
+        restore(userState);
+    }
+
+    @Override
+    public void ackCommit(long batchId, long timeStamp) {
+        // remove obsolete state in bolt local and remote state backend
+        kvStoreManager.remove(batchId);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        sourceSpout.declareOutputFields(declarer);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return sourceSpout.getComponentConfiguration();
+    }
+
+    /**
+     * Opens the wrapped spout, creates a kv store manager under
+     * {@code <workerIdDir>/beam/task-<taskId>} and picks up the spout's reader.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if opening fails
+     */
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        try {
+            sourceSpout.open(conf, context, collector);
+            String storeName = String.format("task-%s", context.getThisTaskId());
+            String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
+            kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, storePath, true);
+
+            reader = sourceSpout.getUnboundedSourceReader();
+        } catch (IOException e) {
+            LOG.error("Failed to open transactional unbounded source spout", e);
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void close() {
+        sourceSpout.close();
+    }
+
+    @Override
+    public void activate() {
+        sourceSpout.activate();
+    }
+
+    @Override
+    public void deactivate() {
+        sourceSpout.deactivate();
+    }
+
+    @Override
+    public void nextTuple() {
+        sourceSpout.nextTuple();
+    }
+
+    /** Not supported by this spout. */
+    @Override
+    public void ack(Object msgId) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported by this spout. */
+    @Override
+    public void fail(Object msgId) {
+        throw new UnsupportedOperationException();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
new file mode 100644
index 0000000..973f703
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import com.alibaba.jstorm.utils.KryoSerializer;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Values;
+
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Spout implementation that wraps a Beam {@link UnboundedSource}.
+ *
+ * <p>Records read from the source are wrapped in a {@link WindowedValue} in the
+ * {@link GlobalWindow}, serialized with a {@link KryoSerializer} and emitted on the stream named
+ * after the output tag. Watermark advances are emitted on
+ * {@link CommonInstance#BEAM_WATERMARK_STREAM_ID}.
+ *
+ * <p>TODO: add wrapper to support metrics in UnboundedSource.
+ */
+public class UnboundedSourceSpout extends AdaptorBasicSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
+
+    private final String description;
+    private final UnboundedSource source;
+    private final SerializedPipelineOptions serializedOptions;
+    private final TupleTag<?> outputTag;
+
+    // Runtime-only state, (re)created in open().
+    private transient StormPipelineOptions pipelineOptions;
+    private transient UnboundedSource.UnboundedReader reader;
+    private transient SpoutOutputCollector collector;
+
+    // True while the reader is positioned on a record that has not been emitted yet.
+    private volatile boolean hasNextRecord;
+    // Emission is gated on this flag; toggled by activate(), deactivate() and close().
+    private AtomicBoolean activated = new AtomicBoolean();
+
+    private KryoSerializer<WindowedValue> serializer;
+
+    // Millis of the last watermark emitted; only strictly larger watermarks are emitted.
+    private long lastWaterMark = 0L;
+
+    /**
+     * @param description text returned by {@link #toString()}; must not be null
+     * @param source the Beam source to read from; must not be null
+     * @param options pipeline options, kept in serialized form; must not be null
+     * @param outputTag tag whose id names the output stream; must not be null
+     */
+    public UnboundedSourceSpout(
+            String description,
+            UnboundedSource source,
+            StormPipelineOptions options,
+            TupleTag<?> outputTag) {
+        this.description = checkNotNull(description, "description");
+        this.source = checkNotNull(source, "source");
+        this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
+        this.outputTag = checkNotNull(outputTag, "outputTag");
+    }
+
+    /**
+     * Deactivates the spout and closes the current reader, if any. A failure to close the reader
+     * is logged and otherwise ignored.
+     */
+    @Override
+    public synchronized void close() {
+        activated.set(false);
+        // Guard against close() being called when no reader was created (e.g. open() failed).
+        if (reader == null) {
+            return;
+        }
+        try {
+            reader.close();
+        } catch (IOException e) {
+            // Best effort: a close failure is reported through the logger, not rethrown.
+            LOG.error("Failed to close unbounded source reader", e);
+        }
+    }
+
+    /** Enables emission in {@link #nextTuple()}. */
+    @Override
+    public void activate() {
+        activated.set(true);
+    }
+
+    /** Disables emission in {@link #nextTuple()}. */
+    @Override
+    public void deactivate() {
+        activated.set(false);
+    }
+
+    /**
+     * Not supported by this spout.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void ack(Object msgId) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Not supported by this spout.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void fail(Object msgId) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Returns {@code null}: this spout declares no component-specific configuration. */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    /**
+     * Restores the pipeline options, creates and starts a reader without a checkpoint mark, and
+     * creates the serializer used for emitted values.
+     *
+     * @throws RuntimeException if the reader cannot be created or started
+     */
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        try {
+            this.collector = collector;
+            this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);
+
+            createSourceReader(null);
+
+            this.serializer = new KryoSerializer<>(conf);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to create unbounded reader.", e);
+        }
+    }
+
+    /**
+     * Replaces the current reader (closing it first, if present) with a new one created from the
+     * given checkpoint mark, and starts it.
+     *
+     * @param checkpointMark mark to resume from; {@link #open} passes {@code null}
+     * @throws IOException if closing the old reader, or creating or starting the new one, fails
+     */
+    public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
+        if (reader != null) {
+            reader.close();
+        }
+        reader = this.source.createReader(this.pipelineOptions, checkpointMark);
+        hasNextRecord = this.reader.start();
+    }
+
+    /**
+     * Emits every record currently available from the reader, then emits the reader's watermark
+     * if it advanced past the last emitted one. Does nothing while the spout is not activated.
+     *
+     * @throws RuntimeException if reading from the source fails
+     */
+    @Override
+    public synchronized void nextTuple() {
+        if (!activated.get()) {
+            return;
+        }
+        try {
+            if (!hasNextRecord) {
+                hasNextRecord = reader.advance();
+            }
+
+            while (hasNextRecord && activated.get()) {
+                Object value = reader.getCurrent();
+                Instant timestamp = reader.getCurrentTimestamp();
+
+                WindowedValue wv = WindowedValue.of(value, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+                LOG.debug("Source output: {}", wv.getValue());
+                // keyedEmit is not defined in this class; presumably inherited from AdaptorBasicSpout.
+                if (keyedEmit(outputTag.getId())) {
+                    KV kv = (KV) wv.getValue();
+                    // Convert WindowedValue<KV> to <K, WindowedValue<V>>
+                    byte[] immutableValue = serializer.serialize(wv.withValue(kv.getValue()));
+                    collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableValue));
+                } else {
+                    byte[] immutableValue = serializer.serialize(wv);
+                    collector.emit(outputTag.getId(), new Values(immutableValue));
+                }
+
+                // move to next record
+                hasNextRecord = reader.advance();
+            }
+
+            Instant waterMark = reader.getWatermark();
+            if (waterMark != null && lastWaterMark < waterMark.getMillis()) {
+                lastWaterMark = waterMark.getMillis();
+                // Flush pending data tuples before the watermark is sent.
+                collector.flush();
+                collector.emit(CommonInstance.BEAM_WATERMARK_STREAM_ID, new Values(waterMark.getMillis()));
+                LOG.debug("Source output: WM-{}", waterMark.toDateTime());
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Exception reading values from source.", e);
+        }
+    }
+
+    /** Returns the wrapped Beam source. */
+    public UnboundedSource getUnboundedSource() {
+        return source;
+    }
+
+    /** Returns the current reader, or {@code null} if none has been created yet. */
+    public UnboundedSource.UnboundedReader getUnboundedSourceReader() {
+        return reader;
+    }
+
+    @Override
+    public String toString() {
+        return description;
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
new file mode 100644
index 0000000..7b0e8db
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ViewExecutor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * JStorm {@link Executor} for {@link View}: every element handed to it is passed on to the
+ * owning {@link ExecutorsBolt} under the configured output tag.
+ */
+public class ViewExecutor implements Executor {
+
+    private final String description;
+    private final TupleTag outputTag;
+    private ExecutorsBolt executorsBolt;
+
+    /**
+     * @param description text returned by {@link #toString()}
+     * @param outputTag tag under which elements are forwarded
+     */
+    public ViewExecutor(String description, TupleTag outputTag) {
+        this.outputTag = outputTag;
+        this.description = description;
+    }
+
+    /** Remembers the bolt supplied by the context; it is the target of {@code process}. */
+    @Override
+    public void init(ExecutorContext context) {
+        executorsBolt = context.getExecutorsBolt();
+    }
+
+    /** Forwards {@code elem} to the bolt under this executor's output tag; {@code tag} is unused. */
+    @Override
+    public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+        this.executorsBolt.processExecutorElem(this.outputTag, elem);
+    }
+
+    /** Nothing to release. */
+    @Override
+    public void cleanup() {
+    }
+
+    @Override
+    public String toString() {
+        return this.description;
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
new file mode 100644
index 0000000..a6c3c16
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor {
+    private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
+
+    private final String description;
+    private WindowFn<T, W> windowFn;
+    private ExecutorsBolt executorsBolt;
+    private TupleTag outputTag;
+
+    class JStormAssignContext<InputT, W extends BoundedWindow>
+            extends WindowFn<InputT, W>.AssignContext {
+        private final WindowedValue<InputT> value;
+
+        JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+            fn.super();
+            checkArgument(
+                    Iterables.size(value.getWindows()) == 1,
+                    String.format(
+                            "%s passed to window assignment must be in a single window, but it was in %s: %s",
+                            WindowedValue.class.getSimpleName(),
+                            Iterables.size(value.getWindows()),
+                            value.getWindows()));
+            this.value = value;
+        }
+
+        @Override
+        public InputT element() {
+            return value.getValue();
+        }
+
+        @Override
+        public Instant timestamp() {
+            return value.getTimestamp();
+        }
+
+        @Override
+        public BoundedWindow window() {
+            return Iterables.getOnlyElement(value.getWindows());
+        }
+    }
+
+    public WindowAssignExecutor(String description, WindowFn<T, W> windowFn, TupleTag outputTag) {
+        this.description = description;
+        this.windowFn = windowFn;
+        this.outputTag = outputTag;
+    }
+
+    @Override
+    public void init(ExecutorContext context) {
+        this.executorsBolt = context.getExecutorsBolt();
+    }
+
+    @Override
+    public void process(TupleTag tag, WindowedValue elem) {
+        Collection<W> windows = null;
+        try {
+            windows = windowFn.assignWindows(new JStormAssignContext<>(windowFn, elem));
+            for (W window: windows) {
+                executorsBolt.processExecutorElem(
+                        outputTag,
+                        WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane()));
+            }
+        } catch (Exception e) {
+            LOG.warn("Failed to assign windows for elem=" + elem, e);
+        }
+    }
+
+    @Override
+    public void cleanup() {}
+
+
+    @Override
+    public String toString() {
+        return description;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
new file mode 100644
index 0000000..eaf0549
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.KvStoreIterable;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * JStorm implementation of {@link BagState}.
+ */
+class JStormBagState<K, T> implements BagState<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
+
+    @Nullable
+    private final K key;
+    private final StateNamespace namespace;
+    private final IKvStore<ComposedKey, T> kvState;
+    private final IKvStore<ComposedKey, Object> stateInfoKvState;
+    private int elemIndex;
+
+    public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
+                           IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
+        this.key = key;
+        this.namespace = checkNotNull(namespace, "namespace");
+        this.kvState = checkNotNull(kvState, "kvState");
+        this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
+
+        Integer index = (Integer) stateInfoKvState.get(getComposedKey());
+        this.elemIndex =  index != null ? ++index : 0;
+    }
+
+    @Override
+    public void add(T input) {
+        try {
+            kvState.put(getComposedKey(elemIndex), input);
+            stateInfoKvState.put(getComposedKey(), elemIndex);
+            elemIndex++;
+        } catch (IOException e) {
+            throw new RuntimeException(e.getCause());
+        }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+        return new ReadableState<Boolean>() {
+            @Override
+            public Boolean read() {
+                return elemIndex <= 0;
+            }
+
+            @Override
+            public ReadableState<Boolean> readLater() {
+                // TODO: support prefetch.
+                return this;
+            }
+        };
+    }
+
+    @Override
+    public Iterable<T> read() {
+        return new BagStateIterable(elemIndex);
+    }
+
+    @Override
+    public BagState readLater() {
+        // TODO: support prefetch.
+        return this;
+    }
+
+    @Override
+    public void clear() {
+        try {
+            for (int i = 0; i < elemIndex; i++) {
+                kvState.remove(getComposedKey(i));
+            }
+            stateInfoKvState.remove(getComposedKey());
+            elemIndex = 0;
+        } catch (IOException e) {
+            throw new RuntimeException(e.getCause());
+        }
+    }
+
+    private ComposedKey getComposedKey() {
+        return ComposedKey.of(key, namespace);
+    }
+
+    private ComposedKey getComposedKey(int elemIndex) {
+        return ComposedKey.of(key, namespace, elemIndex);
+    }
+
+    private class BagStateIterable implements KvStoreIterable<T> {
+
+        private class BagStateIterator implements Iterator<T> {
+            private final int size;
+            private int cursor = 0;
+
+            BagStateIterator() {
+                Integer s = null;
+                try {
+                    s = (Integer) stateInfoKvState.get(getComposedKey());
+                } catch (IOException e) {
+                    LOG.error("Failed to get elemIndex for key={}", getComposedKey());
+                }
+                this.size = s != null ? ++s : 0;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return cursor < size;
+            }
+
+            @Override
+            public T next() {
+                if (cursor >= size) {
+                    throw new NoSuchElementException();
+                }
+
+                T value = null;
+                try {
+                    value = kvState.get(getComposedKey(cursor));
+                } catch (IOException e) {
+                    LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor));
+                }
+                cursor++;
+                return value;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        }
+
+        private final int size;
+
+        BagStateIterable(int size) {
+            this.size = size;
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            return new BagStateIterator();
+        }
+
+        @Override
+        public String toString() {
+            return String.format("BagStateIterable: composedKey=%s", getComposedKey());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
new file mode 100644
index 0000000..b0fe29b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormCombiningState.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime.state;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * JStorm implementation of {@link CombiningState}.
+ */
+public class JStormCombiningState<InputT, AccumT, OutputT>
+        implements CombiningState<InputT, AccumT, OutputT> {
+
+    @Nullable
+    private final BagState<AccumT> accumBagState;
+    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+    JStormCombiningState(
+            BagState<AccumT> accumBagState,
+            Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+        this.accumBagState = checkNotNull(accumBagState, "accumBagState");
+        this.combineFn = checkNotNull(combineFn, "combineFn");
+    }
+
+    @Override
+    public AccumT getAccum() {
+        // TODO: replacing the accumBagState with the merged accum.
+        return combineFn.mergeAccumulators(accumBagState.read());
+    }
+
+    @Override
+    public void addAccum(AccumT accumT) {
+        accumBagState.add(accumT);
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
+        return combineFn.mergeAccumulators(iterable);
+    }
+
+    @Override
+    public void add(InputT input) {
+        accumBagState.add(
+                combineFn.addInput(combineFn.createAccumulator(), input));
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+        return accumBagState.isEmpty();
+    }
+
+    @Override
+    public OutputT read() {
+        return combineFn.extractOutput(
+            combineFn.mergeAccumulators(accumBagState.read()));
+    }
+
+    @Override
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
+        // TODO: support prefetch.
+        return this;
+    }
+
+    @Override
+    public void clear() {
+        accumBagState.clear();
+    }
+}