You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:20 UTC

[20/53] [abbrv] beam git commit: jstorm-runner: Fix checkstyle error

jstorm-runner: Fix checkstyle error


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5a15d548
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5a15d548
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5a15d548

Branch: refs/heads/jstorm-runner
Commit: 5a15d5488f9438695948e72af08ada4c263471d7
Parents: 78a5076
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Fri Jul 14 14:14:49 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:57 2017 +0800

----------------------------------------------------------------------
 .../runners/jstorm/JStormPipelineOptions.java   |   3 +
 .../beam/runners/jstorm/JStormRunner.java       |  16 +-
 .../beam/runners/jstorm/TestJStormRunner.java   |   2 +-
 .../serialization/ImmutableListSerializer.java  |   4 +-
 .../serialization/ImmutableMapSerializer.java   |   3 +
 .../serialization/ImmutableSetSerializer.java   |   3 +
 .../KvStoreIterableSerializer.java              |   3 +
 .../SdkRepackImmuListSerializer.java            |   3 +
 .../SdkRepackImmuSetSerializer.java             |   3 +
 .../UnmodifiableCollectionsSerializer.java      |   5 +-
 .../translation/JStormPipelineTranslator.java   | 186 +++++++++++++++++++
 .../translation/StormPipelineTranslator.java    | 186 -------------------
 .../jstorm/translation/TranslationContext.java  |   9 +-
 .../translation/runtime/AbstractComponent.java  |   4 +-
 .../translation/runtime/AdaptorBasicBolt.java   |   5 +-
 .../translation/runtime/AdaptorBasicSpout.java  |   5 +-
 .../translation/runtime/DoFnExecutor.java       |  16 +-
 .../jstorm/translation/runtime/Executor.java    |   7 +-
 .../translation/runtime/ExecutorContext.java    |   3 +
 .../translation/runtime/ExecutorsBolt.java      |  15 +-
 .../translation/runtime/FlattenExecutor.java    |   6 +-
 .../runtime/GroupByWindowExecutor.java          |   5 +
 .../runtime/MultiOutputDoFnExecutor.java        |   7 +-
 .../runtime/MultiStatefulDoFnExecutor.java      |   4 +
 .../runtime/StatefulDoFnExecutor.java           |   4 +
 .../translation/runtime/TimerServiceImpl.java   |   8 +-
 .../translation/runtime/TxExecutorsBolt.java    |   5 +-
 .../runtime/TxUnboundedSourceSpout.java         |   5 +-
 .../runtime/UnboundedSourceSpout.java           |   5 +-
 .../runtime/WindowAssignExecutor.java           |   7 +-
 .../runtime/state/JStormBagState.java           |   5 +-
 .../runtime/state/JStormMapState.java           |   7 +-
 .../translator/FlattenTranslator.java           |   6 +-
 .../translator/GroupByKeyTranslator.java        |   5 +
 .../translator/ParDoBoundMultiTranslator.java   |   2 +-
 .../translator/ParDoBoundTranslator.java        |   4 +-
 .../jstorm/translation/translator/Stream.java   |  11 +-
 .../translator/TransformTranslator.java         |   4 +
 .../translation/translator/ViewTranslator.java  |  18 +-
 .../translator/WindowAssignTranslator.java      |   7 +-
 .../jstorm/translation/util/CommonInstance.java |   5 +-
 .../beam/runners/jstorm/util/RunnerUtils.java   |  12 +-
 .../jstorm/util/SerializedPipelineOptions.java  |   2 +-
 .../jstorm/util/SingletonKeyedWorkItem.java     |   3 +-
 .../runtime/state/JStormStateInternalsTest.java |  14 +-
 45 files changed, 384 insertions(+), 258 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
