You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:25 UTC

[25/53] [abbrv] beam git commit: jstorm-runner: move most classes to translation package and reduce their visibility to package-private.

jstorm-runner: move most classes to translation package and reduce their visibility to package-private.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82653534
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82653534
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82653534

Branch: refs/heads/jstorm-runner
Commit: 82653534b0b738ee84ed94a67f9344393778d033
Parents: 9309ac4
Author: Pei He <pe...@apache.org>
Authored: Fri Jul 14 15:28:53 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:57 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/jstorm/JStormRunner.java       |  14 +-
 .../jstorm/serialization/package-info.java      |  22 ++
 .../jstorm/translation/AbstractComponent.java   |  67 ++++
 .../translation/BoundedSourceTranslator.java    |  49 +++
 .../jstorm/translation/CommonInstance.java      |  28 ++
 .../jstorm/translation/DefaultStepContext.java  |  90 +++++
 .../jstorm/translation/DoFnExecutor.java        | 339 +++++++++++++++++
 .../translation/DoFnRunnerWithMetrics.java      |  91 +++++
 .../runners/jstorm/translation/Executor.java    |  36 ++
 .../jstorm/translation/ExecutorContext.java     |  41 ++
 .../jstorm/translation/ExecutorsBolt.java       | 338 +++++++++++++++++
 .../jstorm/translation/FlattenExecutor.java     |  60 +++
 .../jstorm/translation/FlattenTranslator.java   |  49 +++
 .../translation/GroupByKeyTranslator.java       |  71 ++++
 .../translation/GroupByWindowExecutor.java      | 173 +++++++++
 .../jstorm/translation/JStormBagState.java      | 180 +++++++++
 .../translation/JStormCombiningState.java       |  88 +++++
 .../jstorm/translation/JStormMapState.java      | 158 ++++++++
 .../translation/JStormPipelineTranslator.java   |   2 -
 .../translation/JStormStateInternals.java       | 190 ++++++++++
 .../translation/JStormTimerInternals.java       |  97 +++++
 .../jstorm/translation/JStormValueState.java    |  82 ++++
 .../translation/JStormWatermarkHoldState.java   |  82 ++++
 .../jstorm/translation/MetricsReporter.java     |  87 +++++
 .../translation/MultiOutputDoFnExecutor.java    |  79 ++++
 .../translation/MultiStatefulDoFnExecutor.java  |  70 ++++
 .../translation/ParDoBoundMultiTranslator.java  | 114 ++++++
 .../translation/ParDoBoundTranslator.java       | 107 ++++++
 .../runners/jstorm/translation/RunnerUtils.java |  51 +++
 .../translation/SerializedPipelineOptions.java  |  65 ++++
 .../translation/SingletonKeyedWorkItem.java     |  62 +++
 .../translation/StatefulDoFnExecutor.java       |  68 ++++
 .../beam/runners/jstorm/translation/Stream.java | 104 +++++
 .../jstorm/translation/TimerService.java        |  51 +++
 .../jstorm/translation/TimerServiceImpl.java    | 155 ++++++++
 .../jstorm/translation/TransformTranslator.java |  79 ++++
 .../jstorm/translation/TranslationContext.java  |   6 -
 .../jstorm/translation/TranslatorRegistry.java  |  11 +-
 .../jstorm/translation/TxExecutorsBolt.java     | 133 +++++++
 .../translation/TxUnboundedSourceSpout.java     | 156 ++++++++
 .../translation/UnboundedSourceSpout.java       | 189 +++++++++
 .../translation/UnboundedSourceTranslator.java  |  44 +++
 .../jstorm/translation/ViewExecutor.java        |  56 +++
 .../jstorm/translation/ViewTranslator.java      | 378 ++++++++++++++++++
 .../translation/WindowAssignExecutor.java       | 112 ++++++
 .../translation/WindowAssignTranslator.java     |  41 ++
 .../jstorm/translation/package-info.java        |  22 ++
 .../translation/runtime/AbstractComponent.java  |  68 ----
 .../translation/runtime/DoFnExecutor.java       | 343 -----------------
 .../runtime/DoFnRunnerWithMetrics.java          |  91 -----
 .../jstorm/translation/runtime/Executor.java    |  36 --
 .../translation/runtime/ExecutorContext.java    |  41 --
 .../translation/runtime/ExecutorsBolt.java      | 339 -----------------
 .../translation/runtime/FlattenExecutor.java    |  60 ---
 .../runtime/GroupByWindowExecutor.java          | 177 ---------
 .../translation/runtime/MetricsReporter.java    |  87 -----
 .../runtime/MultiOutputDoFnExecutor.java        |  79 ----
 .../runtime/MultiStatefulDoFnExecutor.java      |  72 ----
 .../runtime/StatefulDoFnExecutor.java           |  70 ----
 .../translation/runtime/TimerService.java       |  51 ---
 .../translation/runtime/TimerServiceImpl.java   | 155 --------
 .../translation/runtime/TxExecutorsBolt.java    | 133 -------
 .../runtime/TxUnboundedSourceSpout.java         | 156 --------
 .../runtime/UnboundedSourceSpout.java           | 191 ----------
 .../translation/runtime/ViewExecutor.java       |  56 ---
 .../runtime/WindowAssignExecutor.java           | 112 ------
 .../runtime/state/JStormBagState.java           | 180 ---------
 .../runtime/state/JStormCombiningState.java     |  88 -----
 .../runtime/state/JStormMapState.java           | 158 --------
 .../runtime/state/JStormStateInternals.java     | 191 ----------
 .../runtime/state/JStormValueState.java         |  82 ----
 .../runtime/state/JStormWatermarkHoldState.java |  83 ----
 .../runtime/timer/JStormTimerInternals.java     | 100 -----
 .../translator/BoundedSourceTranslator.java     |  51 ---
 .../translator/FlattenTranslator.java           |  51 ---
 .../translator/GroupByKeyTranslator.java        |  73 ----
 .../translator/ParDoBoundMultiTranslator.java   | 118 ------
 .../translator/ParDoBoundTranslator.java        | 110 ------
 .../jstorm/translation/translator/Stream.java   | 104 -----
 .../translator/TransformTranslator.java         |  80 ----
 .../translator/UnboundedSourceTranslator.java   |  46 ---
 .../translation/translator/ViewTranslator.java  | 380 -------------------
 .../translator/WindowAssignTranslator.java      |  43 ---
 .../jstorm/translation/util/CommonInstance.java |  28 --
 .../translation/util/DefaultStepContext.java    |  90 -----
 .../beam/runners/jstorm/util/RunnerUtils.java   |  55 ---
 .../jstorm/util/SerializedPipelineOptions.java  |  65 ----
 .../jstorm/util/SingletonKeyedWorkItem.java     |  62 ---
 .../translation/JStormStateInternalsTest.java   | 221 +++++++++++
 .../runtime/state/JStormStateInternalsTest.java | 222 -----------
 90 files changed, 4783 insertions(+), 4802 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index 8782130..baf4e5a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -38,15 +38,15 @@ import org.apache.beam.runners.jstorm.serialization.KvStoreIterableSerializer;
 import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuListSerializer;
 import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuSetSerializer;
 import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSerializer;
