You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:14 UTC

[14/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
index d907fac..6d6f1c6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
@@ -17,15 +17,17 @@
  */
 package org.apache.beam.runners.jstorm.translation;
 
-import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import java.util.List;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.View;
@@ -34,144 +36,151 @@ import org.apache.beam.sdk.values.PValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
-
-import java.util.List;
-
 /**
  * Pipleline translator of Storm
  */
 public class StormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
-    private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class);
-    private TranslationContext context;
-    private int depth = 0;
-
-    public StormPipelineTranslator(TranslationContext context) {
-        this.context = context;
-    }
-
-    public void translate(Pipeline pipeline) {
-        List<PTransformOverride> transformOverrides =
-                ImmutableList.<PTransformOverride>builder()
-                        .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class),
-                                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class)))
-                        .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class),
-                                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class)))
-                        .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class),
-                                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class)))
-                        .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class),
-                                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class)))
-                        .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class),
-                                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class)))
-                        .add(PTransformOverride.of(PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
-                               new ReflectiveOneToOneOverrideFactory((ViewTranslator.CombineGloballyAsSingletonView.class))))
-                        .build();
-        pipeline.replaceAll(transformOverrides);
-        pipeline.traverseTopologically(this);
-    }
-
-    @Override
-    public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-        LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node);
-        this.depth++;
-
-        // check if current composite transforms need to be translated. 
-        // If not, all sub transforms will be translated in visitPrimitiveTransform.
-        PTransform<?, ?> transform = node.getTransform();
-        if (transform != null) {
-            TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
-
-            if (translator != null && applyCanTranslate(transform, node, translator)) {
-                applyStreamingTransform(transform, node, translator);
-                LOG.info(genSpaces(this.depth) + "translated-" + node);
-                return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
-            }
-        }
-        return CompositeBehavior.ENTER_TRANSFORM;
-    }
-
-    public void leaveCompositeTransform(TransformHierarchy.Node node) {
-        this.depth--;
-        LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node);
+  private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class);
+  private TranslationContext context;
+  private int depth = 0;
+
+  public StormPipelineTranslator(TranslationContext context) {
+    this.context = context;
+  }
+
+  public void translate(Pipeline pipeline) {
+    List<PTransformOverride> transformOverrides =
+        ImmutableList.<PTransformOverride>builder()
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class)))
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class)))
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class)))
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class)))
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class)))
+            .add(PTransformOverride.of(
+                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                new ReflectiveOneToOneOverrideFactory(
+                    (ViewTranslator.CombineGloballyAsSingletonView.class))))
+            .build();
+    pipeline.replaceAll(transformOverrides);
+    pipeline.traverseTopologically(this);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node);
+    this.depth++;
+
+    // check if current composite transforms need to be translated.
+    // If not, all sub transforms will be translated in visitPrimitiveTransform.
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null) {
+      TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+
+      if (translator != null && applyCanTranslate(transform, node, translator)) {
+        applyStreamingTransform(transform, node, translator);
+        LOG.info(genSpaces(this.depth) + "translated-" + node);
+        return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+      }
     }
-
-    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-        LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node);
-
-        if (!node.isRootNode()) {
-            PTransform<?, ?> transform = node.getTransform();
-            TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
-            if (translator == null || !applyCanTranslate(transform, node, translator)) {
-                LOG.info(node.getTransform().getClass().toString());
-                throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
-            }
-            applyStreamingTransform(transform, node, translator);
-        }
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
+    this.depth--;
+    LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node);
+  }
+
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node);
+
+    if (!node.isRootNode()) {
+      PTransform<?, ?> transform = node.getTransform();
+      TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+      if (translator == null || !applyCanTranslate(transform, node, translator)) {
+        LOG.info(node.getTransform().getClass().toString());
+        throw new UnsupportedOperationException(
+            "The transform " + transform + " is currently not supported.");
+      }
+      applyStreamingTransform(transform, node, translator);
     }
-
-    public void visitValue(PValue value, TransformHierarchy.Node node) {
-        LOG.info(genSpaces(this.depth) + "visiting value {}", value);
+  }
+
+  public void visitValue(PValue value, TransformHierarchy.Node node) {
+    LOG.info(genSpaces(this.depth) + "visiting value {}", value);
+  }
+
+  private <T extends PTransform<?, ?>> void applyStreamingTransform(
+      PTransform<?, ?> transform,
+      TransformHierarchy.Node node,
+      TransformTranslator<?> translator) {
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+    @SuppressWarnings("unchecked")
+    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+
+    context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
+    typedTranslator.translateNode(typedTransform, context);
+
+    // Maintain PValue to TupleTag map for side inputs translation.
+    context.getUserGraphContext().recordOutputTaggedPValue();
+  }
+
+  private <T extends PTransform<?, ?>> boolean applyCanTranslate(
+      PTransform<?, ?> transform,
+      TransformHierarchy.Node node,
+      TransformTranslator<?> translator) {
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+    @SuppressWarnings("unchecked")
+    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+
+    context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
+
+    return typedTranslator.canTranslate(typedTransform, context);
+  }
+
+  /**
+   * Utility formatting method.
+   *
+   * @param n number of spaces to generate
+   * @return String with "|" followed by n spaces
+   */
+  protected static String genSpaces(int n) {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < n; i++) {
+      builder.append("|   ");
     }
-
-    private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformHierarchy.Node node,
-            TransformTranslator<?> translator) {
-        @SuppressWarnings("unchecked")
-        T typedTransform = (T) transform;
-        @SuppressWarnings("unchecked")
-        TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
-
-        context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
-        typedTranslator.translateNode(typedTransform, context);
-
-        // Maintain PValue to TupleTag map for side inputs translation.
-        context.getUserGraphContext().recordOutputTaggedPValue();
+    return builder.toString();
+  }
+
+  private static class ReflectiveOneToOneOverrideFactory<
+      InputT extends PValue,
+      OutputT extends PValue,
+      TransformT extends PTransform<InputT, OutputT>>
+      extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
+    private final Class<PTransform<InputT, OutputT>> replacement;
+
+    private ReflectiveOneToOneOverrideFactory(
+        Class<PTransform<InputT, OutputT>> replacement) {
+      this.replacement = replacement;
     }
 
-    private <T extends PTransform<?, ?>> boolean applyCanTranslate(PTransform<?, ?> transform, TransformHierarchy.Node node, TransformTranslator<?> translator) {
-        @SuppressWarnings("unchecked")
-        T typedTransform = (T) transform;
-        @SuppressWarnings("unchecked")
-        TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
-
-        context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
-
-        return typedTranslator.canTranslate(typedTransform, context);
-    }
-
-    /**
-     * Utility formatting method.
-     * 
-     * @param n number of spaces to generate
-     * @return String with "|" followed by n spaces
-     */
-    protected static String genSpaces(int n) {
-        StringBuilder builder = new StringBuilder();
-        for (int i = 0; i < n; i++) {
-            builder.append("|   ");
-        }
-        return builder.toString();
-    }
-
-    private static class ReflectiveOneToOneOverrideFactory<
-            InputT extends PValue,
-            OutputT extends PValue,
-            TransformT extends PTransform<InputT, OutputT>>
-            extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
-        private final Class<PTransform<InputT, OutputT>> replacement;
-
-        private ReflectiveOneToOneOverrideFactory(
-                Class<PTransform<InputT, OutputT>> replacement) {
-            this.replacement = replacement;
-        }
-
-        @Override
-        public PTransformReplacement<InputT, OutputT> getReplacementTransform(AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) {
-            PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform();
-            PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement)
-                    .withArg((Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(), originalPTransform)
-                    .build();
-            InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values());
-            return PTransformReplacement.of(inputT, replacedPTransform);
-        }
+    @Override
+    public PTransformReplacement<InputT, OutputT> getReplacementTransform(
+        AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) {
+      PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform();
+      PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement)
+          .withArg(
+              (Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(),
+              originalPTransform)
+          .build();
+      InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values());
+      return PTransformReplacement.of(inputT, replacedPTransform);
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index 707202b..526352a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -17,19 +17,29 @@
  */
 package org.apache.beam.runners.jstorm.translation;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
 import avro.shaded.com.google.common.collect.Lists;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.translator.Stream;
-import org.apache.beam.runners.jstorm.util.RunnerUtils;
-import com.google.common.base.Strings;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
 import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout;
 import org.apache.beam.runners.jstorm.translation.runtime.Executor;
+import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
+import org.apache.beam.runners.jstorm.translation.translator.Stream;
 import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import org.apache.beam.runners.jstorm.util.RunnerUtils;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.PValueBase;
@@ -38,387 +48,392 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
-
-import java.util.*;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
 /**
  * Maintains the state necessary during Pipeline translation to build a Storm topology.
  */
 public class TranslationContext {
-    private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
-
-    private final UserGraphContext userGraphContext;
-    private final ExecutionGraphContext executionGraphContext;
-
-    public TranslationContext(JStormPipelineOptions options) {
-        this.userGraphContext = new UserGraphContext(options);
-        this.executionGraphContext = new ExecutionGraphContext();
+  private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
+
+  private final UserGraphContext userGraphContext;
+  private final ExecutionGraphContext executionGraphContext;
+
+  public TranslationContext(JStormPipelineOptions options) {
+    this.userGraphContext = new UserGraphContext(options);
+    this.executionGraphContext = new ExecutionGraphContext();
+  }
+
+  public ExecutionGraphContext getExecutionGraphContext() {
+    return executionGraphContext;
+  }
+
+  public UserGraphContext getUserGraphContext() {
+    return userGraphContext;
+  }
+
+  private void addStormStreamDef(
+      TaggedPValue input, String destComponentName, Stream.Grouping grouping) {
+    Stream.Producer producer = executionGraphContext.getProducer(input.getValue());
+    if (!producer.getComponentId().equals(destComponentName)) {
+      Stream.Consumer consumer = Stream.Consumer.of(destComponentName, grouping);
+      executionGraphContext.registerStreamConsumer(consumer, producer);
+
+      ExecutorsBolt executorsBolt = executionGraphContext.getBolt(producer.getComponentId());
+      if (executorsBolt != null) {
+        executorsBolt.addExternalOutputTag(input.getTag());
+      }
     }
-
-    public ExecutionGraphContext getExecutionGraphContext() {
-        return executionGraphContext;
+  }
+
+  private String getUpstreamExecutorsBolt() {
+    for (PValue value : userGraphContext.getInputs().values()) {
+      String componentId = executionGraphContext.getProducerComponentId(value);
+      if (componentId != null && executionGraphContext.getBolt(componentId) != null) {
+        return componentId;
+      }
     }
-
-    public UserGraphContext getUserGraphContext() {
-        return userGraphContext;
+    // When upstream component is spout, "null" will be return.
+    return null;
+  }
+
+  /**
+   * check if the current transform is applied to source collection.
+   *
+   * @return
+   */
+  private boolean connectedToSource() {
+    for (PValue value : userGraphContext.getInputs().values()) {
+      if (executionGraphContext.producedBySpout(value)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @param upstreamExecutorsBolt
+   * @return true if there is multiple input streams, or upstream executor output the same stream
+   * to different executors
+   */
+  private boolean isMultipleInputOrOutput(
+      ExecutorsBolt upstreamExecutorsBolt, Map<TupleTag<?>, PValue> inputs) {
+    if (inputs.size() > 1) {
+      return true;
+    } else {
+      final Sets.SetView<TupleTag> intersection =
+          Sets.intersection(upstreamExecutorsBolt.getExecutors().keySet(), inputs.keySet());
+      if (!intersection.isEmpty()) {
+        // there is already a different executor consume the same input
+        return true;
+      } else {
+        return false;
+      }
     }
+  }
 
-    private void addStormStreamDef(TaggedPValue input, String destComponentName, Stream.Grouping grouping) {
-        Stream.Producer producer = executionGraphContext.getProducer(input.getValue());
-        if (!producer.getComponentId().equals(destComponentName)) {
-            Stream.Consumer consumer = Stream.Consumer.of(destComponentName, grouping);
-            executionGraphContext.registerStreamConsumer(consumer, producer);
+  public void addTransformExecutor(Executor executor) {
+    addTransformExecutor(executor, Collections.EMPTY_LIST);
+  }
 
-            ExecutorsBolt executorsBolt = executionGraphContext.getBolt(producer.getComponentId());
-            if (executorsBolt != null) {
-                executorsBolt.addExternalOutputTag(input.getTag());
-            }
-        }
-    }
+  public void addTransformExecutor(Executor executor, List<PValue> sideInputs) {
+    addTransformExecutor(
+        executor, userGraphContext.getInputs(), userGraphContext.getOutputs(), sideInputs);
+  }
 
-    private String getUpstreamExecutorsBolt() {
-        for (PValue value : userGraphContext.getInputs().values()) {
-            String componentId = executionGraphContext.getProducerComponentId(value);
-            if (componentId != null && executionGraphContext.getBolt(componentId) != null) {
-                return componentId;
-            }
-        }
-        // When upstream component is spout, "null" will be return.
-        return null;
-    }
+  public void addTransformExecutor(
+      Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs) {
+    addTransformExecutor(executor, inputs, outputs, Collections.EMPTY_LIST);
+  }
 
-    /**
-     * check if the current transform is applied to source collection.
-     * @return
-     */
-    private boolean connectedToSource() {
-        for (PValue value : userGraphContext.getInputs().values()) {
-            if (executionGraphContext.producedBySpout(value)) {
-                return true;
-            }
-        }
-        return false;
-    }
+  public void addTransformExecutor(
+      Executor executor,
+      Map<TupleTag<?>, PValue> inputs,
+      Map<TupleTag<?>, PValue> outputs,
+      List<PValue> sideInputs) {
+    String name = null;
 
+    ExecutorsBolt bolt = null;
+
+    boolean isGBK = false;
     /**
-     * @param upstreamExecutorsBolt
-     * @return true if there is multiple input streams, or upstream executor output the same stream
-     *          to different executors
+     * Check if the transform executor needs to be chained into an existing ExecutorsBolt.
+     * For following cases, a new bolt is created for the specified executor, otherwise the executor
+     * will be added into the bolt contains corresponding upstream executor.
+     * a) it is a GroupByKey executor
+     * b) it is connected to source directly
+     * c) None existing upstream bolt was found
+     * d) For the purpose of performance to reduce the side effects between multiple streams which
+     *    is output to same executor, a new bolt will be created.
      */
-    private boolean isMultipleInputOrOutput(ExecutorsBolt upstreamExecutorsBolt, Map<TupleTag<?>, PValue> inputs) {
-        if (inputs.size() > 1) {
-            return true;
-        } else {
-            final Sets.SetView<TupleTag> intersection = Sets.intersection(upstreamExecutorsBolt.getExecutors().keySet(), inputs.keySet());
-            if (!intersection.isEmpty()) {
-                // there is already a different executor consume the same input
-                return true;
-            } else {
-                return false;
-            }
-        }
+    if (RunnerUtils.isGroupByKeyExecutor(executor)) {
+      bolt = new ExecutorsBolt();
+      name = executionGraphContext.registerBolt(bolt);
+      isGBK = true;
+    } else if (connectedToSource()) {
+      bolt = new ExecutorsBolt();
+      name = executionGraphContext.registerBolt(bolt);
+    } else {
+      name = getUpstreamExecutorsBolt();
+      if (name == null) {
+        bolt = new ExecutorsBolt();
+        name = executionGraphContext.registerBolt(bolt);
+      } else {
+        bolt = executionGraphContext.getBolt(name);
+        if (isMultipleInputOrOutput(bolt, inputs)) {
+          bolt = new ExecutorsBolt();
+          name = executionGraphContext.registerBolt(bolt);
+        }
+      }
     }
 
-    public void addTransformExecutor(Executor executor) {
-        addTransformExecutor(executor, Collections.EMPTY_LIST);
-    }
-
-    public void addTransformExecutor(Executor executor, List<PValue> sideInputs) {
-        addTransformExecutor(executor, userGraphContext.getInputs(), userGraphContext.getOutputs(), sideInputs);
-    }
-
-    public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs) {
-        addTransformExecutor(executor, inputs, outputs, Collections.EMPTY_LIST);
-    }
-
-    public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs, List<PValue> sideInputs) {
-        String name = null;
-
-        ExecutorsBolt bolt = null;
-
-        boolean isGBK = false;
-        /**
-         * Check if the transform executor needs to be chained into an existing ExecutorsBolt.
-         * For following cases, a new bolt is created for the specified executor, otherwise the executor
-         * will be added into the bolt contains corresponding upstream executor.
-         * a) it is a GroupByKey executor
-         * b) it is connected to source directly
-         * c) None existing upstream bolt was found
-         * d) For the purpose of performance to reduce the side effects between multiple streams which
-         *    is output to same executor, a new bolt will be created.
-         */
-        if (RunnerUtils.isGroupByKeyExecutor(executor)) {
-            bolt = new ExecutorsBolt();
-            name = executionGraphContext.registerBolt(bolt);
-            isGBK = true;
-        } else if (connectedToSource()) {
-            bolt = new ExecutorsBolt();
-            name = executionGraphContext.registerBolt(bolt);
-        } else {
-            name = getUpstreamExecutorsBolt();
-            if (name == null) {
-                bolt = new ExecutorsBolt();
-                name = executionGraphContext.registerBolt(bolt);
-            } else {
-                bolt = executionGraphContext.getBolt(name);
-                if (isMultipleInputOrOutput(bolt, inputs)) {
-                    bolt = new ExecutorsBolt();
-                    name = executionGraphContext.registerBolt(bolt);
-                }
-            }
-        }
-
-        // update the output tags of current transform into ExecutorsBolt
-        for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
-            TupleTag tag = entry.getKey();
-            PValue value = entry.getValue();
-
-            // use tag of PValueBase
-            if (value instanceof PValueBase) {
-                tag = ((PValueBase) value).expand().keySet().iterator().next();
-            }
-            executionGraphContext.registerStreamProducer(
-                    TaggedPValue.of(tag, value),
-                    Stream.Producer.of(name, tag.getId(), value.getName()));
-            //bolt.addOutputTags(tag);
-        }
+    // update the output tags of current transform into ExecutorsBolt
+    for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
+      TupleTag tag = entry.getKey();
+      PValue value = entry.getValue();
+
+      // use tag of PValueBase
+      if (value instanceof PValueBase) {
+        tag = ((PValueBase) value).expand().keySet().iterator().next();
+      }
+      executionGraphContext.registerStreamProducer(
+          TaggedPValue.of(tag, value),
+          Stream.Producer.of(name, tag.getId(), value.getName()));
+      //bolt.addOutputTags(tag);
+    }
 
-        // add the transform executor into the chain of ExecutorsBolt
-        for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) {
-            TupleTag tag = entry.getKey();
-            PValue value = entry.getValue();
-            bolt.addExecutor(tag, executor);
-
-            // filter all connections inside bolt
-            //if (!bolt.getOutputTags().contains(tag)) {
-                Stream.Grouping grouping;
-                if (isGBK) {
-                    grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY));
-                } else {
-                    grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE);
-                }
-                addStormStreamDef(TaggedPValue.of(tag, value), name, grouping);
-            //}
-        }
+    // add the transform executor into the chain of ExecutorsBolt
+    for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) {
+      TupleTag tag = entry.getKey();
+      PValue value = entry.getValue();
+      bolt.addExecutor(tag, executor);
+
+      // filter all connections inside bolt
+      //if (!bolt.getOutputTags().contains(tag)) {
+      Stream.Grouping grouping;
+      if (isGBK) {
+        grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY));
+      } else {
+        grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE);
+      }
+      addStormStreamDef(TaggedPValue.of(tag, value), name, grouping);
+      //}
+    }
 
-        for (PValue sideInput : sideInputs) {
-            TupleTag tag = userGraphContext.findTupleTag(sideInput);
-            bolt.addExecutor(tag, executor);
-            checkState(!bolt.getOutputTags().contains(tag));
-            addStormStreamDef(TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL));
-        }
+    for (PValue sideInput : sideInputs) {
+      TupleTag tag = userGraphContext.findTupleTag(sideInput);
+      bolt.addExecutor(tag, executor);
+      checkState(!bolt.getOutputTags().contains(tag));
+      addStormStreamDef(
+          TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL));
+    }
 
-        bolt.registerExecutor(executor);
+    bolt.registerExecutor(executor);
 
-        // set parallelismNumber
-        String pTransformfullName = userGraphContext.currentTransform.getFullName();
-        String compositeName = pTransformfullName.split("/")[0];
-        Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap();
-        if (parallelismNumMap.containsKey(compositeName)) {
-            int configNum = (Integer) parallelismNumMap.get(compositeName);
-            int currNum = bolt.getParallelismNum();
-            bolt.setParallelismNum(Math.max(configNum, currNum));
-        }
+    // set parallelismNumber
+    String pTransformfullName = userGraphContext.currentTransform.getFullName();
+    String compositeName = pTransformfullName.split("/")[0];
+    Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap();
+    if (parallelismNumMap.containsKey(compositeName)) {
+      int configNum = (Integer) parallelismNumMap.get(compositeName);
+      int currNum = bolt.getParallelismNum();
+      bolt.setParallelismNum(Math.max(configNum, currNum));
     }
+  }
 
-    // TODO: add getSideInputs() and getSideOutputs().
-    public static class UserGraphContext {
-        private final JStormPipelineOptions options;
-        private final Map<PValue, TupleTag> pValueToTupleTag;
-        private AppliedPTransform<?, ?, ?> currentTransform = null;
+  // TODO: add getSideInputs() and getSideOutputs().
+  public static class UserGraphContext {
+    private final JStormPipelineOptions options;
+    private final Map<PValue, TupleTag> pValueToTupleTag;
+    private AppliedPTransform<?, ?, ?> currentTransform = null;
 
-        private boolean isWindowed = false;
+    private boolean isWindowed = false;
 
-        public UserGraphContext(JStormPipelineOptions options) {
-            this.options = checkNotNull(options, "options");
-            this.pValueToTupleTag = Maps.newHashMap();
-        }
+    public UserGraphContext(JStormPipelineOptions options) {
+      this.options = checkNotNull(options, "options");
+      this.pValueToTupleTag = Maps.newHashMap();
+    }
 
-        public JStormPipelineOptions getOptions() {
-            return this.options;
-        }
+    public JStormPipelineOptions getOptions() {
+      return this.options;
+    }
 
-        public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
-            this.currentTransform = transform;
-        }
+    public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+      this.currentTransform = transform;
+    }
 
-        public String getStepName() {
-            return currentTransform.getFullName();
-        }
+    public String getStepName() {
+      return currentTransform.getFullName();
+    }
 
-        public <T extends PValue> T getInput() {
-            return (T) currentTransform.getInputs().values().iterator().next();
-        }
+    public <T extends PValue> T getInput() {
+      return (T) currentTransform.getInputs().values().iterator().next();
+    }
 
-        public Map<TupleTag<?>, PValue> getInputs() {
-            return currentTransform.getInputs();
-        }
+    public Map<TupleTag<?>, PValue> getInputs() {
+      return currentTransform.getInputs();
+    }
 
-        public TupleTag<?> getInputTag() {
-            return currentTransform.getInputs().keySet().iterator().next();
-        }
+    public TupleTag<?> getInputTag() {
+      return currentTransform.getInputs().keySet().iterator().next();
+    }
 
-        public List<TupleTag<?>> getInputTags() {
-            return Lists.newArrayList(currentTransform.getInputs().keySet());
-        }
+    public List<TupleTag<?>> getInputTags() {
+      return Lists.newArrayList(currentTransform.getInputs().keySet());
+    }
 
-        public <T extends PValue> T getOutput() {
-            return (T) currentTransform.getOutputs().values().iterator().next();
-        }
+    public <T extends PValue> T getOutput() {
+      return (T) currentTransform.getOutputs().values().iterator().next();
+    }
 
-        public Map<TupleTag<?>, PValue> getOutputs() {
-            return currentTransform.getOutputs();
-        }
+    public Map<TupleTag<?>, PValue> getOutputs() {
+      return currentTransform.getOutputs();
+    }
 
-        public TupleTag<?> getOutputTag() {
-            return currentTransform.getOutputs().keySet().iterator().next();
-        }
+    public TupleTag<?> getOutputTag() {
+      return currentTransform.getOutputs().keySet().iterator().next();
+    }
 
-        public List<TupleTag<?>> getOutputTags() {
-            return Lists.newArrayList(currentTransform.getOutputs().keySet());
-        }
+    public List<TupleTag<?>> getOutputTags() {
+      return Lists.newArrayList(currentTransform.getOutputs().keySet());
+    }
 
-        public void recordOutputTaggedPValue() {
-            for (Map.Entry<TupleTag<?>, PValue> entry : getOutputs().entrySet()) {
-                pValueToTupleTag.put(entry.getValue(), entry.getKey());
-            }
-        }
+    public void recordOutputTaggedPValue() {
+      for (Map.Entry<TupleTag<?>, PValue> entry : getOutputs().entrySet()) {
+        pValueToTupleTag.put(entry.getValue(), entry.getKey());
+      }
+    }
 
-        public <T> TupleTag<T> findTupleTag(PValue pValue) {
-            return pValueToTupleTag.get(checkNotNull(pValue, "pValue"));
-        }
+    public <T> TupleTag<T> findTupleTag(PValue pValue) {
+      return pValueToTupleTag.get(checkNotNull(pValue, "pValue"));
+    }
 
-        public void setWindowed() {
-            this.isWindowed = true;
-        }
+    public void setWindowed() {
+      this.isWindowed = true;
+    }
 
-        public boolean isWindowed() {
-            return this.isWindowed;
-        }
+    public boolean isWindowed() {
+      return this.isWindowed;
+    }
 
-        @Override
-        public String toString() {
-            return Joiner.on('\n').join(FluentIterable.from(pValueToTupleTag.entrySet())
-                    .transform(new Function<Map.Entry<PValue,TupleTag>, String>() {
-                        @Override
-                        public String apply(Map.Entry<PValue, TupleTag> entry) {
-                            return String.format("%s == %s", entry.getValue().getId(), entry.getKey().getName());
-                        }}));
-        }
+    @Override
+    public String toString() {
+      return Joiner.on('\n').join(FluentIterable.from(pValueToTupleTag.entrySet())
+          .transform(new Function<Map.Entry<PValue, TupleTag>, String>() {
+            @Override
+            public String apply(Map.Entry<PValue, TupleTag> entry) {
+              return String.format("%s == %s", entry.getValue().getId(), entry.getKey().getName());
+            }
+          }));
     }
+  }
 
-    public static class ExecutionGraphContext {
+  public static class ExecutionGraphContext {
 
-        private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>();
-        private final Map<String, ExecutorsBolt> boltMap = new HashMap<>();
+    private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>();
+    private final Map<String, ExecutorsBolt> boltMap = new HashMap<>();
 
-        // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue).
-        private final Map<PValue, Stream.Producer> pValueToProducer = new HashMap<>();
-        private final Map<Stream.Producer, TaggedPValue> producerToTaggedPValue = new HashMap<>();
+    // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue).
+    private final Map<PValue, Stream.Producer> pValueToProducer = new HashMap<>();
+    private final Map<Stream.Producer, TaggedPValue> producerToTaggedPValue = new HashMap<>();
 
-        private final List<Stream> streams = new ArrayList<>();
+    private final List<Stream> streams = new ArrayList<>();
 
-        private int id = 1;
+    private int id = 1;
 
-        public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) {
-            checkNotNull(spout, "spout");
-            checkNotNull(output, "output");
-            String name = "spout" + genId();
-            this.spoutMap.put(name, spout);
-            registerStreamProducer(
-                    output,
-                    Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName()));
-        }
+    public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) {
+      checkNotNull(spout, "spout");
+      checkNotNull(output, "output");
+      String name = "spout" + genId();
+      this.spoutMap.put(name, spout);
+      registerStreamProducer(
+          output,
+          Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName()));
+    }
 
-        public AdaptorBasicSpout getSpout(String id) {
-            if (Strings.isNullOrEmpty(id)) {
-                return null;
-            }
-            return this.spoutMap.get(id);
-        }
+    public AdaptorBasicSpout getSpout(String id) {
+      if (Strings.isNullOrEmpty(id)) {
+        return null;
+      }
+      return this.spoutMap.get(id);
+    }
 
-        public Map<String, AdaptorBasicSpout> getSpouts() {
-            return this.spoutMap;
-        }
+    public Map<String, AdaptorBasicSpout> getSpouts() {
+      return this.spoutMap;
+    }
 
-        public String registerBolt(ExecutorsBolt bolt) {
-            checkNotNull(bolt, "bolt");
-            String name = "bolt" + genId();
-            this.boltMap.put(name, bolt);
-            return name;
-        }
+    public String registerBolt(ExecutorsBolt bolt) {
+      checkNotNull(bolt, "bolt");
+      String name = "bolt" + genId();
+      this.boltMap.put(name, bolt);
+      return name;
+    }
 
-        public ExecutorsBolt getBolt(String id) {
-            if (Strings.isNullOrEmpty(id)) {
-                return null;
-            }
-            return this.boltMap.get(id);
-        }
+    public ExecutorsBolt getBolt(String id) {
+      if (Strings.isNullOrEmpty(id)) {
+        return null;
+      }
+      return this.boltMap.get(id);
+    }
 
-        public void registerStreamProducer(TaggedPValue taggedPValue, Stream.Producer producer) {
-            checkNotNull(taggedPValue, "taggedPValue");
-            checkNotNull(producer, "producer");
-            pValueToProducer.put(taggedPValue.getValue(), producer);
-            producerToTaggedPValue.put(producer, taggedPValue);
-        }
+    public void registerStreamProducer(TaggedPValue taggedPValue, Stream.Producer producer) {
+      checkNotNull(taggedPValue, "taggedPValue");
+      checkNotNull(producer, "producer");
+      pValueToProducer.put(taggedPValue.getValue(), producer);
+      producerToTaggedPValue.put(producer, taggedPValue);
+    }
 
-        public Stream.Producer getProducer(PValue pValue) {
-            return pValueToProducer.get(checkNotNull(pValue, "pValue"));
-        }
+    public Stream.Producer getProducer(PValue pValue) {
+      return pValueToProducer.get(checkNotNull(pValue, "pValue"));
+    }
 
-        public String getProducerComponentId(PValue pValue) {
-            Stream.Producer producer = getProducer(pValue);
-            return producer == null ? null : producer.getComponentId();
-        }
+    public String getProducerComponentId(PValue pValue) {
+      Stream.Producer producer = getProducer(pValue);
+      return producer == null ? null : producer.getComponentId();
+    }
 
-        public boolean producedBySpout(PValue pValue) {
-            String componentId = getProducerComponentId(pValue);
-            return getSpout(componentId) != null;
-        }
+    public boolean producedBySpout(PValue pValue) {
+      String componentId = getProducerComponentId(pValue);
+      return getSpout(componentId) != null;
+    }
 
-        public void registerStreamConsumer(Stream.Consumer consumer, Stream.Producer producer) {
-            streams.add(Stream.of(
-                    checkNotNull(producer, "producer"),
-                    checkNotNull(consumer, "consumer")));
-        }
+    public void registerStreamConsumer(Stream.Consumer consumer, Stream.Producer producer) {
+      streams.add(Stream.of(
+          checkNotNull(producer, "producer"),
+          checkNotNull(consumer, "consumer")));
+    }
 
-        public Map<PValue, Stream.Producer> getPValueToProducers() {
-            return pValueToProducer;
-        }
+    public Map<PValue, Stream.Producer> getPValueToProducers() {
+      return pValueToProducer;
+    }
 
-        public Iterable<Stream> getStreams() {
-            return streams;
-        }
+    public Iterable<Stream> getStreams() {
+      return streams;
+    }
 
-        @Override
-        public String toString() {
-            List<String> ret = new ArrayList<>();
-            ret.add("SPOUT");
-            for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) {
-                ret.add(entry.getKey() + ": " + entry.getValue().toString());
-            }
-            ret.add("BOLT");
-            for (Map.Entry<String, ExecutorsBolt> entry : boltMap.entrySet()) {
-                ret.add(entry.getKey() + ": " + entry.getValue().toString());
-            }
-            ret.add("STREAM");
-            for (Stream stream : streams) {
-                ret.add(String.format(
-                        "%s@@%s ---> %s@@%s",
-                        stream.getProducer().getStreamId(),
-                        stream.getProducer().getComponentId(),
-                        stream.getConsumer().getGrouping(),
-                        stream.getConsumer().getComponentId()));
-            }
-            return Joiner.on("\n").join(ret);
-        }
+    @Override
+    public String toString() {
+      List<String> ret = new ArrayList<>();
+      ret.add("SPOUT");
+      for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) {
+        ret.add(entry.getKey() + ": " + entry.getValue().toString());
+      }
+      ret.add("BOLT");
+      for (Map.Entry<String, ExecutorsBolt> entry : boltMap.entrySet()) {
+        ret.add(entry.getKey() + ": " + entry.getValue().toString());
+      }
+      ret.add("STREAM");
+      for (Stream stream : streams) {
+        ret.add(String.format(
+            "%s@@%s ---> %s@@%s",
+            stream.getProducer().getStreamId(),
+            stream.getProducer().getComponentId(),
+            stream.getConsumer().getGrouping(),
+            stream.getConsumer().getComponentId()));
+      }
+      return Joiner.on("\n").join(ret);
+    }
 
-        private synchronized int genId() {
-            return id++;
-        }
+    private synchronized int genId() {
+      return id++;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
index a33f07b..bce5b3e 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.jstorm.translation;
 
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.beam.runners.jstorm.translation.translator.BoundedSourceTranslator;
 import org.apache.beam.runners.jstorm.translation.translator.FlattenTranslator;
 import org.apache.beam.runners.jstorm.translation.translator.GroupByKeyTranslator;
@@ -35,50 +37,49 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-
 /**
  * Lookup table mapping PTransform types to associated TransformTranslator implementations.
  */
 public class TranslatorRegistry {
-    private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class);
 
-    private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS = new HashMap<>();
+  private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS =
+      new HashMap<>();
 
-    static {
-        TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator());
-        TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
-        // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
-        // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
+  static {
+    TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator());
+    TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
+    // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+    // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
 
-        TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator());
-        TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator());
+    TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator());
+    TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator());
 
-        //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>());
-        TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
+    //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>());
+    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
 
-        TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
+    TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
 
-        TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
+    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
 
-        TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator());
+    TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator());
 
-        /**
-         * Currently, empty translation is required for combine and reshuffle. Because, the transforms will be 
-         * mapped to GroupByKey and Pardo finally. So we only need to translator the finally transforms.
-         * If any improvement is required, the composite transforms will be translated in the future.
-         */
-        // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
-        // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator());
-        // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator());
-    }
+    /**
+     * Currently, empty translation is required for combine and reshuffle.
+     * Because, the transforms will be mapped to GroupByKey and Pardo finally.
+     * So we only need to translator the finally transforms.
+     * If any improvement is required, the composite transforms will be translated in the future.
+     */
+    // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
+    // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator());
+    // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator());
+  }
 
-    public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
-        TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass());
-        if (translator == null) {
-            LOG.warn("Unsupported operator={}", transform.getClass().getName());
-        }
-        return translator;
+  public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+    TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass());
+    if (translator == null) {
+      LOG.warn("Unsupported operator={}", transform.getClass().getName());
     }
+    return translator;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
index b07b426..68e9e17 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
@@ -17,54 +17,52 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
-
 import backtype.storm.topology.IComponent;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
 
 /*
  * Enable user to add output stream definitions by API, rather than hard-code.
  */
 public abstract class AbstractComponent implements IComponent {
-    private Map<String, Fields> streamToFields = new HashMap<>();
-    private Map<String, Boolean> keyStreams = new HashMap<>();
-    private int parallelismNum = 0;
+  private Map<String, Fields> streamToFields = new HashMap<>();
+  private Map<String, Boolean> keyStreams = new HashMap<>();
+  private int parallelismNum = 0;
 
-    public void addOutputField(String streamId) {
-        addOutputField(streamId, new Fields(CommonInstance.VALUE));
-    }
+  public void addOutputField(String streamId) {
+    addOutputField(streamId, new Fields(CommonInstance.VALUE));
+  }
 
-    public void addOutputField(String streamId, Fields fields) {
-        streamToFields.put(streamId, fields);
-        keyStreams.put(streamId, false);
-    }
+  public void addOutputField(String streamId, Fields fields) {
+    streamToFields.put(streamId, fields);
+    keyStreams.put(streamId, false);
+  }
 
-    public void addKVOutputField(String streamId) {
-        streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE));
-        keyStreams.put(streamId, true);
-    }
+  public void addKVOutputField(String streamId) {
+    streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE));
+    keyStreams.put(streamId, true);
+  }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) {
-            declarer.declareStream(entry.getKey(), entry.getValue());
-        }
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) {
+      declarer.declareStream(entry.getKey(), entry.getValue());
     }
+  }
 
-    public boolean keyedEmit(String streamId) {
-        Boolean isKeyedStream = keyStreams.get(streamId);
-        return isKeyedStream == null ? false : isKeyedStream;
-    }
+  public boolean keyedEmit(String streamId) {
+    Boolean isKeyedStream = keyStreams.get(streamId);
+    return isKeyedStream == null ? false : isKeyedStream;
+  }
 
-    public int getParallelismNum() {
-        return parallelismNum;
-    }
+  public int getParallelismNum() {
+    return parallelismNum;
+  }
 
-    public void setParallelismNum(int num) {
-        parallelismNum = num;
-    }
+  public void setParallelismNum(int num) {
+    parallelismNum = num;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
index 91881f2..5e9b056 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
@@ -20,5 +20,5 @@ package org.apache.beam.runners.jstorm.translation.runtime;
 import backtype.storm.topology.IRichBatchBolt;
 
 public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt {
-    
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
index 5a0c6ec..0480518 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
@@ -20,5 +20,5 @@ package org.apache.beam.runners.jstorm.translation.runtime;
 import backtype.storm.topology.IRichSpout;
 
 public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout {
-    
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
index c73a3b8..9507948 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
@@ -17,312 +17,319 @@
  */
 package org.apache.beam.runners.jstorm.translation.runtime;
 
-import java.io.Serializable;
-import java.util.*;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import avro.shaded.com.google.common.collect.Iterables;
-import org.apache.beam.runners.jstorm.JStormPipelineOptions;
-import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
-import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
-
 import com.alibaba.jstorm.cache.IKvStoreManager;
 import com.alibaba.jstorm.metric.MetricClient;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.jstorm.JStormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
+import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
+import org.apache.beam.runners.jstorm.translation.util.DefaultStepContext;
+import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.sdk.state.BagState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.beam.runners.jstorm.translation.util.DefaultStepContext;
-import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 public class DoFnExecutor<InputT, OutputT> implements Executor {
-    private static final long serialVersionUID = 5297603063991078668L;
+  private static final long serialVersionUID = 5297603063991078668L;
 
-    private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
 
-    public class DoFnExecutorOutputManager implements OutputManager, Serializable {
-        private static final long serialVersionUID = -661113364735206170L;
+  public class DoFnExecutorOutputManager implements OutputManager, Serializable {
+    private static final long serialVersionUID = -661113364735206170L;
 
-        @Override
-        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-            executorsBolt.processExecutorElem(tag, output);
-        }
-    }
-
-    protected transient DoFnRunner<InputT, OutputT> runner = null;
-    protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null;
-
-    protected final String stepName;
-
-    protected int internalDoFnExecutorId;
-
-    protected final String description;
-
-    protected final TupleTag<OutputT> mainTupleTag;
-    protected final List<TupleTag<?>> sideOutputTags;
-
-    protected SerializedPipelineOptions serializedOptions;
-    protected transient JStormPipelineOptions pipelineOptions;
-
-    protected DoFn<InputT, OutputT> doFn;
-    protected final Coder<WindowedValue<InputT>> inputCoder;
-    protected DoFnInvoker<InputT, OutputT> doFnInvoker;
-    protected OutputManager outputManager;
-    protected WindowingStrategy<?, ?> windowingStrategy;
-    protected final TupleTag<InputT> mainInputTag;
-    protected Collection<PCollectionView<?>> sideInputs;
-    protected SideInputHandler sideInputHandler;
-    protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
-
-    // Initialize during runtime
-    protected ExecutorContext executorContext;
-    protected ExecutorsBolt executorsBolt;
-    protected TimerInternals timerInternals;
-    protected transient StateInternals pushbackStateInternals;
-    protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
-    protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
-    protected transient IKvStoreManager kvStoreManager;
-    protected DefaultStepContext stepContext;
-    protected transient MetricClient metricClient;
-
-    public DoFnExecutor(
-            String stepName,
-            String description,
-            JStormPipelineOptions pipelineOptions,
-            DoFn<InputT, OutputT> doFn,
-            Coder<WindowedValue<InputT>> inputCoder,
-            WindowingStrategy<?, ?> windowingStrategy,
-            TupleTag<InputT> mainInputTag,
-            Collection<PCollectionView<?>> sideInputs,
-            Map<TupleTag, PCollectionView<?>> sideInputTagToView,
-            TupleTag<OutputT> mainTupleTag,
-            List<TupleTag<?>> sideOutputTags) {
-        this.stepName = checkNotNull(stepName, "stepName");
-        this.description = checkNotNull(description, "description");
-        this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-        this.doFn = doFn;
-        this.inputCoder = inputCoder;
-        this.outputManager = new DoFnExecutorOutputManager();
-        this.windowingStrategy = windowingStrategy;
-        this.mainInputTag = mainInputTag;
-        this.sideInputs = sideInputs;
-        this.mainTupleTag = mainTupleTag;
-        this.sideOutputTags = sideOutputTags;
-        this.sideInputTagToView = sideInputTagToView;
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      executorsBolt.processExecutorElem(tag, output);
     }
-
-    protected DoFnRunner<InputT, OutputT> getDoFnRunner() {
-        return new DoFnRunnerWithMetrics<>(
-            stepName,
-            DoFnRunners.simpleRunner(
-                this.pipelineOptions,
-                this.doFn,
-                this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler,
-                this.outputManager,
-                this.mainTupleTag,
-                this.sideOutputTags,
-                this.stepContext,
-                this.windowingStrategy),
-            MetricsReporter.create(metricClient));
+  }
+
+  protected transient DoFnRunner<InputT, OutputT> runner = null;
+  protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null;
+
+  protected final String stepName;
+
+  protected int internalDoFnExecutorId;
+
+  protected final String description;
+
+  protected final TupleTag<OutputT> mainTupleTag;
+  protected final List<TupleTag<?>> sideOutputTags;
+
+  protected SerializedPipelineOptions serializedOptions;
+  protected transient JStormPipelineOptions pipelineOptions;
+
+  protected DoFn<InputT, OutputT> doFn;
+  protected final Coder<WindowedValue<InputT>> inputCoder;
+  protected DoFnInvoker<InputT, OutputT> doFnInvoker;
+  protected OutputManager outputManager;
+  protected WindowingStrategy<?, ?> windowingStrategy;
+  protected final TupleTag<InputT> mainInputTag;
+  protected Collection<PCollectionView<?>> sideInputs;
+  protected SideInputHandler sideInputHandler;
+  protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
+
+  // Initialize during runtime
+  protected ExecutorContext executorContext;
+  protected ExecutorsBolt executorsBolt;
+  protected TimerInternals timerInternals;
+  protected transient StateInternals pushbackStateInternals;
+  protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
+  protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
+  protected transient IKvStoreManager kvStoreManager;
+  protected DefaultStepContext stepContext;
+  protected transient MetricClient metricClient;
+
+  public DoFnExecutor(
+      String stepName,
+      String description,
+      JStormPipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      Coder<WindowedValue<InputT>> inputCoder,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<InputT> mainInputTag,
+      Collection<PCollectionView<?>> sideInputs,
+      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+      TupleTag<OutputT> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags) {
+    this.stepName = checkNotNull(stepName, "stepName");
+    this.description = checkNotNull(description, "description");
+    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.doFn = doFn;
+    this.inputCoder = inputCoder;
+    this.outputManager = new DoFnExecutorOutputManager();
+    this.windowingStrategy = windowingStrategy;
+    this.mainInputTag = mainInputTag;
+    this.sideInputs = sideInputs;
+    this.mainTupleTag = mainTupleTag;
+    this.sideOutputTags = sideOutputTags;
+    this.sideInputTagToView = sideInputTagToView;
+  }
+
+  protected DoFnRunner<InputT, OutputT> getDoFnRunner() {
+    return new DoFnRunnerWithMetrics<>(
+        stepName,
+        DoFnRunners.simpleRunner(
+            this.pipelineOptions,
+            this.doFn,
+            this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler,
+            this.outputManager,
+            this.mainTupleTag,
+            this.sideOutputTags,
+            this.stepContext,
+            this.windowingStrategy),
+        MetricsReporter.create(metricClient));
+  }
+
+  protected void initService(ExecutorContext context) {
+    // TODO: what should be set for key in here?
+    timerInternals = new JStormTimerInternals(
+        null /* key */, this, context.getExecutorsBolt().timerService());
+    kvStoreManager = context.getKvStoreManager();
+    stepContext = new DefaultStepContext(timerInternals,
+        new JStormStateInternals(
+            null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+    metricClient = new MetricClient(executorContext.getTopologyContext());
+  }
+
+  @Override
+  public void init(ExecutorContext context) {
+    this.executorContext = context;
+    this.executorsBolt = context.getExecutorsBolt();
+    this.pipelineOptions =
+        this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
+
+    initService(context);
+
+    // Side inputs setup
+    if (sideInputs != null && sideInputs.isEmpty() == false) {
+      pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
+      watermarkHoldTag =
+          StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
+      pushbackStateInternals = new JStormStateInternals(
+          null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
+      sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals);
+      runner = getDoFnRunner();
+      pushbackRunner =
+          SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler);
+    } else {
+      runner = getDoFnRunner();
     }
 
-    protected void initService(ExecutorContext context) {
-        // TODO: what should be set for key in here?
-        timerInternals = new JStormTimerInternals(null /* key */, this, context.getExecutorsBolt().timerService());
-        kvStoreManager = context.getKvStoreManager();
-        stepContext = new DefaultStepContext(timerInternals,
-                new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
-        metricClient = new MetricClient(executorContext.getTopologyContext());
+    // Process user's setup
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
+  }
+
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}",
+        tag, mainInputTag, sideInputs, elem.getValue()));
+    if (mainInputTag.equals(tag)) {
+      processMainInput(elem);
+    } else {
+      processSideInput(tag, elem);
     }
-
-    @Override
-    public void init(ExecutorContext context) {
-        this.executorContext = context;
-        this.executorsBolt = context.getExecutorsBolt();
-        this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
-
-        initService(context);
-
-        // Side inputs setup
-        if (sideInputs != null && sideInputs.isEmpty() == false) {
-            pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
-            watermarkHoldTag =
-                    StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
-            pushbackStateInternals = new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
-            sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals);
-            runner = getDoFnRunner();
-            pushbackRunner = SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler);
-        } else {
-            runner = getDoFnRunner();
+  }
+
+  protected <T> void processMainInput(WindowedValue<T> elem) {
+    if (sideInputs.isEmpty()) {
+      runner.processElement((WindowedValue<InputT>) elem);
+    } else {
+      Iterable<WindowedValue<InputT>> justPushedBack =
+          pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem);
+      BagState<WindowedValue<InputT>> pushedBack =
+          pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+      Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
+      for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+        if (pushedBackValue.getTimestamp().isBefore(min)) {
+          min = pushedBackValue.getTimestamp();
         }
-
-        // Process user's setup
-        doFnInvoker = DoFnInvokers.invokerFor(doFn);
-        doFnInvoker.invokeSetup();
+        min = earlier(min, pushedBackValue.getTimestamp());
+        pushedBack.add(pushedBackValue);
+      }
+      pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min);
     }
+  }
 
-    @Override
-    public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
-        LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}",
-                tag, mainInputTag, sideInputs, elem.getValue()));
-        if (mainInputTag.equals(tag)) {
-            processMainInput(elem);
-        } else {
-            processSideInput(tag, elem);
-        }
-    }
-
-    protected <T> void processMainInput(WindowedValue<T> elem) {
-       if (sideInputs.isEmpty()) {
-           runner.processElement((WindowedValue<InputT>) elem);
-       } else {
-           Iterable<WindowedValue<InputT>> justPushedBack =
-               pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem);
-           BagState<WindowedValue<InputT>> pushedBack =
-                   pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-
-           Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
-           for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
-               if (pushedBackValue.getTimestamp().isBefore(min)) {
-                   min = pushedBackValue.getTimestamp();
-               }
-               min = earlier(min, pushedBackValue.getTimestamp());
-               pushedBack.add(pushedBackValue);
-           }
-           pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min);
-       }
-    }
-
-    protected void processSideInput(TupleTag tag, WindowedValue elem) {
-        LOG.debug(String.format("side inputs: %s, %s.", tag, elem));
-
-        PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
-        sideInputHandler.addSideInputValue(sideInputView, elem);
+  protected void processSideInput(TupleTag tag, WindowedValue elem) {
+    LOG.debug(String.format("side inputs: %s, %s.", tag, elem));
 
-        BagState<WindowedValue<InputT>> pushedBack =
-                pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+    PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
+    sideInputHandler.addSideInputValue(sideInputView, elem);
 
-        List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
+    BagState<WindowedValue<InputT>> pushedBack =
+        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
-        Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read();
-        if (pushedBackInputs != null) {
-            for (WindowedValue<InputT> input : pushedBackInputs) {
+    List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
 
-                Iterable<WindowedValue<InputT>> justPushedBack =
-                        pushbackRunner.processElementInReadyWindows(input);
-                Iterables.addAll(newPushedBack, justPushedBack);
-            }
-        }
-        pushedBack.clear();
-
-        Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
-        for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
-            min = earlier(min, pushedBackValue.getTimestamp());
-            pushedBack.add(pushedBackValue);
-        }
+    Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read();
+    if (pushedBackInputs != null) {
+      for (WindowedValue<InputT> input : pushedBackInputs) {
 
-        WatermarkHoldState watermarkHold =
-                pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
-        // TODO: clear-then-add is not thread-safe.
-        watermarkHold.clear();
-        watermarkHold.add(min);
+        Iterable<WindowedValue<InputT>> justPushedBack =
+            pushbackRunner.processElementInReadyWindows(input);
+        Iterables.addAll(newPushedBack, justPushedBack);
+      }
     }
+    pushedBack.clear();
 
-    /**
-     * Process all pushed back elements when receiving watermark with max timestamp
-     */
-    public void processAllPushBackElements() {
-        if (sideInputs != null && sideInputs.isEmpty() == false) {
-            BagState<WindowedValue<InputT>> pushedBackElements =
-                    pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
-            if (pushedBackElements != null) {
-                for (WindowedValue<InputT> elem : pushedBackElements.read()) {
-                    LOG.info("Process pushback elem={}", elem);
-                    runner.processElement(elem);
-                }
-                pushedBackElements.clear();
-            }
-
-            WatermarkHoldState watermarkHold =
-                    pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
-            watermarkHold.clear();
-            watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE);
-        }
+    Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
+      min = earlier(min, pushedBackValue.getTimestamp());
+      pushedBack.add(pushedBackValue);
     }
 
-    public void onTimer(Object key, TimerInternals.TimerData timerData) {
-        StateNamespace namespace = timerData.getNamespace();
-        checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
-        BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
-        if (pushbackRunner != null) {
-            pushbackRunner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
-        } else {
-            runner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+    WatermarkHoldState watermarkHold =
+        pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
+    // TODO: clear-then-add is not thread-safe.
+    watermarkHold.clear();
+    watermarkHold.add(min);
+  }
+
+  /**
+   * Process all pushed back elements when receiving watermark with max timestamp
+   */
+  public void processAllPushBackElements() {
+    if (sideInputs != null && sideInputs.isEmpty() == false) {
+      BagState<WindowedValue<InputT>> pushedBackElements =
+          pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+      if (pushedBackElements != null) {
+        for (WindowedValue<InputT> elem : pushedBackElements.read()) {
+          LOG.info("Process pushback elem={}", elem);
+          runner.processElement(elem);
         }
-    }
+        pushedBackElements.clear();
+      }
 
-    @Override
-    public void cleanup() {
-        doFnInvoker.invokeTeardown();
+      WatermarkHoldState watermarkHold =
+          pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
+      watermarkHold.clear();
+      watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE);
     }
-
-    @Override
-    public String toString() {
-        return description;
+  }
+
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    StateNamespace namespace = timerData.getNamespace();
+    checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+    BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
+    if (pushbackRunner != null) {
+      pushbackRunner.onTimer(
+          timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+    } else {
+      runner.onTimer(
+          timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
     }
-
-    private Instant earlier(Instant left, Instant right) {
-        return left.isBefore(right) ? left : right;
+  }
+
+  @Override
+  public void cleanup() {
+    doFnInvoker.invokeTeardown();
+  }
+
+  @Override
+  public String toString() {
+    return description;
+  }
+
+  private Instant earlier(Instant left, Instant right) {
+    return left.isBefore(right) ? left : right;
+  }
+
+  public void startBundle() {
+    if (pushbackRunner != null) {
+      pushbackRunner.startBundle();
+    } else {
+      runner.startBundle();
     }
+  }
 
-    public void startBundle() {
-        if (pushbackRunner != null) {
-            pushbackRunner.startBundle();
-        } else {
-            runner.startBundle();
-        }
+  public void finishBundle() {
+    if (pushbackRunner != null) {
+      pushbackRunner.finishBundle();
+    } else {
+      runner.finishBundle();
     }
+  }
 
-    public void finishBundle() {
-        if (pushbackRunner != null) {
-            pushbackRunner.finishBundle();
-        } else {
-            runner.finishBundle();
-        }
-    }
+  public void setInternalDoFnExecutorId(int id) {
+    this.internalDoFnExecutorId = id;
+  }
 
-    public void setInternalDoFnExecutorId(int id) {
-        this.internalDoFnExecutorId = id;
-    }
-
-    public int getInternalDoFnExecutorId() {
-        return internalDoFnExecutorId;
-    }
+  public int getInternalDoFnExecutorId() {
+    return internalDoFnExecutorId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
index 98dbcc5..1610a8a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
@@ -68,7 +68,8 @@ public class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT
   }
 
   @Override
-  public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+  public void onTimer(
+      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
     try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
         metricsReporter.getMetricsContainer(stepName))) {
       delegate.onTimer(timerId, window, timestamp, timeDomain);

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
index d7214db..1a03cb8 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
@@ -18,17 +18,16 @@
 package org.apache.beam.runners.jstorm.translation.runtime;
 
 import java.io.Serializable;
-
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 public interface Executor extends Serializable {
-    /**
-     * Initialization during runtime
-     */
-    void init(ExecutorContext context);
+  /**
+   * Initialization during runtime
+   */
+  void init(ExecutorContext context);
 
-    <T> void  process(TupleTag<T> tag, WindowedValue<T> elem);
+  <T> void process(TupleTag<T> tag, WindowedValue<T> elem);
 
-    void cleanup();
+  void cleanup();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
index 1de881f..1f65921 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
@@ -23,13 +23,16 @@ import com.google.auto.value.AutoValue;
 
 @AutoValue
 public abstract class ExecutorContext {
-    public static ExecutorContext of(TopologyContext topologyContext, ExecutorsBolt bolt, IKvStoreManager kvStoreManager) {
-        return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
-    }
+  public static ExecutorContext of(
+      TopologyContext topologyContext,
+      ExecutorsBolt bolt,
+      IKvStoreManager kvStoreManager) {
+    return new AutoValue_ExecutorContext(topologyContext, bolt, kvStoreManager);
+  }
 
-    public abstract TopologyContext getTopologyContext();
+  public abstract TopologyContext getTopologyContext();
 
-    public abstract ExecutorsBolt getExecutorsBolt();
+  public abstract ExecutorsBolt getExecutorsBolt();
 
-    public abstract IKvStoreManager getKvStoreManager();
+  public abstract IKvStoreManager getKvStoreManager();
 }