index 2a87756..114877a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
@@ -64,6 +64,9 @@ public interface JStormPipelineOptions extends PipelineOptions {
   Map getParallelismNumMap();
   void setParallelismNumMap(Map parallelismNumMap);
 
+  /**
+   * Default value factory for topology configuration of JStorm.
+   */
   class DefaultMapValueFactory implements DefaultValueFactory<Map> {
     @Override
     public Map create(PipelineOptions pipelineOptions) {

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index 5375d6e..00ec7f6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -78,17 +78,17 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
   }
 
   /**
-   * convert pipeline options to storm configuration format
-   *
+   * convert pipeline options to storm configuration format.
    * @param options
    * @return
    */
   private Config convertPipelineOptionsToConfig(JStormPipelineOptions options) {
     Config config = new Config();
-    if (options.getLocalMode())
+    if (options.getLocalMode()) {
       config.put(Config.STORM_CLUSTER_MODE, "local");
-    else
+    } else {
       config.put(Config.STORM_CLUSTER_MODE, "distributed");
+    }
 
     Config.setNumWorkers(config, options.getWorkerNumber());
 
@@ -161,8 +161,9 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
       component = spout;
     } else {
       AdaptorBasicBolt bolt = context.getBolt(id);
-      if (bolt != null)
+      if (bolt != null) {
         component = bolt;
+      }
     }
 
     return component;
@@ -202,10 +203,11 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
 
       // add stream output declare for "from" component
       AbstractComponent component = getComponent(srcBoltId, context);
-      if (grouping.getType().equals(Stream.Grouping.Type.FIELDS))
+      if (grouping.getType().equals(Stream.Grouping.Type.FIELDS)) {
         component.addKVOutputField(streamId);
-      else
+      } else {
         component.addOutputField(streamId);
+      }
 
       // "to" component declares grouping to "from" component
       switch (grouping.getType()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index e27efc0..b1b0379 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -2,7 +2,6 @@ package org.apache.beam.runners.jstorm;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import avro.shaded.com.google.common.collect.Maps;
 import com.alibaba.jstorm.common.metric.AsmMetric;
 import com.alibaba.jstorm.metric.AsmMetricRegistry;
 import com.alibaba.jstorm.metric.AsmWindow;
@@ -11,6 +10,7 @@ import com.alibaba.jstorm.metric.MetaType;
 import com.alibaba.jstorm.metric.MetricType;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
index c479f26..268774c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
@@ -10,8 +10,10 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableTable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Table;
-import org.apache.beam.runners.jstorm.util.RunnerUtils;
 
+/**
+ * Specific serializer of {@link Kryo} for ImmutableList.
+ */
 public class ImmutableListSerializer extends Serializer<ImmutableList<Object>> {
 
   private static final boolean DOES_NOT_ACCEPT_NULL = false;

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
index 77eede3..6b998fc 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
@@ -11,6 +11,9 @@ import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Specific serializer of {@link Kryo} for ImmutableMap.
+ */
 public class ImmutableMapSerializer extends Serializer<ImmutableMap<Object, ? extends Object>> {
 
   private static final boolean DOES_NOT_ACCEPT_NULL = true;

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
index 3a43b2b..edc7b09 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
@@ -8,6 +8,9 @@ import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
+/**
+ * Specific serializer of {@link Kryo} for ImmutableSet.
+ */
 public class ImmutableSetSerializer extends Serializer<ImmutableSet<Object>> {
 
   private static final boolean DOES_NOT_ACCEPT_NULL = false;

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
index b47f3b7..3835816 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
@@ -9,6 +9,9 @@ import com.google.common.collect.Lists;
 import java.util.Iterator;
 import java.util.List;
 
+/**
+ * Specific serializer of {@link Kryo} for KvStoreIterable.
+ */
 public class KvStoreIterableSerializer extends Serializer<KvStoreIterable<Object>> {
 
   public KvStoreIterableSerializer() {

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
index dd4272c..f1ed644 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
@@ -12,6 +12,9 @@ import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableTable;
 import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
 import org.apache.beam.sdk.repackaged.com.google.common.collect.Table;
 
+/**
+ * Specific serializer of {@link Kryo} for Beam SDK repackaged ImmutableList.
+ */
 public class SdkRepackImmuListSerializer extends Serializer<ImmutableList<Object>> {
 
   private static final boolean DOES_NOT_ACCEPT_NULL = false;

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
index 6973c82..d1ed046 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
@@ -8,6 +8,9 @@ import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
 import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet;
 import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets;
 
+/**
+ * Specific serializer of {@link Kryo} for Beam SDK repackaged ImmutableSet.
+ */
 public class SdkRepackImmuSetSerializer extends Serializer<ImmutableSet<Object>> {
 
   private static final boolean DOES_NOT_ACCEPT_NULL = false;

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
index bcee778..33343fc 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
@@ -21,6 +21,9 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+/**
+ * Specific serializer of {@link Kryo} for Unmodifiable Collection.
+ */
 public class UnmodifiableCollectionsSerializer extends Serializer<Object> {
 
   private static final Field SOURCE_COLLECTION_FIELD;
@@ -83,7 +86,7 @@ public class UnmodifiableCollectionsSerializer extends Serializer<Object> {
     }
   }
 
-  private static enum UnmodifiableCollection {
+  private enum UnmodifiableCollection {
     COLLECTION(
         Collections.unmodifiableCollection(Arrays.asList("")).getClass(),
         SOURCE_COLLECTION_FIELD) {

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
new file mode 100644
index 0000000..1449a43
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormPipelineTranslator.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.List;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pipleline translator of JStorm.
+ */
+public class JStormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
+  private static final Logger LOG = LoggerFactory.getLogger(JStormPipelineTranslator.class);
+  private TranslationContext context;
+  private int depth = 0;
+
+  public JStormPipelineTranslator(TranslationContext context) {
+    this.context = context;
+  }
+
+  public void translate(Pipeline pipeline) {
+    List<PTransformOverride> transformOverrides =
+        ImmutableList.<PTransformOverride>builder()
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class)))
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class)))
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class)))
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class)))
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class)))
+            .add(PTransformOverride.of(
+                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                new ReflectiveOneToOneOverrideFactory(
+                    (ViewTranslator.CombineGloballyAsSingletonView.class))))
+            .build();
+    pipeline.replaceAll(transformOverrides);
+    pipeline.traverseTopologically(this);
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node);
+    this.depth++;
+
+    // check if current composite transforms need to be translated.
+    // If not, all sub transforms will be translated in visitPrimitiveTransform.
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null) {
+      TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+
+      if (translator != null && applyCanTranslate(transform, node, translator)) {
+        applyStreamingTransform(transform, node, translator);
+        LOG.info(genSpaces(this.depth) + "translated-" + node);
+        return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+      }
+    }
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
+    this.depth--;
+    LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node);
+  }
+
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node);
+
+    if (!node.isRootNode()) {
+      PTransform<?, ?> transform = node.getTransform();
+      TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+      if (translator == null || !applyCanTranslate(transform, node, translator)) {
+        LOG.info(node.getTransform().getClass().toString());
+        throw new UnsupportedOperationException(
+            "The transform " + transform + " is currently not supported.");
+      }
+      applyStreamingTransform(transform, node, translator);
+    }
+  }
+
+  public void visitValue(PValue value, TransformHierarchy.Node node) {
+    LOG.info(genSpaces(this.depth) + "visiting value {}", value);
+  }
+
+  private <T extends PTransform<?, ?>> void applyStreamingTransform(
+      PTransform<?, ?> transform,
+      TransformHierarchy.Node node,
+      TransformTranslator<?> translator) {
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+    @SuppressWarnings("unchecked")
+    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+
+    context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
+    typedTranslator.translateNode(typedTransform, context);
+
+    // Maintain PValue to TupleTag map for side inputs translation.
+    context.getUserGraphContext().recordOutputTaggedPValue();
+  }
+
+  private <T extends PTransform<?, ?>> boolean applyCanTranslate(
+      PTransform<?, ?> transform,
+      TransformHierarchy.Node node,
+      TransformTranslator<?> translator) {
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+    @SuppressWarnings("unchecked")
+    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+
+    context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
+
+    return typedTranslator.canTranslate(typedTransform, context);
+  }
+
+  /**
+   * Utility formatting method.
+   *
+   * @param n number of spaces to generate
+   * @return String with "|" followed by n spaces
+   */
+  protected static String genSpaces(int n) {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < n; i++) {
+      builder.append("|   ");
+    }
+    return builder.toString();
+  }
+
+  private static class ReflectiveOneToOneOverrideFactory<
+      InputT extends PValue,
+      OutputT extends PValue,
+      TransformT extends PTransform<InputT, OutputT>>
+      extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
+    private final Class<PTransform<InputT, OutputT>> replacement;
+
+    private ReflectiveOneToOneOverrideFactory(
+        Class<PTransform<InputT, OutputT>> replacement) {
+      this.replacement = replacement;
+    }
+
+    @Override
+    public PTransformReplacement<InputT, OutputT> getReplacementTransform(
+        AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) {
+      PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform();
+      PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement)
+          .withArg(
+              (Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(),
+              originalPTransform)
+          .build();
+      InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values());
+      return PTransformReplacement.of(inputT, replacedPTransform);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
deleted file mode 100644
index 6d6f1c6..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import java.util.List;
-import org.apache.beam.runners.core.construction.PTransformMatchers;
-import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
-import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
-import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.values.PValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Pipleline translator of Storm
- */
-public class StormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
-  private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class);
-  private TranslationContext context;
-  private int depth = 0;
-
-  public StormPipelineTranslator(TranslationContext context) {
-    this.context = context;
-  }
-
-  public void translate(Pipeline pipeline) {
-    List<PTransformOverride> transformOverrides =
-        ImmutableList.<PTransformOverride>builder()
-            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class),
-                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class)))
-            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class),
-                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class)))
-            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class),
-                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class)))
-            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class),
-                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class)))
-            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class),
-                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class)))
-            .add(PTransformOverride.of(
-                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
-                new ReflectiveOneToOneOverrideFactory(
-                    (ViewTranslator.CombineGloballyAsSingletonView.class))))
-            .build();
-    pipeline.replaceAll(transformOverrides);
-    pipeline.traverseTopologically(this);
-  }
-
-  @Override
-  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-    LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node);
-    this.depth++;
-
-    // check if current composite transforms need to be translated.
-    // If not, all sub transforms will be translated in visitPrimitiveTransform.
-    PTransform<?, ?> transform = node.getTransform();
-    if (transform != null) {
-      TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
-
-      if (translator != null && applyCanTranslate(transform, node, translator)) {
-        applyStreamingTransform(transform, node, translator);
-        LOG.info(genSpaces(this.depth) + "translated-" + node);
-        return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
-      }
-    }
-    return CompositeBehavior.ENTER_TRANSFORM;
-  }
-
-  public void leaveCompositeTransform(TransformHierarchy.Node node) {
-    this.depth--;
-    LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node);
-  }
-
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
-    LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node);
-
-    if (!node.isRootNode()) {
-      PTransform<?, ?> transform = node.getTransform();
-      TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
-      if (translator == null || !applyCanTranslate(transform, node, translator)) {
-        LOG.info(node.getTransform().getClass().toString());
-        throw new UnsupportedOperationException(
-            "The transform " + transform + " is currently not supported.");
-      }
-      applyStreamingTransform(transform, node, translator);
-    }
-  }
-
-  public void visitValue(PValue value, TransformHierarchy.Node node) {
-    LOG.info(genSpaces(this.depth) + "visiting value {}", value);
-  }
-
-  private <T extends PTransform<?, ?>> void applyStreamingTransform(
-      PTransform<?, ?> transform,
-      TransformHierarchy.Node node,
-      TransformTranslator<?> translator) {
-    @SuppressWarnings("unchecked")
-    T typedTransform = (T) transform;
-    @SuppressWarnings("unchecked")
-    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
-
-    context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
-    typedTranslator.translateNode(typedTransform, context);
-
-    // Maintain PValue to TupleTag map for side inputs translation.
-    context.getUserGraphContext().recordOutputTaggedPValue();
-  }
-
-  private <T extends PTransform<?, ?>> boolean applyCanTranslate(
-      PTransform<?, ?> transform,
-      TransformHierarchy.Node node,
-      TransformTranslator<?> translator) {
-    @SuppressWarnings("unchecked")
-    T typedTransform = (T) transform;
-    @SuppressWarnings("unchecked")
-    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
-
-    context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
-
-    return typedTranslator.canTranslate(typedTransform, context);
-  }
-
-  /**
-   * Utility formatting method.
-   *
-   * @param n number of spaces to generate
-   * @return String with "|" followed by n spaces
-   */
-  protected static String genSpaces(int n) {
-    StringBuilder builder = new StringBuilder();
-    for (int i = 0; i < n; i++) {
-      builder.append("|   ");
-    }
-    return builder.toString();
-  }
-
-  private static class ReflectiveOneToOneOverrideFactory<
-      InputT extends PValue,
-      OutputT extends PValue,
-      TransformT extends PTransform<InputT, OutputT>>
-      extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
-    private final Class<PTransform<InputT, OutputT>> replacement;
-
-    private ReflectiveOneToOneOverrideFactory(
-        Class<PTransform<InputT, OutputT>> replacement) {
-      this.replacement = replacement;
-    }
-
-    @Override
-    public PTransformReplacement<InputT, OutputT> getReplacementTransform(
-        AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) {
-      PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform();
-      PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement)
-          .withArg(
-              (Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(),
-              originalPTransform)
-          .build();
-      InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values());
-      return PTransformReplacement.of(inputT, replacedPTransform);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index 526352a..1230a31 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -20,11 +20,11 @@ package org.apache.beam.runners.jstorm.translation;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import avro.shaded.com.google.common.collect.Lists;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
@@ -239,7 +239,9 @@ public class TranslationContext {
     }
   }
 
-  // TODO: add getSideInputs() and getSideOutputs().
+  /**
+   * Context of user graph.
+   */
   public static class UserGraphContext {
     private final JStormPipelineOptions options;
     private final Map<PValue, TupleTag> pValueToTupleTag;
@@ -326,6 +328,9 @@ public class TranslationContext {
     }
   }
 
+  /**
+   * Context of execution graph.
+   */
   public static class ExecutionGraphContext {
 
     private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
index 68e9e17..3d7fab8 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
 
-/*
+/**
  * Enable user to add output stream definitions by API, rather than hard-code.
  */
 public abstract class AbstractComponent implements IComponent {
@@ -65,4 +65,4 @@ public abstract class AbstractComponent implements IComponent {
   public void setParallelismNum(int num) {
     parallelismNum = num;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
index 5e9b056..d8d4d46 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
@@ -19,6 +19,9 @@ package org.apache.beam.runners.jstorm.translation.runtime;
 
 import backtype.storm.topology.IRichBatchBolt;
 
+/**
+ * Adaptor bolt of JStorm extends {@link AbstractComponent}.
+ */
 public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt {
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
index 0480518..814d416 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
@@ -19,6 +19,9 @@ package org.apache.beam.runners.jstorm.translation.runtime;
 
 import backtype.storm.topology.IRichSpout;
 
+/**
+ * Adaptor bolt of JStorm extends {@link AbstractComponent}.
+ */
 public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout {
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
index 9507948..e07d890 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
@@ -20,9 +20,9 @@ package org.apache.beam.runners.jstorm.translation.runtime;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import avro.shaded.com.google.common.collect.Iterables;
 import com.alibaba.jstorm.cache.IKvStoreManager;
 import com.alibaba.jstorm.metric.MetricClient;
+import com.google.common.collect.Iterables;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -62,11 +62,19 @@ import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * JStorm {@link Executor} for {@link DoFn}.
+ * @param <InputT> input type
+ * @param <OutputT> output type
+ */
 public class DoFnExecutor<InputT, OutputT> implements Executor {
   private static final long serialVersionUID = 5297603063991078668L;
 
   private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
 
+  /**
+   * Implements {@link OutputManager} in a DoFn executor.
+   */
   public class DoFnExecutorOutputManager implements OutputManager, Serializable {
     private static final long serialVersionUID = -661113364735206170L;
 
@@ -174,7 +182,7 @@ public class DoFnExecutor<InputT, OutputT> implements Executor {
     initService(context);
 
     // Side inputs setup
-    if (sideInputs != null && sideInputs.isEmpty() == false) {
+    if (sideInputs != null && !sideInputs.isEmpty()) {
       pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
       watermarkHoldTag =
           StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
@@ -261,10 +269,10 @@ public class DoFnExecutor<InputT, OutputT> implements Executor {
   }
 
   /**
-   * Process all pushed back elements when receiving watermark with max timestamp
+   * Process all pushed back elements when receiving watermark with max timestamp.
    */
   public void processAllPushBackElements() {
-    if (sideInputs != null && sideInputs.isEmpty() == false) {
+    if (sideInputs != null && !sideInputs.isEmpty()) {
       BagState<WindowedValue<InputT>> pushedBackElements =
           pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
       if (pushedBackElements != null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
index 1a03cb8..0ec4fdd 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
@@ -21,13 +21,16 @@ import java.io.Serializable;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 
+/**
+ * An executor is a basic executable unit in a JStorm task.
+ */
 public interface Executor extends Serializable {
   /**
-   * Initialization during runtime
+   * Initialization during runtime.
    */
   void init(ExecutorContext context);
 
   <T> void process(TupleTag<T> tag, WindowedValue<T> elem);
 
   void cleanup();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
index 1f65921..55ca171 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorContext.java
@@ -21,6 +21,9 @@ import backtype.storm.task.TopologyContext;
 import com.alibaba.jstorm.cache.IKvStoreManager;
 import com.google.auto.value.AutoValue;
 
+/**
+ * Context of a executors bolt when runtime.
+ */
 @AutoValue
 public abstract class ExecutorContext {
   public static ExecutorContext of(

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
index e80fb48..d33c17a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/ExecutorsBolt.java
@@ -19,8 +19,6 @@ package org.apache.beam.runners.jstorm.translation.runtime;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import avro.shaded.com.google.common.base.Joiner;
-import avro.shaded.com.google.common.collect.Sets;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.ITupleExt;
@@ -31,8 +29,10 @@ import com.alibaba.jstorm.cache.KvStoreManagerFactory;
 import com.alibaba.jstorm.cluster.Common;
 import com.alibaba.jstorm.utils.KryoSerializer;
 import com.google.common.base.Function;
+import com.google.common.base.Joiner;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -49,6 +49,9 @@ import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * ExecutorsBolt is a JStorm Bolt composited with several executors chained in a sub-DAG.
+ */
 public class ExecutorsBolt extends AdaptorBasicBolt {
   private static final long serialVersionUID = -7751043327801735211L;
 
@@ -129,10 +132,10 @@ public class ExecutorsBolt extends AdaptorBasicBolt {
       // init kv store manager
       String storeName = String.format("task-%d", context.getThisTaskId());
       String stateStorePath = String.format("%s/beam/%s", context.getWorkerIdDir(), storeName);
-      IKvStoreManager kvStoreManager = isStatefulBolt ?
-          KvStoreManagerFactory.getKvStoreManagerWithMonitor(
-              context, storeName, stateStorePath, isStatefulBolt) :
-          KvStoreManagerFactory.getKvStoreManager(
+      IKvStoreManager kvStoreManager = isStatefulBolt
+              ? KvStoreManagerFactory.getKvStoreManagerWithMonitor(
+              context, storeName, stateStorePath, isStatefulBolt)
+              : KvStoreManagerFactory.getKvStoreManager(
               stormConf, storeName, stateStorePath, isStatefulBolt);
       this.executorContext = ExecutorContext.of(context, this, kvStoreManager);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
index 5a07243..caf1e47 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/FlattenExecutor.java
@@ -22,6 +22,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.TupleTag;
 
+/**
+ * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.Flatten}.
+ * @param <InputT>
+ */
 public class FlattenExecutor<InputT> implements Executor {
 
   private final String description;
@@ -53,4 +57,4 @@ public class FlattenExecutor<InputT> implements Executor {
   public String toString() {
     return description;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
index 625726d..0dd1af9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/GroupByWindowExecutor.java
@@ -52,6 +52,11 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.GroupByKey}.
+ * @param <K>
+ * @param <V>
+ */
 public class GroupByWindowExecutor<K, V>
     extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
   private static final long serialVersionUID = -7563050475488610553L;

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
index d36d9a6..a26472c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiOutputDoFnExecutor.java
@@ -30,6 +30,11 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * JStorm {@link Executor} for {@link DoFn} with multi-output.
+ * @param <InputT>
+ * @param <OutputT>
+ */
 public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
   private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
 
@@ -71,4 +76,4 @@ public class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<Input
     this.outputManager = new MultiOutputDoFnExecutorOutputManager();
     LOG.info("localTupleTagMap: {}", localTupleTagMap);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
index 45ac62a..5e87cff 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/MultiStatefulDoFnExecutor.java
@@ -32,6 +32,10 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
+/**
+ * JStorm {@link Executor} for stateful {@link DoFn} with multi-output.
+ * @param <OutputT>
+ */
 public class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
 
   public MultiStatefulDoFnExecutor(

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
index ba0c052..77ae844 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/StatefulDoFnExecutor.java
@@ -32,6 +32,10 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
+/**
+ * JStorm {@link Executor} for stateful {@link DoFn}.
+ * @param <OutputT>
+ */
 public class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {
   public StatefulDoFnExecutor(
       String stepName, String description, JStormPipelineOptions pipelineOptions,

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
index d2514f1..0103095 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TimerServiceImpl.java
@@ -20,9 +20,9 @@ package org.apache.beam.runners.jstorm.translation.runtime;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
-import avro.shaded.com.google.common.collect.Maps;
-import avro.shaded.com.google.common.collect.Sets;
 import com.alibaba.jstorm.utils.Pair;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -47,7 +47,7 @@ public class TimerServiceImpl implements TimerService {
   private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
   private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
   private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
-  private transient final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
+  private final transient PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
       new PriorityQueue<>();
   private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
       timerDataToKeyedExecutors = Maps.newHashMap();
@@ -152,4 +152,4 @@ public class TimerServiceImpl implements TimerService {
     keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
     timerDataToKeyedExecutors.put(timerData, keyedExecutors);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
index 2bd5f7d..8dc51b5 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxExecutorsBolt.java
@@ -29,6 +29,9 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Transactional executors bolt handles the checkpoint and restore of state and timer.
+ */
 public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
   private static final Logger LOG = LoggerFactory.getLogger(TxExecutorsBolt.class);
 
@@ -127,4 +130,4 @@ public class TxExecutorsBolt implements ITransactionStatefulBoltExecutor {
       throw new RuntimeException(e.getMessage());
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
index 16f7d99..48b410f 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/TxUnboundedSourceSpout.java
@@ -29,6 +29,9 @@ import java.util.Map;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Transactional unbounded source spout handles the checkpoint and restore of state and timer.
+ */
 public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
   private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(TxUnboundedSourceSpout.class);
 
@@ -150,4 +153,4 @@ public class TxUnboundedSourceSpout implements ITransactionSpoutExecutor {
   public void fail(Object msgId) {
     throw new UnsupportedOperationException();
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
index 7f98c61..006cd47 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/UnboundedSourceSpout.java
@@ -40,8 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Spout implementation that wraps a Beam UnboundedSource
- * <p>
+ * Spout implementation that wraps a Beam UnboundedSource.
  * TODO: add wrapper to support metrics in UnboundedSource.
  */
 public class UnboundedSourceSpout extends AdaptorBasicSpout {
@@ -61,7 +60,7 @@ public class UnboundedSourceSpout extends AdaptorBasicSpout {
 
   private KryoSerializer<WindowedValue> serializer;
 
-  private long lastWaterMark = 0l;
+  private long lastWaterMark = 0L;
 
   public UnboundedSourceSpout(
       String description,

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
index 7f21d26..3cd0aa9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/WindowAssignExecutor.java
@@ -29,6 +29,11 @@ import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
+ * @param <T>
+ * @param <W>
+ */
 public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor {
   private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
 
@@ -104,4 +109,4 @@ public class WindowAssignExecutor<T, W extends BoundedWindow> implements Executo
   public String toString() {
     return description;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
index 1466f35..df54383 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormBagState.java
@@ -33,7 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * JStorm implementation of {@link BagState}.
+ * Implementation of {@link BagState} in JStorm runner.
  */
 class JStormBagState<K, T> implements BagState<T> {
   private static final Logger LOG = LoggerFactory.getLogger(JStormBagState.class);
@@ -115,6 +115,9 @@ class JStormBagState<K, T> implements BagState<T> {
     return ComposedKey.of(key, namespace, elemIndex);
   }
 
+  /**
+   * Implementation of Bag state Iterable.
+   */
   private class BagStateIterable implements KvStoreIterable<T> {
 
     private class BagStateIterator implements Iterator<T> {

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
index f1c1ed0..ac3f91f 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormMapState.java
@@ -26,6 +26,11 @@ import org.apache.beam.sdk.state.ReadableState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Implementation of {@link MapState} in JStorm runner.
+ * @param <K>
+ * @param <V>
+ */
 public class JStormMapState<K, V> implements MapState<K, V> {
   private static final Logger LOG = LoggerFactory.getLogger(JStormMapState.class);
 
@@ -150,4 +155,4 @@ public class JStormMapState<K, V> implements MapState<K, V> {
       return this;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
index bf8d472..44ce8d8 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/FlattenTranslator.java
@@ -26,6 +26,10 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
+/**
+ * Translates a {@link Flatten} to a JStorm {@link FlattenExecutor}.
+ * @param <V>
+ */
 public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollections<V>> {
 
   @Override
@@ -44,4 +48,4 @@ public class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PC
     FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
     context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
index 85f96ce..85cb85d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/GroupByKeyTranslator.java
@@ -29,6 +29,11 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 
+/**
+ * Translates a {@link GroupByKey} to a JStorm {@link GroupByWindowExecutor}.
+ * @param <K>
+ * @param <V>
+ */
 public class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
   // information of transform
   protected PCollection<KV<K, V>> input;

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
index 77e4381..6e3392c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundMultiTranslator.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.runners.jstorm.translation.translator;
 
-import avro.shaded.com.google.common.collect.Maps;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
index 7b998d9..ad8f85f 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ParDoBoundTranslator.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.runners.jstorm.translation.translator;
 
-import avro.shaded.com.google.common.collect.Lists;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.jstorm.translation.TranslationContext;
@@ -40,7 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Translates a ParDo.Bound to a Storm {@link DoFnExecutor}.
+ * Translates a ParDo.Bound to a JStorm {@link DoFnExecutor}.
  */
 public class ParDoBoundTranslator<InputT, OutputT>
     extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
index a15a8ba..71243b9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/Stream.java
@@ -39,6 +39,9 @@ public abstract class Stream {
         producer, consumer);
   }
 
+  /**
+   * JStorm producer.
+   */
   @AutoValue
   public abstract static class Producer {
     public abstract String getComponentId();
@@ -53,6 +56,9 @@ public abstract class Stream {
     }
   }
 
+  /**
+   * JStorm consumer.
+   */
   @AutoValue
   public abstract static class Consumer {
     public abstract String getComponentId();
@@ -65,6 +71,9 @@ public abstract class Stream {
     }
   }
 
+  /**
+   * JStorm grouping, which define how to transfer message between two nodes.
+   */
   @AutoValue
   public abstract static class Grouping {
     public abstract Type getType();
@@ -86,7 +95,7 @@ public abstract class Stream {
     }
 
     /**
-     * Types of stream groupings Storm allows
+     * Types of stream groupings Storm allows.
      */
     public enum Type {
       ALL, CUSTOM, DIRECT, SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, GLOBAL, NONE

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
index 487cac0..bfa94a0 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/TransformTranslator.java
@@ -38,6 +38,10 @@ public interface TransformTranslator<T extends PTransform<?, ?>> {
    */
   boolean canTranslate(T transform, TranslationContext context);
 
+    /**
+     * Default translator.
+     * @param <T1>
+     */
   class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
     @Override
     public void translateNode(T1 transform, TranslationContext context) {

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
index c55c8d6..f71ee9c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/ViewTranslator.java
@@ -54,8 +54,7 @@ public class ViewTranslator
 
   /**
    * Specialized implementation for
-   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}
-   * for the Flink runner in streaming mode.
+   * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap}.
    */
   public static class ViewAsMap<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
@@ -93,8 +92,7 @@ public class ViewTranslator
 
   /**
    * Specialized expansion for {@link
-   * View.AsMultimap View.AsMultimap} for the
-   * Flink runner in streaming mode.
+   * View.AsMultimap View.AsMultimap}.
    */
   public static class ViewAsMultimap<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
@@ -135,8 +133,7 @@ public class ViewTranslator
 
   /**
    * Specialized implementation for
-   * {@link View.AsList View.AsList} for the
-   * JStorm runner in streaming mode.
+   * {@link View.AsList View.AsList}.
    */
   public static class ViewAsList<T>
       extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
@@ -258,6 +255,12 @@ public class ViewTranslator
     }
   }
 
+  /**
+   * Specialized expansion for
+   * {@link org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView}.
+   * @param <InputT>
+   * @param <OutputT>
+     */
   public static class CombineGloballyAsSingletonView<InputT, OutputT>
       extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
     Combine.GloballyAsSingletonView<InputT, OutputT> transform;
@@ -351,8 +354,7 @@ public class ViewTranslator
 
   /**
    * Creates a primitive {@link PCollectionView}.
-   * <p>
-   * <p>For internal use only by runner implementors.
+   * For internal use only by runner implementors.
    *
    * @param <ElemT> The type of the elements of the input PCollection
    * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
index 6de34dd..2ccb8d7 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/translator/WindowAssignTranslator.java
@@ -21,6 +21,11 @@ import org.apache.beam.runners.jstorm.translation.TranslationContext;
 import org.apache.beam.runners.jstorm.translation.runtime.WindowAssignExecutor;
 import org.apache.beam.sdk.transforms.windowing.Window;
 
+/**
+ * Translates a {@link org.apache.beam.sdk.transforms.windowing.Window.Assign} to a
+ * JStorm {@link WindowAssignExecutor}.
+ * @param <T>
+ */
 public class WindowAssignTranslator<T> extends TransformTranslator.Default<Window.Assign<T>> {
 
   @Override
@@ -35,4 +40,4 @@ public class WindowAssignTranslator<T> extends TransformTranslator.Default<Windo
         userGraphContext.getOutputTag());
     context.addTransformExecutor(executor);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
index 596d8b4..4b92a4c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/util/CommonInstance.java
@@ -17,9 +17,12 @@
  */
 package org.apache.beam.runners.jstorm.translation.util;
 
+/**
+ * Common definition of JStorm runner.
+ */
 public class CommonInstance {
   public static final String KEY = "Key";
   public static final String VALUE = "Value";
 
   public static final String BEAM_WATERMARK_STREAM_ID = "BEAM_WATERMARK";
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
index 9fd62e4..ad83c2b 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/RunnerUtils.java
@@ -25,10 +25,12 @@ import org.apache.beam.runners.jstorm.translation.runtime.StatefulDoFnExecutor;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 
+/**
+ * Utils for JStorm runner.
+ */
 public class RunnerUtils {
   /**
-   * Convert WindowedValue<KV<>> into KeyedWorkItem<K, WindowedValue<V>>
-   *
+   * Convert {@link WindowedValue} into {@link KeyedWorkItem}.
    * @param elem
    * @return
    */
@@ -43,11 +45,11 @@ public class RunnerUtils {
   public static boolean isGroupByKeyExecutor(Executor executor) {
     if (executor instanceof GroupByWindowExecutor) {
       return true;
-    } else if (executor instanceof StatefulDoFnExecutor ||
-        executor instanceof MultiStatefulDoFnExecutor) {
+    } else if (executor instanceof StatefulDoFnExecutor
+            || executor instanceof MultiStatefulDoFnExecutor) {
       return true;
     } else {
       return false;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
index 182794f..479afdc 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SerializedPipelineOptions.java
@@ -34,7 +34,7 @@ public class SerializedPipelineOptions implements Serializable {
   private final byte[] serializedOptions;
 
   /**
-   * Lazily initialized copy of deserialized options
+   * Lazily initialized copy of deserialized options.
    */
   private transient PipelineOptions pipelineOptions;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
index cce21b3..46a12b9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/util/SingletonKeyedWorkItem.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.util.WindowedValue;
 
 /**
  * Singleton keyed word item.
- *
  * @param <K>
  * @param <ElemT>
  */
@@ -60,4 +59,4 @@ public class SingletonKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT>
   public Iterable<WindowedValue<ElemT>> elementsIterable() {
     return Collections.singletonList(value);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5a15d548/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
index 2a8160c..66f33a7 100644
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
+++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/runtime/state/JStormStateInternalsTest.java
@@ -21,10 +21,10 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
-import avro.shaded.com.google.common.collect.Maps;
 import com.alibaba.jstorm.cache.IKvStoreManager;
 import com.alibaba.jstorm.cache.rocksdb.RocksDbKvStoreManagerFactory;
 import com.alibaba.jstorm.utils.KryoSerializer;
+import com.google.common.collect.Maps;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.beam.runners.core.StateNamespaces;
@@ -175,11 +175,11 @@ public class JStormStateInternalsTest {
     Iterable<Map.Entry<Integer, Integer>> entries = mapStateA.entries().read();
     Iterator<Map.Entry<Integer, Integer>> itr = entries.iterator();
     Map.Entry<Integer, Integer> entry = itr.next();
-    assertEquals((long) entry.getKey(), 1l);
-    assertEquals((long) entry.getValue(), 12l);
+    assertEquals((long) entry.getKey(), 1L);
+    assertEquals((long) entry.getValue(), 12L);
     entry = itr.next();
-    assertEquals((long) entry.getKey(), 2l);
-    assertEquals((long) entry.getValue(), 22l);
+    assertEquals((long) entry.getKey(), 2L);
+    assertEquals((long) entry.getValue(), 22L);
     assertEquals(false, itr.hasNext());
 
     mapStateA.remove(1);
@@ -191,8 +191,8 @@ public class JStormStateInternalsTest {
     entries = mapStateA.entries().read();
     itr = entries.iterator();
     entry = itr.next();
-    assertEquals((long) entry.getKey(), 2l);
-    assertEquals((long) entry.getValue(), 22l);
+    assertEquals((long) entry.getKey(), 2L);
+    assertEquals((long) entry.getValue(), 22L);
     assertEquals(false, itr.hasNext());
   }