+import org.apache.beam.runners.jstorm.translation.AbstractComponent;
+import org.apache.beam.runners.jstorm.translation.CommonInstance;
+import org.apache.beam.runners.jstorm.translation.ExecutorsBolt;
 import org.apache.beam.runners.jstorm.translation.JStormPipelineTranslator;
+import org.apache.beam.runners.jstorm.translation.Stream;
 import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.runtime.AbstractComponent;
-import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
-import org.apache.beam.runners.jstorm.translation.runtime.TxExecutorsBolt;
-import org.apache.beam.runners.jstorm.translation.runtime.TxUnboundedSourceSpout;
-import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
-import org.apache.beam.runners.jstorm.translation.translator.Stream;
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import org.apache.beam.runners.jstorm.translation.TxExecutorsBolt;
+import org.apache.beam.runners.jstorm.translation.TxUnboundedSourceSpout;
+import org.apache.beam.runners.jstorm.translation.UnboundedSourceSpout;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/package-info.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/package-info.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/package-info.java
new file mode 100644
index 0000000..f5ac931
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of kryo serializers.
+ */
+package org.apache.beam.runners.jstorm.serialization;

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/AbstractComponent.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/AbstractComponent.java
new file mode 100644
index 0000000..35ae88d
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/AbstractComponent.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import backtype.storm.topology.IComponent;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for components whose output streams are registered through method calls
+ * rather than being hard-coded inside {@link #declareOutputFields}.
+ *
+ * <p>Every registered stream id is associated with its {@link Fields} and with a flag that
+ * records whether it was registered as a key/value stream.
+ */
+public abstract class AbstractComponent implements IComponent {
+  private Map<String, Fields> streamToFields = new HashMap<>();
+  private Map<String, Boolean> keyStreams = new HashMap<>();
+  private int parallelismNum = 0;
+
+  /**
+   * Registers {@code streamId} as a non-keyed stream with the single field
+   * {@link CommonInstance#VALUE}.
+   */
+  public void addOutputField(String streamId) {
+    Fields valueOnly = new Fields(CommonInstance.VALUE);
+    addOutputField(streamId, valueOnly);
+  }
+
+  /**
+   * Registers {@code streamId} as a non-keyed stream with the given {@code fields}.
+   * A previous registration under the same id is replaced.
+   */
+  public void addOutputField(String streamId, Fields fields) {
+    streamToFields.put(streamId, fields);
+    keyStreams.put(streamId, false);
+  }
+
+  /**
+   * Registers {@code streamId} as a key/value stream with the fields
+   * {@link CommonInstance#KEY} and {@link CommonInstance#VALUE}.
+   */
+  public void addKVOutputField(String streamId) {
+    Fields keyAndValue = new Fields(CommonInstance.KEY, CommonInstance.VALUE);
+    streamToFields.put(streamId, keyAndValue);
+    keyStreams.put(streamId, true);
+  }
+
+  /** Declares every registered stream, with its fields, on {@code declarer}. */
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    for (String streamId : streamToFields.keySet()) {
+      declarer.declareStream(streamId, streamToFields.get(streamId));
+    }
+  }
+
+  /**
+   * Returns {@code true} only if {@code streamId} was registered as a key/value stream;
+   * unknown stream ids yield {@code false}.
+   */
+  public boolean keyedEmit(String streamId) {
+    return Boolean.TRUE.equals(keyStreams.get(streamId));
+  }
+
+  /** Returns the parallelism last stored via {@link #setParallelismNum}, initially 0. */
+  public int getParallelismNum() {
+    return this.parallelismNum;
+  }
+
+  /** Stores the parallelism value for this component. */
+  public void setParallelismNum(int num) {
+    this.parallelismNum = num;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
new file mode 100644
index 0000000..f64193e
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Translates a {@link Read.Bounded} into a Storm spout.
+ *
+ * <p>The bounded source is wrapped in a
+ * {@link UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter} and then served by an
+ * {@link UnboundedSourceSpout}.
+ *
+ * @param <T> type parameter of the translated {@link Read.Bounded} transform
+ */
+class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
+
+  /**
+   * Builds the spout for {@code transform} and registers it, together with the tagged output
+   * value, on the execution graph of {@code context}.
+   */
+  @Override
+  public void translateNode(Read.Bounded<T> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraph = context.getUserGraphContext();
+    String spoutDescription =
+        describeTransform(transform, userGraph.getInputs(), userGraph.getOutputs());
+
+    TupleTag<?> tag = userGraph.getOutputTag();
+    PValue output = userGraph.getOutput();
+
+    // Adapt the bounded source so that it can be read by the unbounded spout.
+    UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter adaptedSource =
+        new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource());
+    UnboundedSourceSpout sourceSpout =
+        new UnboundedSourceSpout(spoutDescription, adaptedSource, userGraph.getOptions(), tag);
+
+    context.getExecutionGraphContext().registerSpout(sourceSpout, TaggedPValue.of(tag, output));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java
new file mode 100644
index 0000000..b7154cd
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/CommonInstance.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+/**
+ * Constants shared across the JStorm runner.
+ */
+public class CommonInstance {
+  /** Stream id constant; presumably the stream carrying Beam watermarks (confirm at use sites). */
+  public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
+
+  /** Field name used for the key part of a tuple. */
+  public static final String KEY = "Key";
+
+  /** Field name used for the value part of a tuple. */
+  public static final String VALUE = "Value";
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DefaultStepContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DefaultStepContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DefaultStepContext.java
new file mode 100644
index 0000000..9fd584b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DefaultStepContext.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Default {@link ExecutionContext.StepContext} for running a DoFn.
+ *
+ * <p>It hands out the {@link StateInternals} and {@link TimerInternals} it currently holds,
+ * reports no step or transform name, ignores output notifications, and rejects attempts to
+ * write side-input data.
+ */
+class DefaultStepContext implements ExecutionContext.StepContext {
+
+  private TimerInternals timerInternals;
+
+  private StateInternals stateInternals;
+
+  /**
+   * Creates a step context backed by the given internals.
+   *
+   * @param timerInternals timer internals to expose; must not be null
+   * @param stateInternals state internals to expose; must not be null
+   */
+  public DefaultStepContext(TimerInternals timerInternals, StateInternals stateInternals) {
+    checkNotNull(timerInternals, "timerInternals");
+    checkNotNull(stateInternals, "stateInternals");
+    this.timerInternals = timerInternals;
+    this.stateInternals = stateInternals;
+  }
+
+  /** Always returns {@code null}; no step name is tracked. */
+  @Override
+  public String getStepName() {
+    return null;
+  }
+
+  /** Always returns {@code null}; no transform name is tracked. */
+  @Override
+  public String getTransformName() {
+    return null;
+  }
+
+  /** Does nothing. */
+  @Override
+  public void noteOutput(WindowedValue<?> windowedValue) {
+    // Intentionally empty.
+  }
+
+  /** Does nothing. */
+  @Override
+  public void noteOutput(TupleTag<?> tupleTag, WindowedValue<?> windowedValue) {
+    // Intentionally empty.
+  }
+
+  /**
+   * Not supported by this context.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public <T, W extends BoundedWindow> void writePCollectionViewData(
+      TupleTag<?> tag,
+      Iterable<WindowedValue<T>> data,
+      Coder<Iterable<WindowedValue<T>>> dataCoder,
+      W window,
+      Coder<W> windowCoder)
+      throws IOException {
+    throw new UnsupportedOperationException("Writing side-input data is not supported.");
+  }
+
+  /** Returns the state internals currently held by this context. */
+  @Override
+  public StateInternals stateInternals() {
+    return this.stateInternals;
+  }
+
+  /** Returns the timer internals currently held by this context. */
+  @Override
+  public TimerInternals timerInternals() {
+    return this.timerInternals;
+  }
+
+  /** Replaces the state internals; unlike the constructor, no null check is applied. */
+  public void setStateInternals(StateInternals stateInternals) {
+    this.stateInternals = stateInternals;
+  }
+
+  /** Replaces the timer internals; unlike the constructor, no null check is applied. */
+  public void setTimerInternals(TimerInternals timerInternals) {
+    this.timerInternals = timerInternals;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
new file mode 100644
index 0000000..fdd9af6
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.metric.MetricClient;
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JStorm {@link Executor} for {@link DoFn}.
+ *
+ * <p>Without side inputs, main-input elements go straight to the {@link DoFnRunner}. With side
+ * inputs, they go through a {@link PushbackSideInputDoFnRunner}; elements it hands back are kept
+ * in a bag state and the earliest of their timestamps is added to a watermark hold state.
+ *
+ * @param <InputT> input type
+ * @param <OutputT> output type
+ */
+class DoFnExecutor<InputT, OutputT> implements Executor {
+  private static final long serialVersionUID = 5297603063991078668L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
+
+  /**
+   * Implements {@link OutputManager} in a DoFn executor by forwarding every output to the
+   * enclosing executor's {@link ExecutorsBolt}.
+   */
+  public class DoFnExecutorOutputManager implements OutputManager, Serializable {
+    private static final long serialVersionUID = -661113364735206170L;
+
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      executorsBolt.processExecutorElem(tag, output);
+    }
+  }
+
+  protected transient DoFnRunner<InputT, OutputT> runner = null;
+  protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null;
+
+  protected final String stepName;
+
+  protected int internalDoFnExecutorId;
+
+  protected final String description;
+
+  protected final TupleTag<OutputT> mainTupleTag;
+  protected final List<TupleTag<?>> sideOutputTags;
+
+  protected SerializedPipelineOptions serializedOptions;
+  protected transient JStormPipelineOptions pipelineOptions;
+
+  protected DoFn<InputT, OutputT> doFn;
+  protected final Coder<WindowedValue<InputT>> inputCoder;
+  protected DoFnInvoker<InputT, OutputT> doFnInvoker;
+  protected OutputManager outputManager;
+  protected WindowingStrategy<?, ?> windowingStrategy;
+  protected final TupleTag<InputT> mainInputTag;
+  protected Collection<PCollectionView<?>> sideInputs;
+  protected SideInputHandler sideInputHandler;
+  protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
+
+  // Initialize during runtime
+  protected ExecutorContext executorContext;
+  protected ExecutorsBolt executorsBolt;
+  protected TimerInternals timerInternals;
+  protected transient StateInternals pushbackStateInternals;
+  protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
+  protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
+  protected transient IKvStoreManager kvStoreManager;
+  protected DefaultStepContext stepContext;
+  protected transient MetricClient metricClient;
+
+  /**
+   * Creates an executor for {@code doFn}.
+   *
+   * @param stepName name passed to the metrics-reporting runner; must not be null
+   * @param description text returned by {@link #toString()}; must not be null
+   * @param pipelineOptions options, stored in serialized form until {@link #init}
+   * @param doFn the user function to run
+   * @param inputCoder coder used for the bag state holding pushed-back elements
+   * @param windowingStrategy windowing strategy handed to the runner
+   * @param mainInputTag tag identifying main-input elements in {@link #process}
+   * @param sideInputs side-input views; may be null or empty when there are none
+   * @param sideInputTagToView lookup from an element's tag to its side-input view
+   * @param mainTupleTag tag of the main output
+   * @param sideOutputTags tags of the additional outputs
+   */
+  public DoFnExecutor(
+      String stepName,
+      String description,
+      JStormPipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      Coder<WindowedValue<InputT>> inputCoder,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<InputT> mainInputTag,
+      Collection<PCollectionView<?>> sideInputs,
+      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+      TupleTag<OutputT> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags) {
+    this.stepName = checkNotNull(stepName, "stepName");
+    this.description = checkNotNull(description, "description");
+    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.doFn = doFn;
+    this.inputCoder = inputCoder;
+    this.outputManager = new DoFnExecutorOutputManager();
+    this.windowingStrategy = windowingStrategy;
+    this.mainInputTag = mainInputTag;
+    this.sideInputs = sideInputs;
+    this.mainTupleTag = mainTupleTag;
+    this.sideOutputTags = sideOutputTags;
+    this.sideInputTagToView = sideInputTagToView;
+  }
+
+  /**
+   * Builds the simple {@link DoFnRunner} for this executor, wrapped so that metrics are
+   * reported. An empty side-input reader is used when no side-input handler has been set up.
+   */
+  protected DoFnRunner<InputT, OutputT> getDoFnRunner() {
+    return new DoFnRunnerWithMetrics<>(
+        stepName,
+        DoFnRunners.simpleRunner(
+            this.pipelineOptions,
+            this.doFn,
+            this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler,
+            this.outputManager,
+            this.mainTupleTag,
+            this.sideOutputTags,
+            this.stepContext,
+            this.windowingStrategy),
+        MetricsReporter.create(metricClient));
+  }
+
+  /**
+   * Creates the timer internals, step context and metric client from {@code context}.
+   *
+   * <p>Everything is read from the {@code context} argument, so this method does not depend on
+   * {@link #init} having already assigned the {@code executorContext} and
+   * {@code executorsBolt} fields.
+   */
+  protected void initService(ExecutorContext context) {
+    // TODO: what should be set for key in here?
+    timerInternals = new JStormTimerInternals(
+        null /* key */, this, context.getExecutorsBolt().timerService());
+    kvStoreManager = context.getKvStoreManager();
+    stepContext = new DefaultStepContext(timerInternals,
+        new JStormStateInternals(
+            null,
+            kvStoreManager,
+            context.getExecutorsBolt().timerService(),
+            internalDoFnExecutorId));
+    metricClient = new MetricClient(context.getTopologyContext());
+  }
+
+  /**
+   * Prepares this executor: restores the pipeline options, initializes services, builds the
+   * runner (plus the pushback runner and its state when side inputs exist) and finally invokes
+   * the user's setup method.
+   */
+  @Override
+  public void init(ExecutorContext context) {
+    this.executorContext = context;
+    this.executorsBolt = context.getExecutorsBolt();
+    this.pipelineOptions =
+        this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
+
+    initService(context);
+
+    // Side inputs setup
+    if (sideInputs != null && !sideInputs.isEmpty()) {
+      pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
+      watermarkHoldTag =
+          StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
+      pushbackStateInternals = new JStormStateInternals(
+          null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
+      sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals);
+      runner = getDoFnRunner();
+      pushbackRunner =
+          SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler);
+    } else {
+      runner = getDoFnRunner();
+    }
+
+    // Process user's setup
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
+  }
+
+  /**
+   * Dispatches {@code elem} as main input when {@code tag} equals the main input tag, and as
+   * side input otherwise.
+   */
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    // Use SLF4J placeholders throughout; String.format would leave a trailing "{}" unresolved.
+    LOG.debug("process: elemTag={}, mainInputTag={}, sideInputs={}, elem={}",
+        tag, mainInputTag, sideInputs, elem.getValue());
+    if (mainInputTag.equals(tag)) {
+      processMainInput(elem);
+    } else {
+      processSideInput(tag, elem);
+    }
+  }
+
+  /**
+   * Processes a main-input element. With side inputs, elements handed back by the pushback
+   * runner are stored in the pushed-back bag and the earliest of their timestamps (or
+   * {@link BoundedWindow#TIMESTAMP_MAX_VALUE} when none were handed back) is added to the
+   * watermark hold.
+   */
+  protected <T> void processMainInput(WindowedValue<T> elem) {
+    // Same null-tolerant check as init(): the pushback runner only exists with side inputs.
+    if (sideInputs == null || sideInputs.isEmpty()) {
+      runner.processElement((WindowedValue<InputT>) elem);
+      return;
+    }
+
+    Iterable<WindowedValue<InputT>> justPushedBack =
+        pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem);
+    BagState<WindowedValue<InputT>> pushedBack =
+        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+    Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+      min = earlier(min, pushedBackValue.getTimestamp());
+      pushedBack.add(pushedBackValue);
+    }
+    pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min);
+  }
+
+  /**
+   * Adds a side-input value, retries every pushed-back element, and replaces both the
+   * pushed-back bag and the watermark hold with what is still pending afterwards.
+   */
+  protected void processSideInput(TupleTag tag, WindowedValue elem) {
+    LOG.debug("side inputs: {}, {}.", tag, elem);
+
+    PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
+    sideInputHandler.addSideInputValue(sideInputView, elem);
+
+    BagState<WindowedValue<InputT>> pushedBack =
+        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+    List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
+
+    Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read();
+    if (pushedBackInputs != null) {
+      for (WindowedValue<InputT> input : pushedBackInputs) {
+
+        Iterable<WindowedValue<InputT>> justPushedBack =
+            pushbackRunner.processElementInReadyWindows(input);
+        Iterables.addAll(newPushedBack, justPushedBack);
+      }
+    }
+    pushedBack.clear();
+
+    Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
+      min = earlier(min, pushedBackValue.getTimestamp());
+      pushedBack.add(pushedBackValue);
+    }
+
+    WatermarkHoldState watermarkHold =
+        pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
+    // TODO: clear-then-add is not thread-safe.
+    watermarkHold.clear();
+    watermarkHold.add(min);
+  }
+
+  /**
+   * Process all pushed back elements when receiving watermark with max timestamp.
+   *
+   * <p>Elements are sent directly to the underlying runner, the bag is cleared, and the
+   * watermark hold is reset to {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. Does nothing when
+   * there are no side inputs.
+   */
+  public void processAllPushBackElements() {
+    if (sideInputs != null && !sideInputs.isEmpty()) {
+      BagState<WindowedValue<InputT>> pushedBackElements =
+          pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+      if (pushedBackElements != null) {
+        // read() is null-checked here for the same reason as in processSideInput().
+        Iterable<WindowedValue<InputT>> pending = pushedBackElements.read();
+        if (pending != null) {
+          for (WindowedValue<InputT> elem : pending) {
+            LOG.info("Process pushback elem={}", elem);
+            runner.processElement(elem);
+          }
+        }
+        pushedBackElements.clear();
+      }
+
+      WatermarkHoldState watermarkHold =
+          pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
+      watermarkHold.clear();
+      watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    }
+  }
+
+  /**
+   * Fires a timer on the pushback runner when one exists, otherwise on the plain runner.
+   *
+   * @param key not used by this implementation
+   * @param timerData the timer to fire; its namespace must be a window namespace
+   * @throws IllegalArgumentException if the timer's namespace is not a window namespace
+   */
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    StateNamespace namespace = timerData.getNamespace();
+    checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+    BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
+    if (pushbackRunner != null) {
+      pushbackRunner.onTimer(
+          timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+    } else {
+      runner.onTimer(
+          timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+    }
+  }
+
+  /** Invokes the user's teardown method, if {@link #init} got as far as creating the invoker. */
+  @Override
+  public void cleanup() {
+    if (doFnInvoker != null) {
+      doFnInvoker.invokeTeardown();
+    }
+  }
+
+  /** Returns the description supplied at construction time. */
+  @Override
+  public String toString() {
+    return description;
+  }
+
+  /** Returns {@code left} if it is strictly before {@code right}, otherwise {@code right}. */
+  private Instant earlier(Instant left, Instant right) {
+    return left.isBefore(right) ? left : right;
+  }
+
+  /** Starts a bundle on the pushback runner when one exists, otherwise on the plain runner. */
+  public void startBundle() {
+    if (pushbackRunner != null) {
+      pushbackRunner.startBundle();
+    } else {
+      runner.startBundle();
+    }
+  }
+
+  /** Finishes a bundle on the pushback runner when one exists, otherwise on the plain runner. */
+  public void finishBundle() {
+    if (pushbackRunner != null) {
+      pushbackRunner.finishBundle();
+    } else {
+      runner.finishBundle();
+    }
+  }
+
+  /** Sets the id that is passed to the state internals created by this executor. */
+  public void setInternalDoFnExecutorId(int id) {
+    this.internalDoFnExecutorId = id;
+  }
+
+  /** Returns the id set via {@link #setInternalDoFnExecutorId}. */
+  public int getInternalDoFnExecutorId() {
+    return internalDoFnExecutorId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnRunnerWithMetrics.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnRunnerWithMetrics.java
new file mode 100644
index 0000000..f614f1c
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnRunnerWithMetrics.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/**
+ * {@link DoFnRunner} decorator that scopes the step's {@link MetricsContainer} to each call.
+ */
+class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+  private final String stepName;
+  private final DoFnRunner<InputT, OutputT> delegate;
+  private final MetricsReporter metricsReporter;
+
+  DoFnRunnerWithMetrics(
+      String stepName,
+      DoFnRunner<InputT, OutputT> delegate,
+      MetricsReporter metricsReporter) {
+    this.stepName = checkNotNull(stepName, "stepName");
+    this.delegate = checkNotNull(delegate, "delegate");
+    this.metricsReporter = checkNotNull(metricsReporter, "metricsReporter");
+  }
+
+  @Override
+  public void startBundle() {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      delegate.startBundle();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      delegate.processElement(elem);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void onTimer(
+      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      delegate.onTimer(timerId, window, timestamp, timeDomain);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void finishBundle() {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      delegate.finishBundle();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    metricsReporter.updateMetrics();  // called after the scoped container has been closed
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
new file mode 100644
index 0000000..145b224
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * An executor is a basic executable unit in a JStorm task.
+ */
+interface Executor extends Serializable {
+  /**
+   * Initialization during runtime.
+   */
+  void init(ExecutorContext context);
+
+  <T> void process(TupleTag<T> tag, WindowedValue<T> elem);
+
+  void cleanup();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorContext.java
new file mode 100644
index 0000000..487db35
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import backtype.storm.task.TopologyContext;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.google.auto.value.AutoValue;
+
+/**
+ * Runtime context of an {@link ExecutorsBolt}, passed to each {@link Executor} in init.
+ */
+@AutoValue
+abstract class ExecutorContext {
+  public static ExecutorContext of(
+      TopologyContext topologyContext,
+      ExecutorsBolt bolt,
+      IKvStoreManager kvStoreManager) {
+    return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
+  }
+
+  public abstract TopologyContext getTopologyContext();
+
+  public abstract ExecutorsBolt getExecutorsBolt();
+
+  public abstract IKvStoreManager getKvStoreManager();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
new file mode 100644
index 0000000..ef12db8
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBatchBolt;
+import backtype.storm.tuple.ITupleExt;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.cache.KvStoreManagerFactory;
+import com.alibaba.jstorm.cluster.Common;
+import com.alibaba.jstorm.utils.KryoSerializer;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ExecutorsBolt is a JStorm Bolt composed of several executors chained in a sub-DAG.
+ */
+public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
+  private static final long serialVersionUID = -7751043327801735211L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
+
+  protected ExecutorContext executorContext;
+
+  protected TimerService timerService;
+
+  // map from input tag to executor inside bolt
+  protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
+  // set of all output tags that will be emitted outside the bolt
+  protected final Set<TupleTag> outputTags = Sets.newHashSet();
+  protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
+  protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet();
+  protected int internalDoFnExecutorId = 1;
+  protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
+
+  protected OutputCollector collector;
+
+  protected boolean isStatefulBolt = false;
+
+  protected KryoSerializer<WindowedValue> serializer;
+
+  public ExecutorsBolt() {
+
+  }
+
+  public void setStatefulBolt(boolean isStateful) {
+    isStatefulBolt = isStateful;
+  }
+
+  public void addExecutor(TupleTag inputTag, Executor executor) {
+    inputTagToExecutor.put(
+        checkNotNull(inputTag, "inputTag"),
+        checkNotNull(executor, "executor"));
+  }
+
+  public Map<TupleTag, Executor> getExecutors() {
+    return inputTagToExecutor;
+  }
+
+  public void registerExecutor(Executor executor) {
+    if (executor instanceof DoFnExecutor) {
+      DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
+      idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor);
+      doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId);
+      internalDoFnExecutorId++;
+    }
+  }
+
+  public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() {
+    return idToDoFnExecutor;
+  }
+
+  public void addOutputTags(TupleTag tag) {
+    outputTags.add(tag);
+  }
+
+  public void addExternalOutputTag(TupleTag<?> tag) {
+    externalOutputTags.add(tag);
+  }
+
+  public Set<TupleTag> getOutputTags() {
+    return outputTags;
+  }
+
+  public ExecutorContext getExecutorContext() {
+    return executorContext;
+  }
+
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    LOG.info("Start to prepare for task-{}", context.getThisTaskId());
+    try {
+      this.collector = collector;
+
+      // init kv store manager
+      String storeName = String.format("task-%d", context.getThisTaskId());
+      String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
+      IKvStoreManager kvStoreManager = isStatefulBolt
+              ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(
+              context, storeName, stateStorePath, isStatefulBolt)
+              : KvStoreManagerFactory.getKvStoreManager(
+              stormConf, storeName, stateStorePath, isStatefulBolt);
+      this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
+
+      // init time service
+      timerService = initTimerService();
+
+      // init all internal executors
+      for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
+        executor.init(executorContext);
+        if (executor instanceof DoFnExecutor) {
+          doFnExecutors.add((DoFnExecutor) executor);
+        }
+      }
+
+      this.serializer = new KryoSerializer<WindowedValue>(stormConf);
+
+      LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values());
+      LOG.info("inputTagToExecutor={}", inputTagToExecutor);
+      LOG.info("outputTags={}", outputTags);
+      LOG.info("externalOutputTags={}", externalOutputTags);
+      LOG.info("doFnExecutors={}", doFnExecutors);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to prepare executors bolt", e);
+    }
+  }
+
+  public TimerService initTimerService() {
+    TopologyContext context = executorContext.getTopologyContext();
+    List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet())
+        .transformAndConcat(
+            new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() {
+              @Override
+              public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) {
+                if (Common.isSystemComponent(value.getKey())) {  // system components add no tasks
+                  return Collections.<Integer>emptyList();
+                } else {
+                  return value.getValue();
+                }
+              }
+            })
+        .toList();
+    TimerService ret = new TimerServiceImpl(executorContext);
+    ret.init(tasks);  // initialised with the source-component task ids gathered above
+    return ret;
+  }
+
+  @Override
+  public void execute(Tuple input) {
+    // process a batch
+    String streamId = input.getSourceStreamId();
+    ITupleExt tuple = (ITupleExt) input;
+    Iterator<List<Object>> valueIterator = tuple.batchValues().iterator();
+    if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) {
+      while (valueIterator.hasNext()) {
+        processWatermark((Long) valueIterator.next().get(0), input.getSourceTask());
+      }
+    } else {
+      doFnStartBundle();
+      while (valueIterator.hasNext()) {
+        processElement(valueIterator.next(), streamId);
+      }
+      doFnFinishBundle();
+    }
+  }
+
+  private void processWatermark(long watermarkTs, int sourceTask) {
+    long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs);
+    LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}",
+        (new Instant(watermarkTs)).toDateTime(),
+        sourceTask,
+        (new Instant(newWaterMark)).toDateTime());
+    if (newWaterMark != 0) {
+      // Some buffer windows are going to be triggered.
+      doFnStartBundle();
+      timerService.fireTimers(newWaterMark);
+
+      // SideInput: If receiving water mark with max timestamp, It means no more data is supposed
+      // to be received from now on. So we are going to process all push back data.
+      if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+        for (DoFnExecutor doFnExecutor : doFnExecutors) {
+          doFnExecutor.processAllPushBackElements();
+        }
+      }
+
+      doFnFinishBundle();
+    }
+
+    long currentWaterMark = timerService.currentOutputWatermark();
+    if (!externalOutputTags.isEmpty()) {
+      collector.flush();
+      collector.emit(
+          CommonInstance.BEAM_WATERMARK_STREAM_ID,
+          new Values(currentWaterMark));
+      LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime());
+    }
+  }
+
+  private void processElement(List<Object> values, String streamId) {
+    TupleTag inputTag = new TupleTag(streamId);
+    WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values);
+    processExecutorElem(inputTag, windowedValue);
+  }
+
+  public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
+    if (elem != null) {  // check first: the debug log below dereferences elem
+      LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
+      Executor executor = inputTagToExecutor.get(inputTag);
+      if (executor != null) {
+        executor.process(inputTag, elem);  // hand the element to the executor for this tag
+      }
+      if (externalOutputTags.contains(inputTag)) {
+        emitOutsideBolt(inputTag, elem);  // external tags are also emitted via the collector
+      }
+    } else {
+      LOG.info("Received null elem for tag={}", inputTag);
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
+      executor.cleanup();
+    }
+    executorContext.getKvStoreManager().close();
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  public TimerService timerService() {
+    return timerService;
+  }
+
+  public void setTimerService(TimerService service) {
+    timerService = service;
+  }
+
+  private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) {
+    WindowedValue wv = null;
+    if (values.size() > 1) {
+      Object key = values.get(0);
+      WindowedValue value = serializer.deserialize((byte[]) values.get(1));
+      wv = value.withValue(KV.of(key, value.getValue()));
+    } else {
+      wv = serializer.deserialize((byte[]) values.get(0));
+    }
+    return wv;
+  }
+
+  protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) {
+    LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue());
+    if (keyedEmit(outputTag.getId())) {
+      KV kv = (KV) outputValue.getValue();
+      byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue()));
+      // Convert WindowedValue<KV> to <K, WindowedValue<V>>
+      if (kv.getKey() == null) {
+        // If key is null, emit "null" string here. Because, null value will be ignored in JStorm.
+        collector.emit(outputTag.getId(), new Values("null", immutableOutputValue));
+      } else {
+        collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue));
+      }
+    } else {
+      byte[] immutableOutputValue = serializer.serialize(outputValue);
+      collector.emit(outputTag.getId(), new Values(immutableOutputValue));
+    }
+  }
+
+  private void doFnStartBundle() {
+    for (DoFnExecutor doFnExecutor : doFnExecutors) {
+      doFnExecutor.startBundle();
+    }
+  }
+
+  private void doFnFinishBundle() {
+    for (DoFnExecutor doFnExecutor : doFnExecutors) {
+      doFnExecutor.finishBundle();
+    }
+  }
+
+  @Override
+  public String toString() {
+    // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString());
+    List<String> ret = new ArrayList<>();
+        /*ret.add("inputTags");
+        for (TupleTag inputTag : inputTagToExecutor.keySet()) {
+            ret.add(inputTag.getId());
+        }*/
+    ret.add("internalExecutors");
+    for (Executor executor : inputTagToExecutor.values()) {
+      ret.add(executor.toString());
+    }
+    ret.add("externalOutputTags");
+    for (TupleTag output : externalOutputTags) {
+      ret.add(output.getId());
+    }
+    return Joiner.on('\n').join(ret).concat("\n");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
new file mode 100644
index 0000000..a64f494
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.Flatten}.
+ * @param <InputT>
+ */
+class FlattenExecutor<InputT> implements Executor {
+
+  private final String description;
+  private TupleTag mainOutputTag;
+  private ExecutorContext context;
+  private ExecutorsBolt executorsBolt;
+
+  public FlattenExecutor(String description, TupleTag mainTupleTag) {
+    this.description = checkNotNull(description, "description");
+    this.mainOutputTag = mainTupleTag;
+  }
+
+  @Override
+  public void init(ExecutorContext context) {
+    this.context = context;
+    this.executorsBolt = context.getExecutorsBolt();
+  }
+
+  @Override
+  public void process(TupleTag tag, WindowedValue elem) {
+    executorsBolt.processExecutorElem(mainOutputTag, elem);
+  }
+
+  @Override
+  public void cleanup() {
+  }
+
+  @Override
+  public String toString() {
+    return description;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
new file mode 100644
index 0000000..89708df
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Translates a {@link Flatten} to a JStorm {@link FlattenExecutor}.
+ * @param <V> element type of the flattened {@link PCollection}s
+ */
+class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> {
+
+  @Override
+  public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+
+    // Since a new tag is created in PCollectionList, retrieve the real tag here.
+    Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
+    for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) {
+      PCollection<V> pc = (PCollection<V>) entry.getValue();
+      inputs.putAll(pc.expand());
+    }
+    System.out.println("Real inputs: " + inputs);  // NOTE(review): stdout debug print (also below)
+    System.out.println("FlattenList inputs: " + userGraphContext.getInputs());
+    String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
+    FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
+    context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
new file mode 100644
index 0000000..85c958a
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Translates a {@link GroupByKey} to a JStorm {@link GroupByWindowExecutor}.
+ * @param <K>
+ * @param <V>
+ */
+class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
+  // information of transform
+  protected PCollection<KV<K, V>> input;
+  protected PCollection<KV<K, Iterable<V>>> output;
+  protected List<TupleTag<?>> inputTags;
+  protected TupleTag<KV<K, Iterable<V>>> mainOutputTag;
+  protected List<TupleTag<?>> sideOutputTags;
+  protected List<PCollectionView<?>> sideInputs;
+  protected WindowingStrategy<?, ?> windowingStrategy;
+
+  @Override
+  public void translateNode(GroupByKey<K, V> transform, TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    String description =
+        describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
+
+    input = (PCollection<KV<K, V>>) userGraphContext.getInput();
+    output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput();
+
+    inputTags = userGraphContext.getInputTags();
+    mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag();
+    sideOutputTags = Lists.newArrayList();
+
+    sideInputs = Collections.<PCollectionView<?>>emptyList();
+    windowingStrategy = input.getWindowingStrategy();
+
+    GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>(
+        userGraphContext.getStepName(),
+        description,
+        context,
+        context.getUserGraphContext().getOptions(),
+        windowingStrategy,
+        mainOutputTag,
+        sideOutputTags);
+    context.addTransformExecutor(groupByWindowExecutor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
new file mode 100644
index 0000000..bf6e1ad
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.GroupByKey}.
+ *
+ * <p>Incoming {@code KV} elements are wrapped into {@link KeyedWorkItem}s and pushed through a
+ * {@link GroupAlsoByWindowViaWindowSetNewDoFn} that is configured with a buffering
+ * {@link SystemReduceFn}.
+ *
+ * @param <K> type of the grouping key
+ * @param <V> type of the grouped values
+ */
+class GroupByWindowExecutor<K, V>
+    extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
+  private static final long serialVersionUID = -7563050475488610553L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
+
+  /** Output manager that hands every produced element to the enclosing executor's bolt. */
+  private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
+
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      executorsBolt.processExecutorElem(tag, output);
+    }
+  }
+
+  /** Coder of the input {@code KV} elements; its value coder configures the reduce function. */
+  private KvCoder<K, V> inputKvCoder;
+  private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
+
+  /**
+   * Creates the executor; the grouping {@link DoFn} is built later in {@link #getDoFnRunner}.
+   */
+  public GroupByWindowExecutor(
+      String stepName,
+      String description,
+      TranslationContext context,
+      JStormPipelineOptions pipelineOptions,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
+    // The doFn is created at runtime, so null placeholders are passed here.
+    super(stepName, description, pipelineOptions, null, null, windowingStrategy,
+        null, null, null, mainTupleTag, sideOutputTags);
+
+    this.outputManager = new GroupByWindowOutputManager();
+    UserGraphContext graphContext = context.getUserGraphContext();
+    PCollection<KV<K, V>> inputCollection = (PCollection<KV<K, V>>) graphContext.getInput();
+    this.inputKvCoder = (KvCoder<K, V>) inputCollection.getCoder();
+  }
+
+  /**
+   * Builds the group-also-by-window {@link DoFn}, wiring in per-key state and timer factories.
+   */
+  private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
+    final StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
+      @Override
+      public StateInternals stateInternalsForKey(K key) {
+        return new JStormStateInternals<K>(
+            key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
+      }
+    };
+    final TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {
+      @Override
+      public TimerInternals timerInternalsForKey(K key) {
+        return new JStormTimerInternals<>(
+            key,
+            GroupByWindowExecutor.this,
+            executorContext.getExecutorsBolt().timerService());
+      }
+    };
+
+    reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
+    return GroupAlsoByWindowViaWindowSetNewDoFn.create(
+        windowingStrategy, stateInternalsFactory, timerInternalsFactory,
+        NullSideInputReader.empty(), (SystemReduceFn) reduceFn, outputManager, mainTupleTag);
+  }
+
+  /**
+   * Wraps the grouping {@link DoFn} in a simple runner, then a late-data-dropping runner and
+   * finally a metrics-reporting runner.
+   */
+  @Override
+  protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
+    doFn = getGroupByWindowDoFn();
+
+    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> baseRunner = DoFnRunners.simpleRunner(
+        pipelineOptions, doFn, NullSideInputReader.empty(), outputManager,
+        mainTupleTag, sideOutputTags, stepContext, windowingStrategy);
+    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> lateDroppingRunner =
+        DoFnRunners.lateDataDroppingRunner(baseRunner, stepContext, windowingStrategy);
+
+    return new DoFnRunnerWithMetrics<>(
+        stepName, lateDroppingRunner, MetricsReporter.create(metricClient));
+  }
+
+  /**
+   * Converts the received {@code KV} element into a {@link KeyedWorkItem}, which is the input
+   * type expected by the late-data-dropping runner, and processes it.
+   */
+  @Override
+  public void process(TupleTag tag, WindowedValue elem) {
+    KeyedWorkItem<K, V> workItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem);
+    runner.processElement(elem.withValue(workItem));
+  }
+
+  /**
+   * Delivers a fired timer to the runner as a timers-only {@link KeyedWorkItem}.
+   *
+   * @throws IllegalArgumentException if the timer's namespace is not a window namespace
+   */
+  @Override
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    StateNamespace timerNamespace = timerData.getNamespace();
+    checkArgument(timerNamespace instanceof StateNamespaces.WindowNamespace);
+
+    KeyedWorkItem<K, V> timerWorkItem =
+        KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData));
+    runner.processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
+  }
+
+  @Override
+  public String toString() {
+    return super.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
new file mode 100644
index 0000000..3e5d52b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormBagState.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.IKvStore;
+import com.alibaba.jstorm.cache.KvStoreIterable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link BagState} in JStorm runner.
+ */
+class JStormBagState<K, T> implements BagState<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
+
+  @Nullable
+  private final K key;
+  private final StateNamespace namespace;
+  private final IKvStore<ComposedKey, T> kvState;
+  private final IKvStore<ComposedKey, Object> stateInfoKvState;
+  private int elemIndex;
+
+  public JStormBagState(@Nullable K key, StateNamespace namespace, IKvStore<ComposedKey, T> kvState,
+                        IKvStore<ComposedKey, Object> stateInfoKvState) throws IOException {
+    this.key = key;
+    this.namespace = checkNotNull(namespace, "namespace");
+    this.kvState = checkNotNull(kvState, "kvState");
+    this.stateInfoKvState = checkNotNull(stateInfoKvState, "stateInfoKvState");
+
+    Integer index = (Integer) stateInfoKvState.get(getComposedKey());
+    this.elemIndex = index != null ? ++index : 0;
+  }
+
+  @Override
+  public void add(T input) {
+    try {
+      kvState.put(getComposedKey(elemIndex), input);
+      stateInfoKvState.put(getComposedKey(), elemIndex);
+      elemIndex++;
+    } catch (IOException e) {
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+  @Override
+  public ReadableState<Boolean> isEmpty() {
+    return new ReadableState<Boolean>() {
+      @Override
+      public Boolean read() {
+        return elemIndex <= 0;
+      }
+
+      @Override
+      public ReadableState<Boolean> readLater() {
+        // TODO: support prefetch.
+        return this;
+      }
+    };
+  }
+
+  @Override
+  public Iterable<T> read() {
+    return new BagStateIterable(elemIndex);
+  }
+
+  @Override
+  public BagState readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
+
+  @Override
+  public void clear() {
+    try {
+      for (int i = 0; i < elemIndex; i++) {
+        kvState.remove(getComposedKey(i));
+      }
+      stateInfoKvState.remove(getComposedKey());
+      elemIndex = 0;
+    } catch (IOException e) {
+      throw new RuntimeException(e.getCause());
+    }
+  }
+
+  private ComposedKey getComposedKey() {
+    return ComposedKey.of(key, namespace);
+  }
+
+  private ComposedKey getComposedKey(int elemIndex) {
+    return ComposedKey.of(key, namespace, elemIndex);
+  }
+
+  /**
+   * Implementation of Bag state Iterable.
+   */
+  private class BagStateIterable implements KvStoreIterable<T> {
+
+    private class BagStateIterator implements Iterator<T> {
+      private final int size;
+      private int cursor = 0;
+
+      BagStateIterator() {
+        Integer s = null;
+        try {
+          s = (Integer) stateInfoKvState.get(getComposedKey());
+        } catch (IOException e) {
+          LOG.error("Failed to get elemIndex for key={}", getComposedKey());
+        }
+        this.size = s != null ? ++s : 0;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return cursor < size;
+      }
+
+      @Override
+      public T next() {
+        if (cursor >= size) {
+          throw new NoSuchElementException();
+        }
+
+        T value = null;
+        try {
+          value = kvState.get(getComposedKey(cursor));
+        } catch (IOException e) {
+          LOG.error("Failed to read composed key-[{}]", getComposedKey(cursor));
+        }
+        cursor++;
+        return value;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    private final int size;
+
+    BagStateIterable(int size) {
+      this.size = size;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+      return new BagStateIterator();
+    }
+
+    @Override
+    public String toString() {
+      return String.format("BagStateIterable: composedKey=%s", getComposedKey());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
new file mode 100644
index 0000000..6bd021f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormCombiningState.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.CombiningState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * JStorm implementation of {@link CombiningState}.
+ */
+class JStormCombiningState<InputT, AccumT, OutputT>
+    implements CombiningState<InputT, AccumT, OutputT> {
+
+  @Nullable
+  private final BagState<AccumT> accumBagState;
+  private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+
+  JStormCombiningState(
+      BagState<AccumT> accumBagState,
+      Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+    this.accumBagState = checkNotNull(accumBagState, "accumBagState");
+    this.combineFn = checkNotNull(combineFn, "combineFn");
+  }
+
+  @Override
+  public AccumT getAccum() {
+    // TODO: replacing the accumBagState with the merged accum.
+    return combineFn.mergeAccumulators(accumBagState.read());
+  }
+
+  @Override
+  public void addAccum(AccumT accumT) {
+    accumBagState.add(accumT);
+  }
+
+  @Override
+  public AccumT mergeAccumulators(Iterable<AccumT> iterable) {
+    return combineFn.mergeAccumulators(iterable);
+  }
+
+  @Override
+  public void add(InputT input) {
+    accumBagState.add(
+        combineFn.addInput(combineFn.createAccumulator(), input));
+  }
+
+  @Override
+  public ReadableState<Boolean> isEmpty() {
+    return accumBagState.isEmpty();
+  }
+
+  @Override
+  public OutputT read() {
+    return combineFn.extractOutput(
+        combineFn.mergeAccumulators(accumBagState.read()));
+  }
+
+  @Override
+  public CombiningState<InputT, AccumT, OutputT> readLater() {
+    // TODO: support prefetch.
+    return this;
+  }
+
+  @Override
+  public void clear() {
+    accumBagState.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
new file mode 100644
index 0000000..6a4e376
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormMapState.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.alibaba.jstorm.cache.IKvStore;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.sdk.state.MapState;
+import org.apache.beam.sdk.state.ReadableState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link MapState} in JStorm runner.
+ *
+ * <p>All operations delegate to the backing {@link IKvStore}; an {@link IOException} from the
+ * store is logged and rethrown as a {@link RuntimeException} that carries it as the cause.
+ *
+ * @param <K> type of the map keys
+ * @param <V> type of the map values
+ */
+class JStormMapState<K, V> implements MapState<K, V> {
+  private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
+
+  private final K key;
+  private final StateNamespace namespace;
+  private final IKvStore<K, V> kvStore;
+
+  public JStormMapState(K key, StateNamespace namespace, IKvStore<K, V> kvStore) {
+    this.key = key;
+    this.namespace = namespace;
+    this.kvStore = kvStore;
+  }
+
+  @Override
+  public void put(K var1, V var2) {
+    try {
+      kvStore.put(var1, var2);
+    } catch (IOException e) {
+      throw reportError(String.format("Failed to put key=%s, value=%s", var1, var2), e);
+    }
+  }
+
+  /**
+   * Stores {@code var2} under {@code var1} unless the store already holds a value for that key.
+   *
+   * @return a state holding the value found before the call, or {@code null} if there was none
+   */
+  @Override
+  public ReadableState<V> putIfAbsent(K var1, V var2) {
+    try {
+      V existing = kvStore.get(var1);
+      if (existing == null) {
+        kvStore.put(var1, var2);
+      }
+      return new MapReadableState<>(existing);
+    } catch (IOException e) {
+      throw reportError(String.format("Failed to putIfAbsent key=%s, value=%s", var1, var2), e);
+    }
+  }
+
+  @Override
+  public void remove(K var1) {
+    try {
+      kvStore.remove(var1);
+    } catch (IOException e) {
+      throw reportError(String.format("Failed to remove key=%s", var1), e);
+    }
+  }
+
+  @Override
+  public ReadableState<V> get(K var1) {
+    try {
+      return new MapReadableState<>(kvStore.get(var1));
+    } catch (IOException e) {
+      throw reportError(String.format("Failed to get value for key=%s", var1), e);
+    }
+  }
+
+  @Override
+  public ReadableState<Iterable<K>> keys() {
+    try {
+      return new MapReadableState<>(kvStore.keys());
+    } catch (IOException e) {
+      throw reportError("Failed to get keys", e);
+    }
+  }
+
+  @Override
+  public ReadableState<Iterable<V>> values() {
+    try {
+      return new MapReadableState<>(kvStore.values());
+    } catch (IOException e) {
+      throw reportError("Failed to get values", e);
+    }
+  }
+
+  @Override
+  public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
+    try {
+      return new MapReadableState<>(kvStore.entries());
+    } catch (IOException e) {
+      throw reportError("Failed to get entries", e);
+    }
+  }
+
+  /** Removes every key currently reported by the backing store. */
+  @Override
+  public void clear() {
+    try {
+      Iterable<K> keys = kvStore.keys();
+      kvStore.removeBatch(keys);
+    } catch (IOException e) {
+      throw reportError("Failed to clear map state", e);
+    }
+  }
+
+  /**
+   * Logs the failure and wraps it, keeping the original exception as the cause so that the
+   * underlying store error is not lost to callers.
+   *
+   * @return the exception for the caller to throw
+   */
+  private RuntimeException reportError(String errorInfo, IOException e) {
+    LOG.error(errorInfo, e);
+    return new RuntimeException(errorInfo, e);
+  }
+
+  /** A {@link ReadableState} that simply holds an already computed value. */
+  private static class MapReadableState<T> implements ReadableState<T> {
+    private final T value;
+
+    public MapReadableState(T value) {
+      this.value = value;
+    }
+
+    @Override
+    public T read() {
+      return value;
+    }
+
+    @Override
+    public ReadableState<T> readLater() {
+      return this;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/82653534/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
index 1449a43..298ad32 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
@@ -22,8 +22,6 @@ import com.google.common.collect.Iterables;
 import java.util.List;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
-import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverride;