Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:13 UTC

[13/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
index 9df1e17..e80fb48 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
@@ -17,13 +17,15 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime;
 
-import java.io.IOException;
-import java.util.*;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import avro.shaded.com.google.common.base.Joiner;
 import avro.shaded.com.google.common.collect.Sets;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.ITupleExt;
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
 import com.alibaba.jstorm.cache.IKvStoreManager;
 import com.alibaba.jstorm.cache.KvStoreManagerFactory;
 import com.alibaba.jstorm.cluster.Common;
@@ -31,6 +33,14 @@ import com.alibaba.jstorm.utils.KryoSerializer;
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -39,289 +49,287 @@ import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
 public class ExecutorsBolt extends AdaptorBasicBolt {
-    private static final long serialVersionUID = -7751043327801735211L;
+  private static final long serialVersionUID = -7751043327801735211L;
 
-    private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
 
-    protected ExecutorContext executorContext;
+  protected ExecutorContext executorContext;
 
-    protected TimerService timerService;
+  protected TimerService timerService;
 
-    // map from input tag to executor inside bolt
-    protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
-    // set of all output tags that will be emit outside bolt
-    protected final Set<TupleTag> outputTags = Sets.newHashSet();
-    protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
-    protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet();
-    protected int internalDoFnExecutorId = 1;
-    protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
+  // map from input tag to executor inside bolt
+  protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
+  // set of all output tags that will be emit outside bolt
+  protected final Set<TupleTag> outputTags = Sets.newHashSet();
+  protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
+  protected final Set<DoFnExecutor> doFnExecutors = Sets.newHashSet();
+  protected int internalDoFnExecutorId = 1;
+  protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
 
-    protected OutputCollector collector;
+  protected OutputCollector collector;
 
-    protected boolean isStatefulBolt = false;
+  protected boolean isStatefulBolt = false;
 
-    protected KryoSerializer<WindowedValue> serializer;
+  protected KryoSerializer<WindowedValue> serializer;
 
-    public ExecutorsBolt() {
+  public ExecutorsBolt() {
 
-    }
-
-    public void setStatefulBolt(boolean isStateful) {
-        isStatefulBolt = isStateful;
-    }
-
-    public void addExecutor(TupleTag inputTag, Executor executor) {
-        inputTagToExecutor.put(
-                checkNotNull(inputTag, "inputTag"),
-                checkNotNull(executor, "executor"));
-    }
-
-    public Map<TupleTag, Executor> getExecutors() {
-        return inputTagToExecutor;
-    }
-
-    public void registerExecutor(Executor executor) {
-        if (executor instanceof DoFnExecutor) {
-            DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
-            idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor);
-            doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId);
-            internalDoFnExecutorId++;
-        }
-    }
-
-    public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() {
-        return idToDoFnExecutor;
-    }
+  }
 
-    public void addOutputTags(TupleTag tag) {
-        outputTags.add(tag);
-    }
+  public void setStatefulBolt(boolean isStateful) {
+    isStatefulBolt = isStateful;
+  }
 
-    public void addExternalOutputTag(TupleTag<?> tag) {
-        externalOutputTags.add(tag);
-    }
+  public void addExecutor(TupleTag inputTag, Executor executor) {
+    inputTagToExecutor.put(
+        checkNotNull(inputTag, "inputTag"),
+        checkNotNull(executor, "executor"));
+  }
 
-    public Set<TupleTag> getOutputTags() {
-        return outputTags;
-    }
+  public Map<TupleTag, Executor> getExecutors() {
+    return inputTagToExecutor;
+  }
 
-    public ExecutorContext getExecutorContext() {
-        return executorContext;
+  public void registerExecutor(Executor executor) {
+    if (executor instanceof DoFnExecutor) {
+      DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
+      idToDoFnExecutor.put(internalDoFnExecutorId, doFnExecutor);
+      doFnExecutor.setInternalDoFnExecutorId(internalDoFnExecutorId);
+      internalDoFnExecutorId++;
     }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        LOG.info("Start to prepare for task-{}", context.getThisTaskId());
-        try {
-            this.collector = collector;
-
-            // init kv store manager
-            String storeName = String.format("task-%d", context.getThisTaskId());
-            String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
-            IKvStoreManager kvStoreManager = isStatefulBolt ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, stateStorePath, isStatefulBolt) :
-                    KvStoreManagerFactory.getKvStoreManager(stormConf, storeName, stateStorePath, isStatefulBolt);
-            this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
-
-            // init time service
-            timerService = initTimerService();
-
-            // init all internal executors
-            for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
-                executor.init(executorContext);
-                if (executor instanceof DoFnExecutor) {
-                    doFnExecutors.add((DoFnExecutor) executor);
-                }
-            }
-
-            this.serializer = new KryoSerializer<WindowedValue>(stormConf);
-
-            LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values());
-            LOG.info("inputTagToExecutor={}", inputTagToExecutor);
-            LOG.info("outputTags={}", outputTags);
-            LOG.info("externalOutputTags={}", externalOutputTags);
-            LOG.info("doFnExecutors={}", doFnExecutors);
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to prepare executors bolt", e);
+  }
+
+  public Map<Integer, DoFnExecutor> getIdToDoFnExecutor() {
+    return idToDoFnExecutor;
+  }
+
+  public void addOutputTags(TupleTag tag) {
+    outputTags.add(tag);
+  }
+
+  public void addExternalOutputTag(TupleTag<?> tag) {
+    externalOutputTags.add(tag);
+  }
+
+  public Set<TupleTag> getOutputTags() {
+    return outputTags;
+  }
+
+  public ExecutorContext getExecutorContext() {
+    return executorContext;
+  }
+
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    LOG.info("Start to prepare for task-{}", context.getThisTaskId());
+    try {
+      this.collector = collector;
+
+      // init kv store manager
+      String storeName = String.format("task-%d", context.getThisTaskId());
+      String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
+      IKvStoreManager kvStoreManager = isStatefulBolt ?
+          KvStoreManagerFactory.getKvStoreManagerWithMonitor(
+              context, storeName, stateStorePath, isStatefulBolt) :
+          KvStoreManagerFactory.getKvStoreManager(
+              stormConf, storeName, stateStorePath, isStatefulBolt);
+      this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
+
+      // init time service
+      timerService = initTimerService();
+
+      // init all internal executors
+      for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
+        executor.init(executorContext);
+        if (executor instanceof DoFnExecutor) {
+          doFnExecutors.add((DoFnExecutor) executor);
         }
-    }
+      }
 
-    public TimerService initTimerService() {
-        TopologyContext context = executorContext.getTopologyContext();
-        List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet())
-                .transformAndConcat(
-                        new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() {
-                            @Override
-                            public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) {
-                                if (Common.isSystemComponent(value.getKey())) {
-                                    return Collections.EMPTY_LIST;
-                                } else {
-                                    return value.getValue();
-                                }
-                            }
-                        })
-                .toList();
-        TimerService ret = new TimerServiceImpl(executorContext);
-        ret.init(tasks);
-        return ret;
-    }
+      this.serializer = new KryoSerializer<WindowedValue>(stormConf);
 
-    @Override
-    public void execute(Tuple input) {
-        // process a batch
-        String streamId = input.getSourceStreamId();
-        ITupleExt tuple = (ITupleExt) input;
-        Iterator<List<Object>> valueIterator = tuple.batchValues().iterator();
-        if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) {
-            while (valueIterator.hasNext()) {
-                processWatermark((Long) valueIterator.next().get(0), input.getSourceTask());
-            }
-        } else {
-            doFnStartBundle();
-            while (valueIterator.hasNext()) {
-                processElement(valueIterator.next(), streamId);
-            }
-            doFnFinishBundle();
-        }
+      LOG.info("ExecutorsBolt finished init. LocalExecutors={}", inputTagToExecutor.values());
+      LOG.info("inputTagToExecutor={}", inputTagToExecutor);
+      LOG.info("outputTags={}", outputTags);
+      LOG.info("externalOutputTags={}", externalOutputTags);
+      LOG.info("doFnExecutors={}", doFnExecutors);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to prepare executors bolt", e);
     }
-
-    private void processWatermark(long watermarkTs, int sourceTask) {
-        long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs);
-        LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}",
-                (new Instant(watermarkTs)).toDateTime(), sourceTask, (new Instant(newWaterMark)).toDateTime());
-        if (newWaterMark != 0) {
-            // Some buffer windows are going to be triggered.
-            doFnStartBundle();
-            timerService.fireTimers(newWaterMark);
-
-            // SideInput: If receiving water mark with max timestamp, It means no more data is supposed
-            // to be received from now on. So we are going to process all push back data.
-            if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
-                for (DoFnExecutor doFnExecutor : doFnExecutors) {
-                    doFnExecutor.processAllPushBackElements();
+  }
+
+  public TimerService initTimerService() {
+    TopologyContext context = executorContext.getTopologyContext();
+    List<Integer> tasks = FluentIterable.from(context.getThisSourceComponentTasks().entrySet())
+        .transformAndConcat(
+            new Function<Map.Entry<String, List<Integer>>, Iterable<Integer>>() {
+              @Override
+              public Iterable<Integer> apply(Map.Entry<String, List<Integer>> value) {
+                if (Common.isSystemComponent(value.getKey())) {
+                  return Collections.EMPTY_LIST;
+                } else {
+                  return value.getValue();
                 }
-            }
-
-            doFnFinishBundle();
-        }
-
-        long currentWaterMark = timerService.currentOutputWatermark();
-        if (!externalOutputTags.isEmpty()) {
-            collector.flush();
-            collector.emit(
-                    CommonInstance.BEAM_WATERMARK_STREAM_ID,
-                    new Values(currentWaterMark));
-            LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime());
-        }
-    }
-
-    private void processElement(List<Object> values, String streamId) {
-        TupleTag inputTag = new TupleTag(streamId);
-        WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values);
-        processExecutorElem(inputTag, windowedValue);
+              }
+            })
+        .toList();
+    TimerService ret = new TimerServiceImpl(executorContext);
+    ret.init(tasks);
+    return ret;
+  }
+
+  @Override
+  public void execute(Tuple input) {
+    // process a batch
+    String streamId = input.getSourceStreamId();
+    ITupleExt tuple = (ITupleExt) input;
+    Iterator<List<Object>> valueIterator = tuple.batchValues().iterator();
+    if (CommonInstance.BEAM_WATERMARK_STREAM_ID.equals(streamId)) {
+      while (valueIterator.hasNext()) {
+        processWatermark((Long) valueIterator.next().get(0), input.getSourceTask());
+      }
+    } else {
+      doFnStartBundle();
+      while (valueIterator.hasNext()) {
+        processElement(valueIterator.next(), streamId);
+      }
+      doFnFinishBundle();
     }
-
-    public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
-        LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
-        if (elem != null) {
-            Executor executor = inputTagToExecutor.get(inputTag);
-            if (executor != null) {
-                executor.process(inputTag, elem);
-            }
-            if (externalOutputTags.contains(inputTag)) {
-                emitOutsideBolt(inputTag, elem);
-            }
-        } else {
-            LOG.info("Received null elem for tag={}", inputTag);
+  }
+
+  private void processWatermark(long watermarkTs, int sourceTask) {
+    long newWaterMark = timerService.updateInputWatermark(sourceTask, watermarkTs);
+    LOG.debug("Recv waterMark-{} from task-{}, newWaterMark={}",
+        (new Instant(watermarkTs)).toDateTime(),
+        sourceTask,
+        (new Instant(newWaterMark)).toDateTime());
+    if (newWaterMark != 0) {
+      // Some buffer windows are going to be triggered.
+      doFnStartBundle();
+      timerService.fireTimers(newWaterMark);
+
+      // SideInput: If receiving water mark with max timestamp, It means no more data is supposed
+      // to be received from now on. So we are going to process all push back data.
+      if (newWaterMark == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+        for (DoFnExecutor doFnExecutor : doFnExecutors) {
+          doFnExecutor.processAllPushBackElements();
         }
-    }
+      }
 
-    @Override
-    public void cleanup() {
-        for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
-            executor.cleanup();
-        }
-        executorContext.getKvStoreManager().close();
+      doFnFinishBundle();
     }
 
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
+    long currentWaterMark = timerService.currentOutputWatermark();
+    if (!externalOutputTags.isEmpty()) {
+      collector.flush();
+      collector.emit(
+          CommonInstance.BEAM_WATERMARK_STREAM_ID,
+          new Values(currentWaterMark));
+      LOG.debug("Send waterMark-{}", (new Instant(currentWaterMark)).toDateTime());
     }
-
-    public TimerService timerService() {
-        return timerService;
+  }
+
+  private void processElement(List<Object> values, String streamId) {
+    TupleTag inputTag = new TupleTag(streamId);
+    WindowedValue windowedValue = retrieveWindowedValueFromTupleValue(values);
+    processExecutorElem(inputTag, windowedValue);
+  }
+
+  public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
+    LOG.debug("ProcessExecutorElem: inputTag={}, value={}", inputTag, elem.getValue());
+    if (elem != null) {
+      Executor executor = inputTagToExecutor.get(inputTag);
+      if (executor != null) {
+        executor.process(inputTag, elem);
+      }
+      if (externalOutputTags.contains(inputTag)) {
+        emitOutsideBolt(inputTag, elem);
+      }
+    } else {
+      LOG.info("Received null elem for tag={}", inputTag);
     }
+  }
 
-    public void setTimerService(TimerService service) {
-        timerService = service;
+  @Override
+  public void cleanup() {
+    for (Executor executor : Sets.newHashSet(inputTagToExecutor.values())) {
+      executor.cleanup();
     }
-
-    private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) {
-        WindowedValue wv = null;
-        if (values.size() > 1) {
-            Object key = values.get(0);
-            WindowedValue value = serializer.deserialize((byte[]) values.get(1));
-            wv = value.withValue(KV.of(key, value.getValue()));
-        } else {
-            wv = serializer.deserialize((byte[])values.get(0));
-        }
-        return wv;
+    executorContext.getKvStoreManager().close();
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return null;
+  }
+
+  public TimerService timerService() {
+    return timerService;
+  }
+
+  public void setTimerService(TimerService service) {
+    timerService = service;
+  }
+
+  private WindowedValue retrieveWindowedValueFromTupleValue(List<Object> values) {
+    WindowedValue wv = null;
+    if (values.size() > 1) {
+      Object key = values.get(0);
+      WindowedValue value = serializer.deserialize((byte[]) values.get(1));
+      wv = value.withValue(KV.of(key, value.getValue()));
+    } else {
+      wv = serializer.deserialize((byte[]) values.get(0));
     }
-
-    protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) {
-        LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue());
-        if (keyedEmit(outputTag.getId())) {
-            KV kv = (KV) outputValue.getValue();
-            byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue()));
-            // Convert WindowedValue<KV> to <K, WindowedValue<V>>
-            if (kv.getKey() == null) {
-                // If key is null, emit "null" string here. Because, null value will be ignored in JStorm.
-                collector.emit(outputTag.getId(), new Values("null", immutableOutputValue));
-            } else {
-                collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue));
-            }
-        } else {
-            byte[] immutableOutputValue = serializer.serialize(outputValue);
-            collector.emit(outputTag.getId(), new Values(immutableOutputValue));
-        }
+    return wv;
+  }
+
+  protected void emitOutsideBolt(TupleTag outputTag, WindowedValue outputValue) {
+    LOG.debug("Output outside: tag={}, value={}", outputTag, outputValue.getValue());
+    if (keyedEmit(outputTag.getId())) {
+      KV kv = (KV) outputValue.getValue();
+      byte[] immutableOutputValue = serializer.serialize(outputValue.withValue(kv.getValue()));
+      // Convert WindowedValue<KV> to <K, WindowedValue<V>>
+      if (kv.getKey() == null) {
+        // If key is null, emit "null" string here. Because, null value will be ignored in JStorm.
+        collector.emit(outputTag.getId(), new Values("null", immutableOutputValue));
+      } else {
+        collector.emit(outputTag.getId(), new Values(kv.getKey(), immutableOutputValue));
+      }
+    } else {
+      byte[] immutableOutputValue = serializer.serialize(outputValue);
+      collector.emit(outputTag.getId(), new Values(immutableOutputValue));
     }
+  }
 
-    private void doFnStartBundle() {
-        for (DoFnExecutor doFnExecutor : doFnExecutors) {
-            doFnExecutor.startBundle();
-        }
+  private void doFnStartBundle() {
+    for (DoFnExecutor doFnExecutor : doFnExecutors) {
+      doFnExecutor.startBundle();
     }
+  }
 
-    private void doFnFinishBundle() {
-        for (DoFnExecutor doFnExecutor : doFnExecutors) {
-            doFnExecutor.finishBundle();
-        }
+  private void doFnFinishBundle() {
+    for (DoFnExecutor doFnExecutor : doFnExecutors) {
+      doFnExecutor.finishBundle();
     }
+  }
 
-    @Override
-    public String toString() {
-        // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString());
-        List<String> ret = new ArrayList<>();
+  @Override
+  public String toString() {
+    // LOG.info("bolt: " + executorContext.getTopologyContext().toJSONString());
+    List<String> ret = new ArrayList<>();
         /*ret.add("inputTags");
         for (TupleTag inputTag : inputTagToExecutor.keySet()) {
             ret.add(inputTag.getId());
         }*/
-        ret.add("internalExecutors");
-        for (Executor executor : inputTagToExecutor.values()) {
-            ret.add(executor.toString());
-        }
-        ret.add("externalOutputTags");
-        for (TupleTag output : externalOutputTags) {
-            ret.add(output.getId());
-        }
-        return Joiner.on('\n').join(ret).concat("\n");
+    ret.add("internalExecutors");
+    for (Executor executor : inputTagToExecutor.values()) {
+      ret.add(executor.toString());
+    }
+    ret.add("externalOutputTags");
+    for (TupleTag output : externalOutputTags) {
+      ret.add(output.getId());
     }
+    return Joiner.on('\n').join(ret).concat("\n");
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
index 1ef28c9..5a07243 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
@@ -17,39 +17,40 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 public class FlattenExecutor<InputT> implements Executor {
 
-    private final String description;
-    private TupleTag mainOutputTag;
-    private ExecutorContext context;
-    private ExecutorsBolt executorsBolt;
-
-    public FlattenExecutor(String description, TupleTag mainTupleTag) {
-        this.description = checkNotNull(description, "description");
-        this.mainOutputTag = mainTupleTag;
-    }
-
-    @Override
-    public void init(ExecutorContext context) {
-        this.context = context;
-        this.executorsBolt = context.getExecutorsBolt();
-    }
-
-    @Override
-    public void process(TupleTag tag, WindowedValue elem) {
-        executorsBolt.processExecutorElem(mainOutputTag, elem);
-    }
-
-    @Override
-    public void cleanup() {}
-
-    @Override
-    public String toString() {
-        return description;
-    }
+  private final String description;
+  private TupleTag mainOutputTag;
+  private ExecutorContext context;
+  private ExecutorsBolt executorsBolt;
+
+  public FlattenExecutor(String description, TupleTag mainTupleTag) {
+    this.description = checkNotNull(description, "description");
+    this.mainOutputTag = mainTupleTag;
+  }
+
+  @Override
+  public void init(ExecutorContext context) {
+    this.context = context;
+    this.executorsBolt = context.getExecutorsBolt();
+  }
+
+  @Override
+  public void process(TupleTag tag, WindowedValue elem) {
+    executorsBolt.processExecutorElem(mainOutputTag, elem);
+  }
+
+  @Override
+  public void cleanup() {
+  }
+
+  @Override
+  public String toString() {
+    return description;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
index 299ceb2..625726d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
@@ -17,18 +17,17 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime;
 
-import java.io.Serializable;
-import java.util.List;
+import static com.google.common.base.Preconditions.checkArgument;
 
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
 import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.util.List;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.StateNamespace;
@@ -36,122 +35,138 @@ import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.TranslationContext;
+import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext;
+import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
+import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
+import org.apache.beam.runners.jstorm.util.RunnerUtils;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
-
-import org.apache.beam.runners.jstorm.translation.TranslationContext;
-import org.apache.beam.runners.jstorm.translation.TranslationContext.UserGraphContext;
-import org.apache.beam.runners.jstorm.util.RunnerUtils;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
-public class GroupByWindowExecutor<K, V> extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
-    private static final long serialVersionUID = -7563050475488610553L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
-
-    private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
-
-        @Override
-        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-            executorsBolt.processExecutorElem(tag, output);
-        }
-    }
-
-    private KvCoder<K, V> inputKvCoder;
-    private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
-
-    public GroupByWindowExecutor(
-            String stepName,
-            String description,
-            TranslationContext context,
-            JStormPipelineOptions pipelineOptions,
-            WindowingStrategy<?, ?> windowingStrategy,
-            TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
-        // The doFn will be created when runtime. Just pass "null" here
-        super(stepName, description, pipelineOptions, null, null, windowingStrategy, null, null, null, mainTupleTag, sideOutputTags);
-
-        this.outputManager = new GroupByWindowOutputManager();
-        UserGraphContext userGraphContext = context.getUserGraphContext();
-        PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput();
-        this.inputKvCoder = (KvCoder<K, V>) input.getCoder();
-    }
-
-    private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
-        final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() {
-            @Override
-            public StateInternals stateInternalsForKey(K key) {
-                return new JStormStateInternals<K>(key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
-            }
-        };
-        TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() {
-            @Override
-            public TimerInternals timerInternalsForKey(K key) {
-                return new JStormTimerInternals<>(key, GroupByWindowExecutor.this, executorContext.getExecutorsBolt().timerService());
-            }
-        };
-
-        reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
-        DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFn =
-              GroupAlsoByWindowViaWindowSetNewDoFn.create(
-                  windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(),
-                      (SystemReduceFn) reduceFn, outputManager, mainTupleTag);
-        return doFn;
-    }
-
-    @Override
-    protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
-        doFn = getGroupByWindowDoFn();
-
-        DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.<KeyedWorkItem<K, V>, KV<K, Iterable<V>>>simpleRunner(
-                this.pipelineOptions,
-                this.doFn,
-                NullSideInputReader.empty(),
-                this.outputManager,
-                this.mainTupleTag,
-                this.sideOutputTags,
-                this.stepContext,
-                this.windowingStrategy);
-
-        DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner = DoFnRunners.lateDataDroppingRunner(
-                simpleRunner,
-                this.stepContext,
-                this.windowingStrategy);
-        return new DoFnRunnerWithMetrics<>(
-            stepName, doFnRunner, MetricsReporter.create(metricClient));
-    }
-
-    @Override
-    public void process(TupleTag tag, WindowedValue elem) {
-        /**
-         *  For GroupByKey, KV type elem is received. We need to convert the KV elem
-         *  into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner.
-         */
-        KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem);
-        runner.processElement(elem.withValue(keyedWorkItem));
-    }
+public class GroupByWindowExecutor<K, V>
+    extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
+  private static final long serialVersionUID = -7563050475488610553L;
 
-    @Override
-    public void onTimer(Object key, TimerInternals.TimerData timerData) {
-        StateNamespace namespace = timerData.getNamespace();
-        checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
 
-        runner.processElement(
-                WindowedValue.valueInGlobalWindow(
-                        KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData))));
-    }
+  private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
 
     @Override
-    public String toString() {
-        return super.toString();
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      executorsBolt.processExecutorElem(tag, output);
     }
+  }
+
+  private KvCoder<K, V> inputKvCoder;
+  private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
+
+  public GroupByWindowExecutor(
+      String stepName,
+      String description,
+      TranslationContext context,
+      JStormPipelineOptions pipelineOptions,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<KV<K, Iterable<V>>> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
+    // The doFn will be created when runtime. Just pass "null" here
+    super(
+        stepName,
+        description,
+        pipelineOptions,
+        null,
+        null,
+        windowingStrategy,
+        null,
+        null,
+        null,
+        mainTupleTag,
+        sideOutputTags);
+
+    this.outputManager = new GroupByWindowOutputManager();
+    UserGraphContext userGraphContext = context.getUserGraphContext();
+    PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput();
+    this.inputKvCoder = (KvCoder<K, V>) input.getCoder();
+  }
+
+  private DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getGroupByWindowDoFn() {
+    final StateInternalsFactory<K> stateFactory = new StateInternalsFactory<K>() {
+      @Override
+      public StateInternals stateInternalsForKey(K key) {
+        return new JStormStateInternals<K>(
+            key, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
+      }
+    };
+    TimerInternalsFactory<K> timerFactory = new TimerInternalsFactory<K>() {
+      @Override
+      public TimerInternals timerInternalsForKey(K key) {
+        return new JStormTimerInternals<>(
+            key,
+            GroupByWindowExecutor.this,
+            executorContext.getExecutorsBolt().timerService());
+      }
+    };
+
+    reduceFn = SystemReduceFn.buffering(inputKvCoder.getValueCoder());
+    DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFn =
+        GroupAlsoByWindowViaWindowSetNewDoFn.create(
+            windowingStrategy, stateFactory, timerFactory, NullSideInputReader.empty(),
+            (SystemReduceFn) reduceFn, outputManager, mainTupleTag);
+    return doFn;
+  }
+
+  @Override
+  protected DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> getDoFnRunner() {
+    doFn = getGroupByWindowDoFn();
+
+    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> simpleRunner = DoFnRunners.simpleRunner(
+        this.pipelineOptions,
+        this.doFn,
+        NullSideInputReader.empty(),
+        this.outputManager,
+        this.mainTupleTag,
+        this.sideOutputTags,
+        this.stepContext,
+        this.windowingStrategy);
+
+    DoFnRunner<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> doFnRunner =
+        DoFnRunners.lateDataDroppingRunner(
+            simpleRunner,
+            this.stepContext,
+            this.windowingStrategy);
+    return new DoFnRunnerWithMetrics<>(
+        stepName, doFnRunner, MetricsReporter.create(metricClient));
+  }
+
+  @Override
+  public void process(TupleTag tag, WindowedValue elem) {
+    /**
+     *  For GroupByKey, KV type elem is received. We need to convert the KV elem
+     *  into KeyedWorkItem first, which is the expected type in LateDataDroppingDoFnRunner.
+     */
+    KeyedWorkItem<K, V> keyedWorkItem = RunnerUtils.toKeyedWorkItem((WindowedValue<KV<K, V>>) elem);
+    runner.processElement(elem.withValue(keyedWorkItem));
+  }
+
+  @Override
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    StateNamespace namespace = timerData.getNamespace();
+    checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+
+    runner.processElement(
+        WindowedValue.valueInGlobalWindow(
+            KeyedWorkItems.<K, V>timersWorkItem((K) key, ImmutableList.of(timerData))));
+  }
+
+  @Override
+  public String toString() {
+    return super.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
index cb15ea2..d36d9a6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime;
 
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import org.apache.beam.runners.jstorm.JStormPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -27,49 +30,45 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
 public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
-    private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
 
-    /**
-     * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated tag
-     * is used in downstream consumer. So before output, we need to map this "local" tag to "external"
-     * tag. See PCollectionTuple for details.
-     */
-    public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
-        @Override
-        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-            if (localTupleTagMap.containsKey(tag)) {
-                executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output);
-            } else {
-                executorsBolt.processExecutorElem(tag, output);
-            }
-        }
+  /**
+   * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated
+   * tag is used in downstream consumer. So before output, we need to map this "local" tag to
+   * "external" tag. See PCollectionTuple for details.
+   */
+  public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      if (localTupleTagMap.containsKey(tag)) {
+        executorsBolt.processExecutorElem((TupleTag<T>) localTupleTagMap.get(tag), output);
+      } else {
+        executorsBolt.processExecutorElem(tag, output);
+      }
     }
+  }
 
-    protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
+  protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
 
-    public MultiOutputDoFnExecutor(
-            String stepName,
-            String description,
-            JStormPipelineOptions pipelineOptions,
-            DoFn<InputT, OutputT> doFn,
-            Coder<WindowedValue<InputT>> inputCoder,
-            WindowingStrategy<?, ?> windowingStrategy,
-            TupleTag<InputT> mainInputTag,
-            Collection<PCollectionView<?>> sideInputs,
-            Map<TupleTag, PCollectionView<?>> sideInputTagToView,
-            TupleTag<OutputT> mainTupleTag,
-            List<TupleTag<?>> sideOutputTags,
-            Map<TupleTag<?>, TupleTag<?>> localTupleTagMap
-            ) {
-        super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
-                sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
-        this.localTupleTagMap = localTupleTagMap;
-        this.outputManager = new MultiOutputDoFnExecutorOutputManager();
-        LOG.info("localTupleTagMap: {}", localTupleTagMap);
-    }
+  public MultiOutputDoFnExecutor(
+      String stepName,
+      String description,
+      JStormPipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      Coder<WindowedValue<InputT>> inputCoder,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<InputT> mainInputTag,
+      Collection<PCollectionView<?>> sideInputs,
+      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+      TupleTag<OutputT> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags,
+      Map<TupleTag<?>, TupleTag<?>> localTupleTagMap
+  ) {
+    super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
+        sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
+    this.localTupleTagMap = localTupleTagMap;
+    this.outputManager = new MultiOutputDoFnExecutorOutputManager();
+    LOG.info("localTupleTagMap: {}", localTupleTagMap);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
index dd7921f..45ac62a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime;
 
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.jstorm.JStormPipelineOptions;
 import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
 import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -29,40 +32,37 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
 public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
 
-    public MultiStatefulDoFnExecutor(
-        String stepName, String description,
-        JStormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn,
-        Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy,
-        TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs,
-        Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag,
-        List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
-        super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap);
-    }
+  public MultiStatefulDoFnExecutor(
+      String stepName, String description,
+      JStormPipelineOptions pipelineOptions, DoFn<KV, OutputT> doFn,
+      Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs,
+      Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
+    super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
+        sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap);
+  }
 
-    @Override
-    public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-        if (mainInputTag.equals(tag)) {
-            WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
-            stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
-                    executorContext.getExecutorsBolt().timerService()));
-            stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
-                    kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-            processMainInput(elem);
-        } else {
-            processSideInput(tag, elem);
-        }
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    if (mainInputTag.equals(tag)) {
+      WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
+      stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
+          executorContext.getExecutorsBolt().timerService()));
+      stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
+          kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+      processMainInput(elem);
+    } else {
+      processSideInput(tag, elem);
     }
+  }
 
-    @Override
-    public void onTimer(Object key, TimerInternals.TimerData timerData) {
-        stepContext.setStateInternals(new JStormStateInternals<>(key,
-                kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-        super.onTimer(key, timerData);
-    }
+  @Override
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    stepContext.setStateInternals(new JStormStateInternals<>(key,
+        kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+    super.onTimer(key, timerData);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
index 7d20a4c..ba0c052 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime;
 
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.jstorm.JStormPipelineOptions;
 import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
 import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -29,39 +32,35 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
 public class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {
-    public StatefulDoFnExecutor(
-            String stepName, String description, JStormPipelineOptions pipelineOptions,
-            DoFn<KV, OutputT> doFn, Coder<WindowedValue<KV>> inputCoder,
-            WindowingStrategy<?, ?> windowingStrategy, TupleTag<KV> mainInputTag,
-            Collection<PCollectionView<?>> sideInputs, Map<TupleTag, PCollectionView<?>>
-                    sideInputTagToView, TupleTag<OutputT> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
-        super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy,
-                mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
-    }
+  public StatefulDoFnExecutor(
+      String stepName, String description, JStormPipelineOptions pipelineOptions,
+      DoFn<KV, OutputT> doFn, Coder<WindowedValue<KV>> inputCoder,
+      WindowingStrategy<?, ?> windowingStrategy, TupleTag<KV> mainInputTag,
+      Collection<PCollectionView<?>> sideInputs, Map<TupleTag, PCollectionView<?>>
+          sideInputTagToView, TupleTag<OutputT> mainTupleTag, List<TupleTag<?>> sideOutputTags) {
+    super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy,
+        mainInputTag, sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
+  }
 
-    @Override
-    public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-        if (mainInputTag.equals(tag)) {
-            WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
-            stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
-                    executorContext.getExecutorsBolt().timerService()));
-            stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
-                    kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-            processMainInput(elem);
-        } else {
-            processSideInput(tag, elem);
-        }
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    if (mainInputTag.equals(tag)) {
+      WindowedValue<KV> kvElem = (WindowedValue<KV>) elem;
+      stepContext.setTimerInternals(new JStormTimerInternals(kvElem.getValue().getKey(), this,
+          executorContext.getExecutorsBolt().timerService()));
+      stepContext.setStateInternals(new JStormStateInternals<>(kvElem.getValue().getKey(),
+          kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+      processMainInput(elem);
+    } else {
+      processSideInput(tag, elem);
     }
+  }
 
-    @Override
-    public void onTimer(Object key, TimerInternals.TimerData timerData) {
-        stepContext.setStateInternals(new JStormStateInternals<>(key,
-                kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-        super.onTimer(key, timerData);
-    }
+  @Override
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    stepContext.setStateInternals(new JStormStateInternals<>(key,
+        kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+    super.onTimer(key, timerData);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
index 47db018..5c41bda 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerService.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,36 +17,35 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime;
 
-import org.apache.beam.runners.core.TimerInternals;
-import org.joda.time.Instant;
-
 import java.io.Serializable;
 import java.util.List;
+import org.apache.beam.runners.core.TimerInternals;
+import org.joda.time.Instant;
 
 /**
  * Interface that tracks input watermarks and manages timers in each bolt.
  */
 public interface TimerService extends Serializable {
 
-    void init(List<Integer> upStreamTasks);
+  void init(List<Integer> upStreamTasks);
 
-    /**
-     *
-     * @param task
-     * @param inputWatermark
-     * @return new watermark if any timer is triggered during the update of watermark, otherwise 0
-     */
-    long updateInputWatermark(Integer task, long inputWatermark);
+  /**
+   *
+   * @param task
+   * @param inputWatermark
+   * @return new watermark if any timer is triggered during the update of watermark, otherwise 0
+   */
+  long updateInputWatermark(Integer task, long inputWatermark);
 
-    long currentInputWatermark();
+  long currentInputWatermark();
 
-    long currentOutputWatermark();
+  long currentOutputWatermark();
 
-    void clearWatermarkHold(String namespace);
+  void clearWatermarkHold(String namespace);
 
-    void addWatermarkHold(String namespace, Instant watermarkHold);
+  void addWatermarkHold(String namespace, Instant watermarkHold);
 
-    void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
+  void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
 
-    void fireTimers(long newWatermark);
+  void fireTimers(long newWatermark);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
index 3b864d5..d2514f1 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,134 +17,139 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
 import avro.shaded.com.google.common.collect.Maps;
 import avro.shaded.com.google.common.collect.Sets;
 import com.alibaba.jstorm.utils.Pair;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.joda.time.Instant;
 
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
 /**
  * Default implementation of {@link TimerService}.
  */
 public class TimerServiceImpl implements TimerService {
-    private transient ExecutorContext executorContext;
-    private transient Map<Integer, DoFnExecutor> idToDoFnExecutor;
-
-    private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark = new ConcurrentHashMap<>();
-    private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
-    private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
-    private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
-    private transient final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue = new PriorityQueue<>();
-    private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
-            timerDataToKeyedExecutors = Maps.newHashMap();
-
-    private boolean initialized = false;
-
-    public TimerServiceImpl() {
-    }
-
-    public TimerServiceImpl(ExecutorContext executorContext) {
-        this.executorContext = executorContext;
-        this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor();
-    }
-
-    @Override
-    public void init(List<Integer> upStreamTasks) {
-        for (Integer task : upStreamTasks) {
-            upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
-            inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
-        }
-        initialized = true;
-    }
-
-    @Override
-    public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) {
-        checkState(initialized, "TimerService has not been initialized.");
-        Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task);
-        // Make sure the input watermark don't go backward.
-        if (taskInputWatermark > oldTaskInputWatermark) {
-            upStreamTaskToInputWatermark.put(task, taskInputWatermark);
-            inputWatermarks.add(taskInputWatermark);
-            inputWatermarks.remove(oldTaskInputWatermark);
-
-            long newLocalInputWatermark = currentInputWatermark();
-            if (newLocalInputWatermark > oldTaskInputWatermark) {
-                return newLocalInputWatermark;
-            }
-        }
-        return 0;
+  private transient ExecutorContext executorContext;
+  private transient Map<Integer, DoFnExecutor> idToDoFnExecutor;
+
+  private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark =
+      new ConcurrentHashMap<>();
+  private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
+  private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
+  private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
+  private final transient PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
+      new PriorityQueue<>();
+  private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
+      timerDataToKeyedExecutors = Maps.newHashMap();
+
+  private boolean initialized = false;
+
+  public TimerServiceImpl() {
+  }
+
+  public TimerServiceImpl(ExecutorContext executorContext) {
+    this.executorContext = executorContext;
+    this.idToDoFnExecutor = executorContext.getExecutorsBolt().getIdToDoFnExecutor();
+  }
+
+  @Override
+  public void init(List<Integer> upStreamTasks) {
+    for (Integer task : upStreamTasks) {
+      upStreamTaskToInputWatermark.put(task, BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+      inputWatermarks.add(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
     }
-
-    @Override
-    public void fireTimers(long newWatermark) {
-        TimerInternals.TimerData timerData;
-        while ((timerData = eventTimeTimersQueue.peek()) != null
-                && timerData.getTimestamp().getMillis() <= newWatermark) {
-            for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) {
-                DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst());
-                executor.onTimer(keyedExecutor.getSecond(), timerData);
-            }
-            eventTimeTimersQueue.remove();
-            timerDataToKeyedExecutors.remove(timerData);
-        }
+    initialized = true;
+  }
+
+  @Override
+  public synchronized long updateInputWatermark(Integer task, long taskInputWatermark) {
+    checkState(initialized, "TimerService has not been initialized.");
+    Long oldTaskInputWatermark = upStreamTaskToInputWatermark.get(task);
+    // Make sure the input watermark doesn't go backward.
+    if (taskInputWatermark > oldTaskInputWatermark) {
+      upStreamTaskToInputWatermark.put(task, taskInputWatermark);
+      inputWatermarks.add(taskInputWatermark);
+      inputWatermarks.remove(oldTaskInputWatermark);
+
+      long newLocalInputWatermark = currentInputWatermark();
+      if (newLocalInputWatermark > oldTaskInputWatermark) {
+        return newLocalInputWatermark;
+      }
     }
-
-    @Override
-    public long currentInputWatermark() {
-        return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+    return 0;
+  }
+
+  @Override
+  public void fireTimers(long newWatermark) {
+    TimerInternals.TimerData timerData;
+    while ((timerData = eventTimeTimersQueue.peek()) != null
+        && timerData.getTimestamp().getMillis() <= newWatermark) {
+      for (Pair<Integer, Object> keyedExecutor : timerDataToKeyedExecutors.get(timerData)) {
+        DoFnExecutor executor = idToDoFnExecutor.get(keyedExecutor.getFirst());
+        executor.onTimer(keyedExecutor.getSecond(), timerData);
+      }
+      eventTimeTimersQueue.remove();
+      timerDataToKeyedExecutors.remove(timerData);
     }
-
-    @Override
-    public long currentOutputWatermark() {
-        if (watermarkHolds.isEmpty()) {
-            return currentInputWatermark();
-        } else {
-            return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis());
-        }
+  }
+
+  @Override
+  public long currentInputWatermark() {
+    return initialized ? inputWatermarks.peek() : BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+  }
+
+  @Override
+  public long currentOutputWatermark() {
+    if (watermarkHolds.isEmpty()) {
+      return currentInputWatermark();
+    } else {
+      return Math.min(currentInputWatermark(), watermarkHolds.peek().getMillis());
     }
-
-    @Override
-    public void clearWatermarkHold(String namespace) {
-        Instant currentHold = namespaceToWatermarkHold.get(namespace);
-        if (currentHold != null) {
-            watermarkHolds.remove(currentHold);
-            namespaceToWatermarkHold.remove(namespace);
-        }
+  }
+
+  @Override
+  public void clearWatermarkHold(String namespace) {
+    Instant currentHold = namespaceToWatermarkHold.get(namespace);
+    if (currentHold != null) {
+      watermarkHolds.remove(currentHold);
+      namespaceToWatermarkHold.remove(namespace);
     }
-
-    @Override
-    public void addWatermarkHold(String namespace, Instant watermarkHold) {
-        Instant currentHold = namespaceToWatermarkHold.get(namespace);
-        if (currentHold == null) {
-            namespaceToWatermarkHold.put(namespace, watermarkHold);
-            watermarkHolds.add(watermarkHold);
-        } else if (currentHold != null && watermarkHold.isBefore(currentHold)) {
-            namespaceToWatermarkHold.put(namespace, watermarkHold);
-            watermarkHolds.add(watermarkHold);
-            watermarkHolds.remove(currentHold);
-        }
+  }
+
+  @Override
+  public void addWatermarkHold(String namespace, Instant watermarkHold) {
+    Instant currentHold = namespaceToWatermarkHold.get(namespace);
+    if (currentHold == null) {
+      namespaceToWatermarkHold.put(namespace, watermarkHold);
+      watermarkHolds.add(watermarkHold);
+    } else if (watermarkHold.isBefore(currentHold)) {
+      namespaceToWatermarkHold.put(namespace, watermarkHold);
+      watermarkHolds.add(watermarkHold);
+      watermarkHolds.remove(currentHold);
     }
-
-    @Override
-    public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) {
-        checkArgument(
-                TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
-                String.format("Does not support domain: %s.", timerData.getDomain()));
-        Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData);
-        if (keyedExecutors == null) {
-            keyedExecutors = Sets.newHashSet();
-            eventTimeTimersQueue.add(timerData);
-        }
-        keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
-        timerDataToKeyedExecutors.put(timerData, keyedExecutors);
+  }
+
+  @Override
+  public void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor) {
+    checkArgument(
+        TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
+        String.format("Does not support domain: %s.", timerData.getDomain()));
+    Set<Pair<Integer, Object>> keyedExecutors = timerDataToKeyedExecutors.get(timerData);
+    if (keyedExecutors == null) {
+      keyedExecutors = Sets.newHashSet();
+      eventTimeTimersQueue.add(timerData);
     }
+    keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
+    timerDataToKeyedExecutors.put(timerData, keyedExecutors);
+  }
 }
\ No newline at end of file
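
To illustrate the bookkeeping above with made-up values: the input watermark is the minimum
across upstream tasks, and the output watermark is additionally capped by the earliest
watermark hold. A rough, illustrative-only sketch:

import java.util.Arrays;
import org.joda.time.Instant;

public class WatermarkHoldSketch {
  public static void main(String[] args) {
    TimerServiceImpl timerService = new TimerServiceImpl();
    timerService.init(Arrays.asList(1, 2));        // two upstream tasks (ids assumed)

    timerService.updateInputWatermark(1, 5000L);   // task 1 reports 5s
    timerService.updateInputWatermark(2, 8000L);   // task 2 reports 8s
    // Input watermark is the minimum across tasks: 5000.
    long input = timerService.currentInputWatermark();

    timerService.addWatermarkHold("my-namespace", new Instant(3000L));
    // Output watermark is min(input watermark, earliest hold): 3000.
    long output = timerService.currentOutputWatermark();

    // With the hold cleared, the output watermark tracks the input watermark again.
    timerService.clearWatermarkHold("my-namespace");
    System.out.println(input + " -> " + output);
  }
}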

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
index 0fb88ab..2bd5f7d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
@@ -24,108 +24,107 @@ import backtype.storm.tuple.Tuple;
 import com.alibaba.jstorm.cache.IKvStore;
 import com.alibaba.jstorm.cache.IKvStoreManager;
 import com.alibaba.jstorm.transactional.bolt.ITransactionStatefulBoltExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
-    private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
-
-    private static final String TIME_SERVICE_STORE_ID = "timer_service_store";
-    private static final String TIMER_SERVICE_KET = "timer_service_key";
-
-    private ExecutorsBolt executorsBolt;
-    private IKvStoreManager kvStoreManager;
-    private IKvStore<String, TimerService> timerServiceStore;
-
-    public TxExecutorsBolt(ExecutorsBolt executorsBolt) {
-        this.executorsBolt = executorsBolt;
-        this.executorsBolt.setStatefulBolt(true);
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        try {
-            executorsBolt.prepare(stormConf, context, collector);
-            kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager();
-            timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
-        } catch (IOException e) {
-            LOG.error("Failed to prepare stateful bolt", e);
-            throw new RuntimeException(e.getMessage());
-        }
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        executorsBolt.execute(input);
-    }
-
-    @Override
-    public void cleanup() {
-        executorsBolt.cleanup();
+  private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
+
+  private static final String TIME_SERVICE_STORE_ID = "timer_service_store";
+  private static final String TIMER_SERVICE_KEY = "timer_service_key";
+
+  private ExecutorsBolt executorsBolt;
+  private IKvStoreManager kvStoreManager;
+  private IKvStore<String, TimerService> timerServiceStore;
+
+  public TxExecutorsBolt(ExecutorsBolt executorsBolt) {
+    this.executorsBolt = executorsBolt;
+    this.executorsBolt.setStatefulBolt(true);
+  }
+
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    try {
+      executorsBolt.prepare(stormConf, context, collector);
+      kvStoreManager = executorsBolt.getExecutorContext().getKvStoreManager();
+      timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
+    } catch (IOException e) {
+      LOG.error("Failed to prepare stateful bolt", e);
+      throw new RuntimeException(e.getMessage());
     }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        executorsBolt.declareOutputFields(declarer);
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return executorsBolt.getComponentConfiguration();
-    }
-
-    @Override
-    public void initState(Object userState) {
-        LOG.info("Begin to init from state: {}", userState);
-        restore(userState);
-    }
-
-    @Override
-    public Object finishBatch(long batchId) {
-        try {
-            timerServiceStore.put(TIMER_SERVICE_KET, executorsBolt.timerService());
-        } catch (IOException e) {
-            LOG.error("Failed to store current timer service status", e);
-            throw new RuntimeException(e.getMessage());
-        }
-        kvStoreManager.checkpoint(batchId);
-        return null;
+  }
+
+  @Override
+  public void execute(Tuple input) {
+    executorsBolt.execute(input);
+  }
+
+  @Override
+  public void cleanup() {
+    executorsBolt.cleanup();
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    executorsBolt.declareOutputFields(declarer);
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return executorsBolt.getComponentConfiguration();
+  }
+
+  @Override
+  public void initState(Object userState) {
+    LOG.info("Begin to init from state: {}", userState);
+    restore(userState);
+  }
+
+  @Override
+  public Object finishBatch(long batchId) {
+    try {
+      timerServiceStore.put(TIMER_SERVICE_KEY, executorsBolt.timerService());
+    } catch (IOException e) {
+      LOG.error("Failed to store current timer service status", e);
+      throw new RuntimeException(e.getMessage());
     }
-
-    @Override
-    public Object commit(long batchId, Object state) {
-        return kvStoreManager.backup(batchId);
-    }
-
-    @Override
-    public void rollBack(Object userState) {
-        LOG.info("Begin to rollback from state: {}", userState);
-        restore(userState);
-    }
-
-    @Override
-    public void ackCommit(long batchId, long timeStamp) {
-        kvStoreManager.remove(batchId);
-    }
-
-    private void restore(Object userState) {
-        try {
-            // restore all states
-            kvStoreManager.restore(userState);
-
-            // init timer service
-            timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
-            TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KET);
-            if (timerService == null) {
-                timerService = executorsBolt.initTimerService();
-            }
-            executorsBolt.setTimerService(timerService);
-        } catch (IOException e) {
-            LOG.error("Failed to restore state", e);
-            throw new RuntimeException(e.getMessage());
-        }
+    kvStoreManager.checkpoint(batchId);
+    return null;
+  }
+
+  @Override
+  public Object commit(long batchId, Object state) {
+    return kvStoreManager.backup(batchId);
+  }
+
+  @Override
+  public void rollBack(Object userState) {
+    LOG.info("Begin to rollback from state: {}", userState);
+    restore(userState);
+  }
+
+  @Override
+  public void ackCommit(long batchId, long timeStamp) {
+    kvStoreManager.remove(batchId);
+  }
+
+  private void restore(Object userState) {
+    try {
+      // restore all states
+      kvStoreManager.restore(userState);
+
+      // init timer service
+      timerServiceStore = kvStoreManager.getOrCreate(TIME_SERVICE_STORE_ID);
+      TimerService timerService = timerServiceStore.get(TIMER_SERVICE_KEY);
+      if (timerService == null) {
+        timerService = executorsBolt.initTimerService();
+      }
+      executorsBolt.setTimerService(timerService);
+    } catch (IOException e) {
+      LOG.error("Failed to restore state", e);
+      throw new RuntimeException(e.getMessage());
     }
+  }
 }
\ No newline at end of file
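
A rough guide to the transactional lifecycle implemented above. In practice the JStorm framework,
not user code, issues these calls; stormConf, context, collector, tuple, batchId and timeStamp
below are placeholders.

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import java.util.Map;

public class TxExecutorsBoltLifecycleSketch {
  // Illustrative only: the order of the transactional callbacks for one batch.
  void lifecycle(ExecutorsBolt executorsBolt, Map stormConf, TopologyContext context,
      OutputCollector collector, Tuple tuple, long batchId, long timeStamp) {
    TxExecutorsBolt txBolt = new TxExecutorsBolt(executorsBolt);
    txBolt.prepare(stormConf, context, collector);  // also opens the timer-service kv store
    txBolt.execute(tuple);                          // processing is delegated to ExecutorsBolt
    Object state = txBolt.finishBatch(batchId);     // snapshot timer service, checkpoint kv stores
    Object backup = txBolt.commit(batchId, state);  // back up the checkpoint to the state backend
    txBolt.ackCommit(batchId, timeStamp);           // drop obsolete local/remote checkpoints
    // On recovery, initState(userState) or rollBack(userState) restore the kv stores and
    // re-attach (or re-create) the timer service.
  }
}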

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
index 22dd07b..16f7d99 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
@@ -24,130 +24,130 @@ import com.alibaba.jstorm.cache.IKvStore;
 import com.alibaba.jstorm.cache.IKvStoreManager;
 import com.alibaba.jstorm.cache.KvStoreManagerFactory;
 import com.alibaba.jstorm.transactional.spout.ITransactionSpoutExecutor;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.util.Map;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.slf4j.LoggerFactory;
 
 public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
-    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
-
-    private static final String SOURCE_STORE_ID = "SourceCheckpoint";
-    private static final String CHECKPOINT_MARK = "CheckpointMark";
-
-    private UnboundedSourceSpout sourceSpout;
-    private UnboundedSource.UnboundedReader reader;
-    private IKvStoreManager kvStoreManager;
-    private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore;
-
-    public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) {
-        this.sourceSpout = sourceSpout;
-    }
-
-    private void restore(Object userState) {
-        try {
-            kvStoreManager.restore(userState);
-            sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID);
-            UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK);
-            sourceSpout.createSourceReader(checkpointMark);
-            reader = sourceSpout.getUnboundedSourceReader();
-        } catch (IOException e) {
-            LOG.error("Failed to init state", e);
-            throw new RuntimeException(e.getMessage());
-        }
-    }
-
-    @Override
-    public void initState(Object userState) {
-        restore(userState);
-    }
-
-    @Override
-    public Object finishBatch(long checkpointId) {
-        try {
-            // Store check point mark from unbounded source reader
-            UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark();
-            sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark);
-
-            // checkpoint all kv stores in current manager
-            kvStoreManager.checkpoint(checkpointId);
-        } catch (IOException e) {
-            LOG.error(String.format("Failed to finish batch-%s", checkpointId), e);
-            throw new RuntimeException(e.getMessage());
-        }
-        return null;
-    }
-
-    @Override
-    public Object commit(long batchId, Object state) {
-        // backup kv stores to remote state backend
-        return kvStoreManager.backup(batchId);
-    }
-
-    @Override
-    public void rollBack(Object userState) {
-        restore(userState);
-    }
-
-    @Override
-    public void ackCommit(long batchId, long timeStamp) {
-        // remove obsolete state in bolt local and remote state backend
-        kvStoreManager.remove(batchId);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        sourceSpout.declareOutputFields(declarer);
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return sourceSpout.getComponentConfiguration();
-    }
-
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        try {
-            sourceSpout.open(conf, context, collector);
-            String storeName = String.format("task-%s", context.getThisTaskId());
-            String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
-            kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(context, storeName, storePath, true);
-
-            reader = sourceSpout.getUnboundedSourceReader();
-        } catch (IOException e) {
-            LOG.error("Failed to open transactional unbounded source spout", e);
-            throw new RuntimeException(e.getMessage());
-        }
-    }
-
-    @Override
-    public void close() {
-        sourceSpout.close();
-    }
-
-    @Override
-    public void activate() {
-        sourceSpout.activate();
-    }
-
-    @Override
-    public void deactivate() {
-        sourceSpout.deactivate();
-    }
-
-    @Override
-    public void nextTuple() {
-        sourceSpout.nextTuple();
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        throw new UnsupportedOperationException();
-    }
+  private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
+
+  private static final String SOURCE_STORE_ID = "SourceCheckpoint";
+  private static final String CHECKPOINT_MARK = "CheckpointMark";
+
+  private UnboundedSourceSpout sourceSpout;
+  private UnboundedSource.UnboundedReader reader;
+  private IKvStoreManager kvStoreManager;
+  private IKvStore<String, UnboundedSource.CheckpointMark> sourceCheckpointStore;
+
+  public TxUnboundedSourceSpout(UnboundedSourceSpout sourceSpout) {
+    this.sourceSpout = sourceSpout;
+  }
+
+  private void restore(Object userState) {
+    try {
+      kvStoreManager.restore(userState);
+      sourceCheckpointStore = kvStoreManager.getOrCreate(SOURCE_STORE_ID);
+      UnboundedSource.CheckpointMark checkpointMark = sourceCheckpointStore.get(CHECKPOINT_MARK);
+      sourceSpout.createSourceReader(checkpointMark);
+      reader = sourceSpout.getUnboundedSourceReader();
+    } catch (IOException e) {
+      LOG.error("Failed to init state", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  @Override
+  public void initState(Object userState) {
+    restore(userState);
+  }
+
+  @Override
+  public Object finishBatch(long checkpointId) {
+    try {
+      // Store the checkpoint mark from the unbounded source reader
+      UnboundedSource.CheckpointMark checkpointMark = reader.getCheckpointMark();
+      sourceCheckpointStore.put(CHECKPOINT_MARK, checkpointMark);
+
+      // checkpoint all kv stores in current manager
+      kvStoreManager.checkpoint(checkpointId);
+    } catch (IOException e) {
+      LOG.error(String.format("Failed to finish batch-%s", checkpointId), e);
+      throw new RuntimeException(e.getMessage());
+    }
+    return null;
+  }
+
+  @Override
+  public Object commit(long batchId, Object state) {
+    // backup kv stores to remote state backend
+    return kvStoreManager.backup(batchId);
+  }
+
+  @Override
+  public void rollBack(Object userState) {
+    restore(userState);
+  }
+
+  @Override
+  public void ackCommit(long batchId, long timeStamp) {
+    // remove obsolete state in bolt local and remote state backend
+    kvStoreManager.remove(batchId);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    sourceSpout.declareOutputFields(declarer);
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    return sourceSpout.getComponentConfiguration();
+  }
+
+  @Override
+  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+    try {
+      sourceSpout.open(conf, context, collector);
+      String storeName = String.format("task-%s", context.getThisTaskId());
+      String storePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
+      kvStoreManager = KvStoreManagerFactory.getKvStoreManagerWithMonitor(
+          context, storeName, storePath, true);
+
+      reader = sourceSpout.getUnboundedSourceReader();
+    } catch (IOException e) {
+      LOG.error("Failed to open transactional unbounded source spout", e);
+      throw new RuntimeException(e.getMessage());
+    }
+  }
+
+  @Override
+  public void close() {
+    sourceSpout.close();
+  }
+
+  @Override
+  public void activate() {
+    sourceSpout.activate();
+  }
+
+  @Override
+  public void deactivate() {
+    sourceSpout.deactivate();
+  }
+
+  @Override
+  public void nextTuple() {
+    sourceSpout.nextTuple();
+  }
+
+  @Override
+  public void ack(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void fail(Object msgId) {
+    throw new UnsupportedOperationException();
+  }
 }
\ No newline at end of file
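
The spout follows the same checkpoint/restore pattern: finishBatch persists the reader's
CheckpointMark and checkpoints the kv stores, commit backs the checkpoint up, ackCommit prunes
obsolete state, and initState/rollBack rebuild the reader from the stored mark. A sketch of that
cycle, with placeholder variables and framework-driven calls, assuming the usual backtype.storm
types:

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import java.util.Map;

public class TxUnboundedSourceSpoutCycleSketch {
  // Illustrative only: one emit/checkpoint/commit cycle for the transactional spout.
  void cycle(UnboundedSourceSpout sourceSpout, Map conf, TopologyContext context,
      SpoutOutputCollector collector, long checkpointId, long timeStamp) {
    TxUnboundedSourceSpout txSpout = new TxUnboundedSourceSpout(sourceSpout);
    txSpout.open(conf, context, collector);               // opens the per-task kv store manager
    txSpout.nextTuple();                                  // emit elements read from the source
    Object state = txSpout.finishBatch(checkpointId);     // persist CheckpointMark, checkpoint stores
    Object backup = txSpout.commit(checkpointId, state);  // back up to the remote state backend
    txSpout.ackCommit(checkpointId, timeStamp);           // prune obsolete local and remote state
    // On failure, rollBack(userState) restores the kv stores and recreates the source
    // reader from the persisted CheckpointMark.
  }
}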