You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:15 UTC

[15/53] [abbrv] beam git commit: jstorm-runner: fix checkstyles.

jstorm-runner: fix checkstyles.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aa251a4a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aa251a4a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aa251a4a

Branch: refs/heads/jstorm-runner
Commit: aa251a4a4d2850310f5dfd9db4d605cce41bba13
Parents: f3df3a2
Author: Pei He <pe...@apache.org>
Authored: Thu Jul 13 17:37:51 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:47 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/jstorm/JStormRunner.java       | 395 +++++------
 .../runners/jstorm/JStormRunnerRegistrar.java   |  39 +-
 .../beam/runners/jstorm/JStormRunnerResult.java | 118 ++--
 .../beam/runners/jstorm/TestJStormRunner.java   | 188 +++---
 .../serialization/ImmutableListSerializer.java  | 152 +++--
 .../serialization/ImmutableMapSerializer.java   |  78 ++-
 .../serialization/ImmutableSetSerializer.java   |  93 +--
 .../KvStoreIterableSerializer.java              |  73 +-
 .../SdkRepackImmuListSerializer.java            | 116 ++--
 .../SdkRepackImmuSetSerializer.java             |  98 +--
 .../UnmodifiableCollectionsSerializer.java      | 290 ++++----
 .../translation/StormPipelineTranslator.java    | 273 ++++----
 .../jstorm/translation/TranslationContext.java  | 667 ++++++++++---------
 .../jstorm/translation/TranslatorRegistry.java  |  65 +-
 .../translation/runtime/AbstractComponent.java  |  66 +-
 .../translation/runtime/AdaptorBasicBolt.java   |   2 +-
 .../translation/runtime/AdaptorBasicSpout.java  |   2 +-
 .../translation/runtime/DoFnExecutor.java       | 511 +++++++-------
 .../runtime/DoFnRunnerWithMetrics.java          |   3 +-
 .../jstorm/translation/runtime/Executor.java    |  13 +-
 .../translation/runtime/ExecutorContext.java    |  15 +-
 .../translation/runtime/ExecutorsBolt.java      | 502 +++++++-------
 .../translation/runtime/FlattenExecutor.java    |  61 +-
 .../runtime/GroupByWindowExecutor.java          | 231 ++++---
 .../runtime/MultiOutputDoFnExecutor.java        |  79 ++-
 .../runtime/MultiStatefulDoFnExecutor.java      |  64 +-
 .../runtime/StatefulDoFnExecutor.java           |  63 +-
 .../translation/runtime/TimerService.java       |  37 +-
 .../translation/runtime/TimerServiceImpl.java   | 233 +++----
 .../translation/runtime/TxExecutorsBolt.java    | 193 +++---
 .../runtime/TxUnboundedSourceSpout.java         | 244 +++----
 .../runtime/UnboundedSourceSpout.java           | 288 ++++----
 .../translation/runtime/ViewExecutor.java       |  53 +-
 .../runtime/WindowAssignExecutor.java           | 130 ++--
 .../runtime/state/JStormBagState.java           | 261 ++++----
 .../runtime/state/JStormCombiningState.java     |  98 +--
 .../runtime/state/JStormMapState.java           | 227 ++++---
 .../runtime/state/JStormStateInternals.java     | 290 ++++----
 .../runtime/state/JStormValueState.java         |  92 ++-
 .../runtime/state/JStormWatermarkHoldState.java |  88 +--
 .../runtime/timer/JStormTimerInternals.java     | 143 ++--
 .../translator/BoundedSourceTranslator.java     |  29 +-
 .../translator/CombineGloballyTranslator.java   |   5 +-
 .../translator/CombinePerKeyTranslator.java     |   5 +-
 .../translator/FlattenTranslator.java           |  34 +-
 .../translator/GroupByKeyTranslator.java        |  71 +-
 .../translator/ParDoBoundMultiTranslator.java   | 143 ++--
 .../translator/ParDoBoundTranslator.java        | 128 ++--
 .../translator/ReshuffleTranslator.java         |   4 +-
 .../jstorm/translation/translator/Stream.java   | 109 +--
 .../translator/TransformTranslator.java         |  74 +-
 .../translator/UnboundedSourceTranslator.java   |  28 +-
 .../translation/translator/ViewTranslator.java  | 586 ++++++++--------
 .../translator/WindowAssignTranslator.java      |  26 +-
 .../translator/WindowBoundTranslator.java       |  26 +-
 .../jstorm/translation/util/CommonInstance.java |   6 +-
 .../util/DefaultSideInputReader.java            |  33 +-
 .../translation/util/DefaultStepContext.java    |  89 +--
 .../beam/runners/jstorm/util/RunnerUtils.java   |  46 +-
 .../jstorm/util/SerializedPipelineOptions.java  |  51 +-
 .../jstorm/util/SingletonKeyedWorkItem.java     |   3 +-
 .../jstorm/JStormRunnerRegistrarTest.java       |   4 +-
 .../runtime/state/JStormStateInternalsTest.java | 345 +++++-----
 63 files changed, 4314 insertions(+), 4165 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index 39c723b..5fdbe4d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.jstorm;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
@@ -31,8 +29,6 @@ import backtype.storm.tuple.Fields;
 import com.alibaba.jstorm.cache.KvStoreIterable;
 import com.alibaba.jstorm.cluster.StormConfig;
 import com.alibaba.jstorm.transactional.TransactionTopologyBuilder;
-import com.alibaba.jstorm.utils.JStormUtils;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.jstorm.serialization.ImmutableListSerializer;
@@ -54,12 +50,9 @@ import org.apache.beam.runners.jstorm.translation.runtime.UnboundedSourceSpout;
 import org.apache.beam.runners.jstorm.translation.translator.Stream;
 import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,204 +63,218 @@ import org.slf4j.LoggerFactory;
 public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
 
 
-    private static final Logger LOG = LoggerFactory.getLogger(JStormRunner.class);
-
-    private JStormPipelineOptions options;
-
-    public JStormRunner(JStormPipelineOptions options) {
-        this.options = options;
+  private static final Logger LOG = LoggerFactory.getLogger(JStormRunner.class);
+
+  private JStormPipelineOptions options;
+
+  public JStormRunner(JStormPipelineOptions options) {
+    this.options = options;
+  }
+
+  public static JStormRunner fromOptions(PipelineOptions options) {
+    JStormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate(
+        JStormPipelineOptions.class, options);
+    return new JStormRunner(pipelineOptions);
+  }
+
+  /**
+   * Converts pipeline options to the storm configuration format.
+   *
+   * @param options the pipeline options to convert
+   * @return the equivalent storm {@link Config}
+   */
+  private Config convertPipelineOptionsToConfig(JStormPipelineOptions options) {
+    Config config = new Config();
+    if (options.getLocalMode())
+      config.put(Config.STORM_CLUSTER_MODE, "local");
+    else
+      config.put(Config.STORM_CLUSTER_MODE, "distributed");
+
+    Config.setNumWorkers(config, options.getWorkerNumber());
+
+    config.putAll(options.getTopologyConfig());
+
+    // Setup config for runtime env
+    config.put("worker.external", "beam");
+    config.put("topology.acker.executors", 0);
+
+    UnmodifiableCollectionsSerializer.registerSerializers(config);
+    // register classes of guava utils, ImmutableList, ImmutableSet, ImmutableMap
+    ImmutableListSerializer.registerSerializers(config);
+    SdkRepackImmuListSerializer.registerSerializers(config);
+    ImmutableSetSerializer.registerSerializers(config);
+    SdkRepackImmuSetSerializer.registerSerializers(config);
+    ImmutableMapSerializer.registerSerializers(config);
+
+    config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class);
+    return config;
+  }
+
+  @Override
+  public JStormRunnerResult run(Pipeline pipeline) {
+    LOG.info("Running pipeline...");
+    TranslationContext context = new TranslationContext(this.options);
+    StormPipelineTranslator transformer = new StormPipelineTranslator(context);
+    transformer.translate(pipeline);
+    LOG.info("UserGraphContext=\n{}", context.getUserGraphContext());
+    LOG.info("ExecutionGraphContext=\n{}", context.getExecutionGraphContext());
+
+    for (Stream stream : context.getExecutionGraphContext().getStreams()) {
+      LOG.info(
+          stream.getProducer().getComponentId() + " --> " + stream.getConsumer().getComponentId());
     }
 
-    public static JStormRunner fromOptions(PipelineOptions options) {
-        JStormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate(JStormPipelineOptions.class, options);
-        return new JStormRunner(pipelineOptions);
+    String topologyName = options.getJobName();
+    Config config = convertPipelineOptionsToConfig(options);
+
+    return runTopology(
+        topologyName,
+        getTopology(options, context.getExecutionGraphContext()),
+        config);
+  }
+
+  private JStormRunnerResult runTopology(
+      String topologyName,
+      StormTopology topology,
+      Config config) {
+    try {
+      if (StormConfig.local_mode(config)) {
+        LocalCluster localCluster = LocalCluster.getInstance();
+        localCluster.submitTopology(topologyName, config, topology);
+        return JStormRunnerResult.local(
+            topologyName, config, localCluster, options.getLocalModeExecuteTime());
+      } else {
+        StormSubmitter.submitTopology(topologyName, config, topology);
+        return null;
+      }
+    } catch (Exception e) {
+      LOG.warn("Fail to submit topology", e);
+      throw new RuntimeException("Fail to submit topology", e);
     }
-
-    /**
-     * convert pipeline options to storm configuration format
-     * @param options
-     * @return
-     */
-    private Config convertPipelineOptionsToConfig(JStormPipelineOptions options) {
-        Config config = new Config();
-        if (options.getLocalMode())
-            config.put(Config.STORM_CLUSTER_MODE, "local");
-        else
-            config.put(Config.STORM_CLUSTER_MODE, "distributed");
-
-        Config.setNumWorkers(config, options.getWorkerNumber());
-
-        config.putAll(options.getTopologyConfig());
-
-        // Setup config for runtime env
-        config.put("worker.external", "beam");
-        config.put("topology.acker.executors", 0);
-
-        UnmodifiableCollectionsSerializer.registerSerializers(config);
-        // register classes of guava utils, ImmutableList, ImmutableSet, ImmutableMap
-        ImmutableListSerializer.registerSerializers(config);
-        SdkRepackImmuListSerializer.registerSerializers(config);
-        ImmutableSetSerializer.registerSerializers(config);
-        SdkRepackImmuSetSerializer.registerSerializers(config);
-        ImmutableMapSerializer.registerSerializers(config);
-
-        config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class);
-        return config;
+  }
+
+  private AbstractComponent getComponent(
+      String id, TranslationContext.ExecutionGraphContext context) {
+    AbstractComponent component = null;
+    AdaptorBasicSpout spout = context.getSpout(id);
+    if (spout != null) {
+      component = spout;
+    } else {
+      AdaptorBasicBolt bolt = context.getBolt(id);
+      if (bolt != null)
+        component = bolt;
     }
 
-    @Override
-    public JStormRunnerResult run(Pipeline pipeline) {
-        LOG.info("Running pipeline...");
-        TranslationContext context = new TranslationContext(this.options);
-        StormPipelineTranslator transformer = new StormPipelineTranslator(context);
-        transformer.translate(pipeline);
-        LOG.info("UserGraphContext=\n{}", context.getUserGraphContext());
-        LOG.info("ExecutionGraphContext=\n{}", context.getExecutionGraphContext());
-
-        for (Stream stream : context.getExecutionGraphContext().getStreams()) {
-            LOG.info(stream.getProducer().getComponentId() + " --> " + stream.getConsumer().getComponentId());
-        }
+    return component;
+  }
 
-        String topologyName = options.getJobName();
-        Config config = convertPipelineOptionsToConfig(options);
+  private StormTopology getTopology(
+      JStormPipelineOptions options, TranslationContext.ExecutionGraphContext context) {
+    boolean isExactlyOnce = options.getExactlyOnceTopology();
+    TopologyBuilder builder =
+        isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder();
 
-        return runTopology(
-            topologyName,
-            getTopology(options, context.getExecutionGraphContext()),
-            config);
+    int parallelismNumber = options.getParallelismNumber();
+    Map<String, AdaptorBasicSpout> spouts = context.getSpouts();
+    for (String id : spouts.keySet()) {
+      IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id));
+      builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber));
     }
 
-    private JStormRunnerResult runTopology(String topologyName, StormTopology topology, Config config) {
-        try {
-            if (StormConfig.local_mode(config)) {
-                LocalCluster localCluster = LocalCluster.getInstance();
-                localCluster.submitTopology(topologyName, config, topology);
-                return JStormRunnerResult.local(
-                    topologyName, config, localCluster, options.getLocalModeExecuteTime());
-            } else {
-                StormSubmitter.submitTopology(topologyName, config, topology);
-                return null;
-            }
-        } catch (Exception e) {
-            LOG.warn("Fail to submit topology", e);
-            throw new RuntimeException("Fail to submit topology", e);
-        }
+    HashMap<String, BoltDeclarer> declarers = new HashMap<>();
+    Iterable<Stream> streams = context.getStreams();
+    LOG.info("streams=" + streams);
+    for (Stream stream : streams) {
+      String destBoltId = stream.getConsumer().getComponentId();
+      IRichBolt bolt = getBolt(isExactlyOnce, context.getBolt(destBoltId));
+      BoltDeclarer declarer = declarers.get(destBoltId);
+      if (declarer == null) {
+        declarer = builder.setBolt(
+            destBoltId,
+            bolt,
+            getParallelismNum(context.getBolt(destBoltId), parallelismNumber));
+        declarers.put(destBoltId, declarer);
+      }
+
+      Stream.Grouping grouping = stream.getConsumer().getGrouping();
+      String streamId = stream.getProducer().getStreamId();
+      String srcBoltId = stream.getProducer().getComponentId();
+
+      // add stream output declare for "from" component
+      AbstractComponent component = getComponent(srcBoltId, context);
+      if (grouping.getType().equals(Stream.Grouping.Type.FIELDS))
+        component.addKVOutputField(streamId);
+      else
+        component.addOutputField(streamId);
+
+      // "to" component declares grouping to "from" component
+      switch (grouping.getType()) {
+        case SHUFFLE:
+          declarer.shuffleGrouping(srcBoltId, streamId);
+          break;
+        case FIELDS:
+          declarer.fieldsGrouping(srcBoltId, streamId, new Fields(grouping.getFields()));
+          break;
+        case ALL:
+          declarer.allGrouping(srcBoltId, streamId);
+          break;
+        case DIRECT:
+          declarer.directGrouping(srcBoltId, streamId);
+          break;
+        case GLOBAL:
+          declarer.globalGrouping(srcBoltId, streamId);
+          break;
+        case LOCAL_OR_SHUFFLE:
+          declarer.localOrShuffleGrouping(srcBoltId, streamId);
+          break;
+        case NONE:
+          declarer.noneGrouping(srcBoltId, streamId);
+          break;
+        default:
+          throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
+      }
+
+      // Subscribe to the watermark stream from the source component with all-grouping
+      component.addOutputField(CommonInstance.BEAM_WATERMARK_STREAM_ID);
+      declarer.allGrouping(srcBoltId, CommonInstance.BEAM_WATERMARK_STREAM_ID);
     }
 
-    private AbstractComponent getComponent(String id, TranslationContext.ExecutionGraphContext context) {
-        AbstractComponent component = null;
-        AdaptorBasicSpout spout = context.getSpout(id);
-        if (spout != null) {
-            component = spout;
-        } else {
-            AdaptorBasicBolt bolt = context.getBolt(id);
-            if (bolt != null)
-                component = bolt;
-        }
-
-        return component;
+    if (isExactlyOnce) {
+      ((TransactionTopologyBuilder) builder).enableHdfs();
     }
-
-    private StormTopology getTopology(JStormPipelineOptions options, TranslationContext.ExecutionGraphContext context) {
-        boolean isExactlyOnce = options.getExactlyOnceTopology();
-        TopologyBuilder builder = isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder();
-
-        int parallelismNumber = options.getParallelismNumber();
-        Map<String, AdaptorBasicSpout> spouts = context.getSpouts();
-        for (String id : spouts.keySet()) {
-            IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id));
-            builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber));
-        }
-
-        HashMap<String, BoltDeclarer> declarers = new HashMap<>();
-        Iterable<Stream> streams = context.getStreams();
-        LOG.info("streams=" + streams);
-        for (Stream stream : streams) {
-            String destBoltId = stream.getConsumer().getComponentId();
-            IRichBolt bolt = getBolt(isExactlyOnce, context.getBolt(destBoltId));
-            BoltDeclarer declarer = declarers.get(destBoltId);
-            if (declarer == null) {
-                declarer = builder.setBolt(destBoltId, bolt,
-                    getParallelismNum(context.getBolt(destBoltId), parallelismNumber));
-                declarers.put(destBoltId, declarer);
-            }
-
-            Stream.Grouping grouping = stream.getConsumer().getGrouping();
-            String streamId = stream.getProducer().getStreamId();
-            String srcBoltId = stream.getProducer().getComponentId();
-
-            // add stream output declare for "from" component
-            AbstractComponent component = getComponent(srcBoltId, context);
-            if (grouping.getType().equals(Stream.Grouping.Type.FIELDS))
-                component.addKVOutputField(streamId);
-            else
-                component.addOutputField(streamId);
-
-            // "to" component declares grouping to "from" component
-            switch (grouping.getType()) {
-                case SHUFFLE:
-                    declarer.shuffleGrouping(srcBoltId, streamId);
-                    break;
-                case FIELDS:
-                    declarer.fieldsGrouping(srcBoltId, streamId, new Fields(grouping.getFields()));
-                    break;
-                case ALL:
-                    declarer.allGrouping(srcBoltId, streamId);
-                    break;
-                case DIRECT:
-                    declarer.directGrouping(srcBoltId, streamId);
-                    break;
-                case GLOBAL:
-                    declarer.globalGrouping(srcBoltId, streamId);
-                    break;
-                case LOCAL_OR_SHUFFLE:
-                    declarer.localOrShuffleGrouping(srcBoltId, streamId);
-                    break;
-                case NONE:
-                    declarer.noneGrouping(srcBoltId, streamId);
-                    break;
-                default:
-                    throw new UnsupportedOperationException("unsupported grouping type: " + grouping);
-            }
-
-            // Subscribe grouping of water mark stream
-            component.addOutputField(CommonInstance.BEAM_WATERMARK_STREAM_ID);
-            declarer.allGrouping(srcBoltId, CommonInstance.BEAM_WATERMARK_STREAM_ID);
-        }
-
-        if (isExactlyOnce) {
-            ((TransactionTopologyBuilder) builder).enableHdfs();
-        }
-        return builder.createTopology();
-    }
-
-    private IRichSpout getSpout(boolean isExactlyOnce, IRichSpout spout) {
-        IRichSpout ret = null;
-        if (isExactlyOnce) {
-            if (spout instanceof UnboundedSourceSpout) {
-                ret = new TxUnboundedSourceSpout((UnboundedSourceSpout) spout);
-            } else {
-                String error = String.format("The specified type(%s) is not supported in exactly once mode yet!", spout.getClass().toString());
-                throw new RuntimeException(error);
-            }
-        } else {
-            ret = spout;
-        }
-        return ret;
-    }
-
-    private IRichBolt getBolt(boolean isExactlyOnce, ExecutorsBolt bolt) {
-        return isExactlyOnce ? new TxExecutorsBolt(bolt) : bolt;
-    }
-
-    /**
-     * Calculate the final parallelism number according to the configured number and global number.
-     * @param component
-     * @param globalParallelismNum
-     * @return final parallelism number for the specified component
-     */
-    private int getParallelismNum(AbstractComponent component, int globalParallelismNum) {
-        int configParallelismNum = component.getParallelismNum();
-        return configParallelismNum > 0 ? configParallelismNum : globalParallelismNum;
+    return builder.createTopology();
+  }
+
+  private IRichSpout getSpout(boolean isExactlyOnce, IRichSpout spout) {
+    IRichSpout ret = null;
+    if (isExactlyOnce) {
+      if (spout instanceof UnboundedSourceSpout) {
+        ret = new TxUnboundedSourceSpout((UnboundedSourceSpout) spout);
+      } else {
+        String error = String.format(
+            "The specified type(%s) is not supported in exactly once mode yet!",
+            spout.getClass().toString());
+        throw new RuntimeException(error);
+      }
+    } else {
+      ret = spout;
     }
+    return ret;
+  }
+
+  private IRichBolt getBolt(boolean isExactlyOnce, ExecutorsBolt bolt) {
+    return isExactlyOnce ? new TxExecutorsBolt(bolt) : bolt;
+  }
+
+  /**
+   * Calculate the final parallelism number according to the configured number and global number.
+   *
+   * @param component the component whose parallelism is being resolved
+   * @param globalParallelismNum the global parallelism number used as a fallback
+   * @return final parallelism number for the specified component
+   */
+  private int getParallelismNum(AbstractComponent component, int globalParallelismNum) {
+    int configParallelismNum = component.getParallelismNum();
+    return configParallelismNum > 0 ? configParallelismNum : globalParallelismNum;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java
index 465236b..1b4d283 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerRegistrar.java
@@ -29,27 +29,28 @@ import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
  * {@link JStormRunner}.
  */
 public class JStormRunnerRegistrar {
-    private JStormRunnerRegistrar() {}
+  private JStormRunnerRegistrar() {
+  }
 
-    /**
-     * Register the {@link JStormPipelineOptions}.
-     */
-    @AutoService(PipelineOptionsRegistrar.class)
-    public static class Options implements PipelineOptionsRegistrar {
-        @Override
-        public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
-            return ImmutableList.<Class<? extends PipelineOptions>> of(JStormPipelineOptions.class);
-        }
+  /**
+   * Register the {@link JStormPipelineOptions}.
+   */
+  @AutoService(PipelineOptionsRegistrar.class)
+  public static class Options implements PipelineOptionsRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+      return ImmutableList.<Class<? extends PipelineOptions>>of(JStormPipelineOptions.class);
     }
+  }
 
-    /**
-     * Register the {@link JStormRunner}.
-     */
-    @AutoService(PipelineRunnerRegistrar.class)
-    public static class Runner implements PipelineRunnerRegistrar {
-        @Override
-        public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
-            return ImmutableList.<Class<? extends PipelineRunner<?>>> of(JStormRunner.class);
-        }
+  /**
+   * Register the {@link JStormRunner}.
+   */
+  @AutoService(PipelineRunnerRegistrar.class)
+  public static class Runner implements PipelineRunnerRegistrar {
+    @Override
+    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(JStormRunner.class);
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
index e15ee6d..797c899 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
@@ -33,76 +33,76 @@ import org.joda.time.Duration;
  */
 public abstract class JStormRunnerResult implements PipelineResult {
 
-    public static JStormRunnerResult local(
+  public static JStormRunnerResult local(
+      String topologyName,
+      Config config,
+      LocalCluster localCluster,
+      long localModeExecuteTimeSecs) {
+    return new LocalStormPipelineResult(
+        topologyName, config, localCluster, localModeExecuteTimeSecs);
+  }
+
+  private final String topologyName;
+  private final Config config;
+
+  JStormRunnerResult(String topologyName, Config config) {
+    this.config = checkNotNull(config, "config");
+    this.topologyName = checkNotNull(topologyName, "topologyName");
+  }
+
+  public State getState() {
+    return null;
+  }
+
+  public Config getConfig() {
+    return config;
+  }
+
+  public String getTopologyName() {
+    return topologyName;
+  }
+
+  private static class LocalStormPipelineResult extends JStormRunnerResult {
+
+    private LocalCluster localCluster;
+    private long localModeExecuteTimeSecs;
+
+    LocalStormPipelineResult(
         String topologyName,
         Config config,
         LocalCluster localCluster,
         long localModeExecuteTimeSecs) {
-        return new LocalStormPipelineResult(
-            topologyName, config, localCluster, localModeExecuteTimeSecs);
-    }
-
-    private final String topologyName;
-    private final Config config;
-
-    JStormRunnerResult(String topologyName, Config config) {
-        this.config = checkNotNull(config, "config");
-        this.topologyName = checkNotNull(topologyName, "topologyName");
+      super(topologyName, config);
+      this.localCluster = checkNotNull(localCluster, "localCluster");
     }
 
-    public State getState() {
-        return null;
+    @Override
+    public State cancel() throws IOException {
+      //localCluster.deactivate(getTopologyName());
+      localCluster.killTopology(getTopologyName());
+      localCluster.shutdown();
+      JStormUtils.sleepMs(1000);
+      return State.CANCELLED;
     }
 
-    public Config getConfig() {
-        return config;
+    @Override
+    public State waitUntilFinish(Duration duration) {
+      return waitUntilFinish();
     }
 
-    public String getTopologyName() {
-        return topologyName;
+    @Override
+    public State waitUntilFinish() {
+      JStormUtils.sleepMs(localModeExecuteTimeSecs * 1000);
+      try {
+        return cancel();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     }
 
-    private static class LocalStormPipelineResult extends JStormRunnerResult {
-
-        private LocalCluster localCluster;
-        private long localModeExecuteTimeSecs;
-
-        LocalStormPipelineResult(
-            String topologyName,
-            Config config,
-            LocalCluster localCluster,
-            long localModeExecuteTimeSecs) {
-            super(topologyName, config);
-            this.localCluster = checkNotNull(localCluster, "localCluster");
-        }
-
-        @Override
-        public State cancel() throws IOException {
-          //localCluster.deactivate(getTopologyName());
-          localCluster.killTopology(getTopologyName());
-          localCluster.shutdown();
-          JStormUtils.sleepMs(1000);
-          return State.CANCELLED;
-        }
-
-        @Override
-        public State waitUntilFinish(Duration duration) {
-            return waitUntilFinish();
-        }
-
-        @Override
-        public State waitUntilFinish() {
-            JStormUtils.sleepMs(localModeExecuteTimeSecs * 1000);
-            try {
-                return cancel();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public MetricResults metrics() {
-            return null;
-        }
+    @Override
+    public MetricResults metrics() {
+      return null;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index b7ff4eb..e27efc0 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -1,10 +1,19 @@
 package org.apache.beam.runners.jstorm;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import avro.shaded.com.google.common.collect.Maps;
 import com.alibaba.jstorm.common.metric.AsmMetric;
-import com.alibaba.jstorm.metric.*;
+import com.alibaba.jstorm.metric.AsmMetricRegistry;
+import com.alibaba.jstorm.metric.AsmWindow;
+import com.alibaba.jstorm.metric.JStormMetrics;
+import com.alibaba.jstorm.metric.MetaType;
+import com.alibaba.jstorm.metric.MetricType;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.google.common.base.Optional;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -12,109 +21,106 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
 /**
  * Test JStorm runner.
  */
 public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(TestJStormRunner.class);
-
-    public static TestJStormRunner fromOptions(PipelineOptions options) {
-        return new TestJStormRunner(options.as(JStormPipelineOptions.class));
+  private static final Logger LOG = LoggerFactory.getLogger(TestJStormRunner.class);
+
+  public static TestJStormRunner fromOptions(PipelineOptions options) {
+    return new TestJStormRunner(options.as(JStormPipelineOptions.class));
+  }
+
+  private final JStormRunner stormRunner;
+  private final JStormPipelineOptions options;
+
+  private TestJStormRunner(JStormPipelineOptions options) {
+    this.options = options;
+    Map conf = Maps.newHashMap();
+    //conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString());
+    options.setTopologyConfig(conf);
+    options.setLocalMode(true);
+    stormRunner = JStormRunner.fromOptions(checkNotNull(options, "options"));
+  }
+
+  @Override
+  public JStormRunnerResult run(Pipeline pipeline) {
+    JStormRunnerResult result = stormRunner.run(pipeline);
+
+    try {
+      int numberOfAssertions = PAssert.countAsserts(pipeline);
+
+      LOG.info("Running JStorm job {} with {} expected assertions.",
+               result.getTopologyName(), numberOfAssertions);
+      if (numberOfAssertions == 0) {
+        // If assert number is zero, wait 5 sec
+        JStormUtils.sleepMs(5000);
+        return result;
+      } else {
+        for (int i = 0; i < 40; ++i) {
+          Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions);
+          if (success.isPresent() && success.get()) {
+            return result;
+          } else if (success.isPresent() && !success.get()) {
+            throw new AssertionError("Failed assertion checks.");
+          } else {
+            JStormUtils.sleepMs(500);
+          }
+        }
+        LOG.info("Assertion checks timed out.");
+        throw new AssertionError("Assertion checks timed out.");
+      }
+    } finally {
+      clearPAssertCount();
+      cancel(result);
     }
+  }
 
-    private final JStormRunner stormRunner;
-    private final JStormPipelineOptions options;
-
-    private TestJStormRunner(JStormPipelineOptions options) {
-        this.options = options;
-        Map conf = Maps.newHashMap();
-        //conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString());
-        options.setTopologyConfig(conf);
-        options.setLocalMode(true);
-        stormRunner = JStormRunner.fromOptions(checkNotNull(options, "options"));
+  private Optional<Boolean> checkForPAssertSuccess(int expectedNumberOfAssertions) {
+    int successes = 0;
+    for (AsmMetric metric :
+        JStormMetrics.search(PAssert.SUCCESS_COUNTER, MetaType.TASK, MetricType.COUNTER)) {
+      successes += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue();
     }
-
-    @Override
-    public JStormRunnerResult run(Pipeline pipeline) {
-        JStormRunnerResult result = stormRunner.run(pipeline);
-
-        try {
-            int numberOfAssertions = PAssert.countAsserts(pipeline);
-
-            LOG.info("Running JStorm job {} with {} expected assertions.", result.getTopologyName(), numberOfAssertions);
-            if(numberOfAssertions == 0) {
-                // If assert number is zero, wait 5 sec
-                JStormUtils.sleepMs(5000);
-                return result;
-            } else {
-                for (int i = 0; i < 40; ++i) {
-                    Optional<Boolean> success = checkForPAssertSuccess(numberOfAssertions);
-                    if (success.isPresent() && success.get()) {
-                        return result;
-                    } else if (success.isPresent() && !success.get()) {
-                        throw new AssertionError("Failed assertion checks.");
-                    } else {
-                        JStormUtils.sleepMs(500);
-                    }
-                }
-                LOG.info("Assertion checks timed out.");
-                throw new AssertionError("Assertion checks timed out.");
-            }
-        } finally {
-            clearPAssertCount();
-            cancel(result);
-        }
+    int failures = 0;
+    for (AsmMetric metric :
+        JStormMetrics.search(PAssert.FAILURE_COUNTER, MetaType.TASK, MetricType.COUNTER)) {
+      failures += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue();
     }
 
-    private Optional<Boolean> checkForPAssertSuccess(int expectedNumberOfAssertions) {
-        int successes = 0;
-        for (AsmMetric metric : JStormMetrics.search(PAssert.SUCCESS_COUNTER, MetaType.TASK, MetricType.COUNTER)) {
-            successes += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue();
-        }
-        int failures = 0;
-        for (AsmMetric metric : JStormMetrics.search(PAssert.FAILURE_COUNTER, MetaType.TASK, MetricType.COUNTER)) {
-            failures += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue();
-        }
-
-        if (failures > 0) {
-            LOG.info("Found {} success, {} failures out of {} expected assertions.",
-                    successes, failures, expectedNumberOfAssertions);
-            return Optional.of(false);
-        } else if (successes >= expectedNumberOfAssertions) {
-            LOG.info("Found {} success, {} failures out of {} expected assertions.",
-                    successes, failures, expectedNumberOfAssertions);
-            return Optional.of(true);
-        }
-
-        LOG.info("Found {} success, {} failures out of {} expected assertions.",
-                successes, failures, expectedNumberOfAssertions);
-        return Optional.absent();
+    if (failures > 0) {
+      LOG.info("Found {} success, {} failures out of {} expected assertions.",
+               successes, failures, expectedNumberOfAssertions);
+      return Optional.of(false);
+    } else if (successes >= expectedNumberOfAssertions) {
+      LOG.info("Found {} success, {} failures out of {} expected assertions.",
+               successes, failures, expectedNumberOfAssertions);
+      return Optional.of(true);
     }
 
-    private void clearPAssertCount() {
-        String topologyName = options.getJobName();
-        AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics();
-        Iterator<Map.Entry<String, AsmMetric>> itr = taskMetrics.getMetrics().entrySet().iterator();
-        while (itr.hasNext()) {
-            Map.Entry<String, AsmMetric> metric = itr.next();
-            if (metric.getKey().contains(topologyName)) {
-                itr.remove();
-            }
-        }
+    LOG.info("Found {} success, {} failures out of {} expected assertions.",
+             successes, failures, expectedNumberOfAssertions);
+    return Optional.absent();
+  }
+
+  private void clearPAssertCount() {
+    String topologyName = options.getJobName();
+    AsmMetricRegistry taskMetrics = JStormMetrics.getTaskMetrics();
+    Iterator<Map.Entry<String, AsmMetric>> itr = taskMetrics.getMetrics().entrySet().iterator();
+    while (itr.hasNext()) {
+      Map.Entry<String, AsmMetric> metric = itr.next();
+      if (metric.getKey().contains(topologyName)) {
+        itr.remove();
+      }
     }
+  }
 
-    private void cancel(JStormRunnerResult result) {
-        try {
-            result.cancel();
-        } catch (IOException e) {
-            throw new RuntimeException("Failed to cancel.", e);
-}
+  private void cancel(JStormRunnerResult result) {
+    try {
+      result.cancel();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to cancel.", e);
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
index aa7d325..fa4eeb6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
@@ -1,92 +1,108 @@
 package org.apache.beam.runners.jstorm.serialization;
 
 import backtype.storm.Config;
-import org.apache.beam.runners.jstorm.util.RunnerUtils;
 import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
 import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
 import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
 import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import com.google.common.collect.*;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Table;
+import org.apache.beam.runners.jstorm.util.RunnerUtils;
 
 public class ImmutableListSerializer extends Serializer<ImmutableList<Object>> {
 
-    private static final boolean DOES_NOT_ACCEPT_NULL = false;
-    private static final boolean IMMUTABLE = true;
+  private static final boolean DOES_NOT_ACCEPT_NULL = false;
+  private static final boolean IMMUTABLE = true;
 
-    public ImmutableListSerializer() {
-        super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
-    }
+  public ImmutableListSerializer() {
+    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+  }
 
-    @Override
-    public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
-        output.writeInt(object.size(), true);
-        for (Object elm : object) {
-            kryo.writeClassAndObject(output, elm);
-        }
+  @Override
+  public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
+    output.writeInt(object.size(), true);
+    for (Object elm : object) {
+      kryo.writeClassAndObject(output, elm);
     }
+  }
 
-    @Override
-    public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
-        final int size = input.readInt(true);
-        final Object[] list = new Object[size];
-        for (int i = 0; i < size; ++i) {
-            list[i] = kryo.readClassAndObject(input);
-        }
-        return ImmutableList.copyOf(list);
+  @Override
+  public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
+    final int size = input.readInt(true);
+    final Object[] list = new Object[size];
+    for (int i = 0; i < size; ++i) {
+      list[i] = kryo.readClassAndObject(input);
     }
+    return ImmutableList.copyOf(list);
+  }
 
-    /**
-     * Creates a new {@link ImmutableListSerializer} and registers its serializer
-     * for the several ImmutableList related classes.
-     */
-    public static void registerSerializers(Config config) {
+  /**
+   * Creates a new {@link ImmutableListSerializer} and registers its serializer
+   * for the several ImmutableList related classes.
+   */
+  public static void registerSerializers(Config config) {
 
-        // ImmutableList (abstract class)
-        //  +- RegularImmutableList
-        //  |   RegularImmutableList
-        //  +- SingletonImmutableList
-        //  |   Optimized for List with only 1 element.
-        //  +- SubList
-        //  |   Representation for part of ImmutableList
-        //  +- ReverseImmutableList
-        //  |   For iterating in reverse order
-        //  +- StringAsImmutableList
-        //  |   Used by Lists#charactersOf
-        //  +- Values (ImmutableTable values)
-        //      Used by return value of #values() when there are multiple cells
+    // ImmutableList (abstract class)
+    //  +- RegularImmutableList
+    //  |   RegularImmutableList
+    //  +- SingletonImmutableList
+    //  |   Optimized for List with only 1 element.
+    //  +- SubList
+    //  |   Representation for part of ImmutableList
+    //  +- ReverseImmutableList
+    //  |   For iterating in reverse order
+    //  +- StringAsImmutableList
+    //  |   Used by Lists#charactersOf
+    //  +- Values (ImmutableTable values)
+    //      Used by return value of #values() when there are multiple cells
 
-        config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class);
-        config.registerSerialization(
-                RunnerUtils.getBeamSdkRepackClass(ImmutableList.class), ImmutableListSerializer.class);
+    config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class);
+    config.registerSerialization(
+        RunnerUtils.getBeamSdkRepackClass(ImmutableList.class), ImmutableListSerializer.class);
 
-        // Note:
-        //  Only registering above is good enough for serializing/deserializing.
-        //  but if using Kryo#copy, following is required.
+    // Note:
+    //  Only registering above is good enough for serializing/deserializing.
+    //  but if using Kryo#copy, following is required.
 
-        config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class);
-        config.registerSerialization(
-                RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().getClass()), ImmutableListSerializer.class);
-        config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class);
-        config.registerSerialization(
-                RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1).getClass()), ImmutableListSerializer.class);
-        config.registerSerialization(ImmutableList.of(1,2,3).subList(1, 2).getClass(), ImmutableListSerializer.class);
-        config.registerSerialization(
-                RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1,2,3).subList(1, 2).getClass()), ImmutableListSerializer.class);
-        config.registerSerialization(ImmutableList.of().reverse().getClass(), ImmutableListSerializer.class);
-        config.registerSerialization(
-                RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().reverse().getClass()), ImmutableListSerializer.class);
+    config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class);
+    config.registerSerialization(
+        RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().getClass()),
+        ImmutableListSerializer.class);
+    config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class);
+    config.registerSerialization(
+        RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1).getClass()),
+        ImmutableListSerializer.class);
+    config.registerSerialization(
+        ImmutableList.of(1, 2, 3).subList(1, 2).getClass(),
+        ImmutableListSerializer.class);
+    config.registerSerialization(
+        RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1, 2, 3).subList(1, 2).getClass()),
+        ImmutableListSerializer.class);
+    config.registerSerialization(
+        ImmutableList.of().reverse().getClass(),
+        ImmutableListSerializer.class);
+    config.registerSerialization(
+        RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().reverse().getClass()),
+        ImmutableListSerializer.class);
 
-        config.registerSerialization(Lists.charactersOf("KryoRocks").getClass(), ImmutableListSerializer.class);
-        config.registerSerialization(
-                RunnerUtils.getBeamSdkRepackClass(Lists.charactersOf("KryoRocks").getClass()), ImmutableListSerializer.class);
+    config.registerSerialization(
+        Lists.charactersOf("KryoRocks").getClass(),
+        ImmutableListSerializer.class);
+    config.registerSerialization(
+        RunnerUtils.getBeamSdkRepackClass(Lists.charactersOf("KryoRocks").getClass()),
+        ImmutableListSerializer.class);
 
-        Table<Integer,Integer,Integer> baseTable = HashBasedTable.create();
-        baseTable.put(1, 2, 3);
-        baseTable.put(4, 5, 6);
-        Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
-        config.registerSerialization(table.values().getClass(), ImmutableListSerializer.class);
-        config.registerSerialization(
-                RunnerUtils.getBeamSdkRepackClass(table.values().getClass()), ImmutableListSerializer.class);
+    Table<Integer, Integer, Integer> baseTable = HashBasedTable.create();
+    baseTable.put(1, 2, 3);
+    baseTable.put(4, 5, 6);
+    Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
+    config.registerSerialization(table.values().getClass(), ImmutableListSerializer.class);
+    config.registerSerialization(
+        RunnerUtils.getBeamSdkRepackClass(table.values().getClass()),
+        ImmutableListSerializer.class);
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
index ee8b765..77eede3 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
@@ -7,55 +7,61 @@ import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
 import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.Map;
 
 public class ImmutableMapSerializer extends Serializer<ImmutableMap<Object, ? extends Object>> {
 
-    private static final boolean DOES_NOT_ACCEPT_NULL = true;
-    private static final boolean IMMUTABLE = true;
+  private static final boolean DOES_NOT_ACCEPT_NULL = true;
+  private static final boolean IMMUTABLE = true;
 
-    public ImmutableMapSerializer() {
-        super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
-    }
+  public ImmutableMapSerializer() {
+    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+  }
 
-    @Override
-    public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) {
-        kryo.writeObject(output, Maps.newHashMap(immutableMap));
-    }
+  @Override
+  public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) {
+    kryo.writeObject(output, Maps.newHashMap(immutableMap));
+  }
 
-    @Override
-    public ImmutableMap<Object, Object> read(Kryo kryo, Input input, Class<ImmutableMap<Object, ? extends Object>> type) {
-        Map map = kryo.readObject(input, HashMap.class);
-        return ImmutableMap.copyOf(map);
-    }
+  @Override
+  public ImmutableMap<Object, Object> read(
+      Kryo kryo,
+      Input input,
+      Class<ImmutableMap<Object, ? extends Object>> type) {
+    Map map = kryo.readObject(input, HashMap.class);
+    return ImmutableMap.copyOf(map);
+  }
 
-    /**
-     * Creates a new {@link ImmutableMapSerializer} and registers its serializer
-     * for the several ImmutableMap related classes.
-     */
-    public static void registerSerializers(Config config) {
+  /**
+   * Creates a new {@link ImmutableMapSerializer} and registers its serializer
+   * for the several ImmutableMap related classes.
+   */
+  public static void registerSerializers(Config config) {
 
-        config.registerSerialization(ImmutableMap.class, ImmutableMapSerializer.class);
-        config.registerSerialization(ImmutableMap.of().getClass(), ImmutableMapSerializer.class);
+    config.registerSerialization(ImmutableMap.class, ImmutableMapSerializer.class);
+    config.registerSerialization(ImmutableMap.of().getClass(), ImmutableMapSerializer.class);
 
-        Object o1 = new Object();
-        Object o2 = new Object();
+    Object o1 = new Object();
+    Object o2 = new Object();
 
-        config.registerSerialization(ImmutableMap.of(o1, o1).getClass(), ImmutableMapSerializer.class);
-        config.registerSerialization(ImmutableMap.of(o1, o1, o2, o2).getClass(), ImmutableMapSerializer.class);
-        Map<DummyEnum,Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class);
-        for (DummyEnum e : DummyEnum.values()) {
-            enumMap.put(e, o1);
-        }
-
-        config.registerSerialization(ImmutableMap.copyOf(enumMap).getClass(), ImmutableMapSerializer.class);
+    config.registerSerialization(ImmutableMap.of(o1, o1).getClass(), ImmutableMapSerializer.class);
+    config.registerSerialization(
+        ImmutableMap.of(o1, o1, o2, o2).getClass(),
+        ImmutableMapSerializer.class);
+    Map<DummyEnum, Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class);
+    for (DummyEnum e : DummyEnum.values()) {
+      enumMap.put(e, o1);
     }
 
-    private enum DummyEnum {
-        VALUE1,
-        VALUE2
-    }
+    config.registerSerialization(
+        ImmutableMap.copyOf(enumMap).getClass(),
+        ImmutableMapSerializer.class);
+  }
+
+  private enum DummyEnum {
+    VALUE1,
+    VALUE2
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
index cdc4382..3a43b2b 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
@@ -10,62 +10,63 @@ import com.google.common.collect.Sets;
 
 public class ImmutableSetSerializer extends Serializer<ImmutableSet<Object>> {
 
-    private static final boolean DOES_NOT_ACCEPT_NULL = false;
-    private static final boolean IMMUTABLE = true;
+  private static final boolean DOES_NOT_ACCEPT_NULL = false;
+  private static final boolean IMMUTABLE = true;
 
-    public ImmutableSetSerializer() {
-        super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
-    }
+  public ImmutableSetSerializer() {
+    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+  }
 
-    @Override
-    public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
-        output.writeInt(object.size(), true);
-        for (Object elm : object) {
-            kryo.writeClassAndObject(output, elm);
-        }
+  @Override
+  public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
+    output.writeInt(object.size(), true);
+    for (Object elm : object) {
+      kryo.writeClassAndObject(output, elm);
     }
+  }
 
-    @Override
-    public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
-        final int size = input.readInt(true);
-        ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
-        for (int i = 0; i < size; ++i) {
-            builder.add(kryo.readClassAndObject(input));
-        }
-        return builder.build();
+  @Override
+  public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
+    final int size = input.readInt(true);
+    ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
+    for (int i = 0; i < size; ++i) {
+      builder.add(kryo.readClassAndObject(input));
     }
+    return builder.build();
+  }
 
-    /**
-     * Creates a new {@link ImmutableSetSerializer} and registers its serializer
-     * for the several ImmutableSet related classes.
-     */
-    public static void registerSerializers(Config config) {
+  /**
+   * Creates a new {@link ImmutableSetSerializer} and registers its serializer
+   * for the several ImmutableSet related classes.
+   */
+  public static void registerSerializers(Config config) {
 
-        // ImmutableList (abstract class)
-        //  +- EmptyImmutableSet
-        //  |   EmptyImmutableSet
-        //  +- SingletonImmutableSet
-        //  |   Optimized for Set with only 1 element.
-        //  +- RegularImmutableSet
-        //  |   RegularImmutableList
-        //  +- EnumImmutableSet
-        //  |   EnumImmutableSet
+    // ImmutableList (abstract class)
+    //  +- EmptyImmutableSet
+    //  |   EmptyImmutableSet
+    //  +- SingletonImmutableSet
+    //  |   Optimized for Set with only 1 element.
+    //  +- RegularImmutableSet
+    //  |   RegularImmutableList
+    //  +- EnumImmutableSet
+    //  |   EnumImmutableSet
 
-        config.registerSerialization(ImmutableSet.class, ImmutableSetSerializer.class);
+    config.registerSerialization(ImmutableSet.class, ImmutableSetSerializer.class);
 
-        // Note:
-        //  Only registering above is good enough for serializing/deserializing.
-        //  but if using Kryo#copy, following is required.
+    // Note:
+    //  Only registering above is good enough for serializing/deserializing.
+    //  but if using Kryo#copy, following is required.
 
-        config.registerSerialization(ImmutableSet.of().getClass(), ImmutableSetSerializer.class);
-        config.registerSerialization(ImmutableSet.of(1).getClass(), ImmutableSetSerializer.class);
-        config.registerSerialization(ImmutableSet.of(1,2,3).getClass(), ImmutableSetSerializer.class);
+    config.registerSerialization(ImmutableSet.of().getClass(), ImmutableSetSerializer.class);
+    config.registerSerialization(ImmutableSet.of(1).getClass(), ImmutableSetSerializer.class);
+    config.registerSerialization(ImmutableSet.of(1, 2, 3).getClass(), ImmutableSetSerializer.class);
 
-        config.registerSerialization(
-                Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(), ImmutableSetSerializer.class);
-    }
+    config.registerSerialization(
+        Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(),
+        ImmutableSetSerializer.class);
+  }
 
-    private enum SomeEnum {
-        A, B, C
-    }
+  private enum SomeEnum {
+    A, B, C
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
index decfb3f..b47f3b7 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
@@ -6,50 +6,49 @@ import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
 import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
 import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
 import com.google.common.collect.Lists;
-
 import java.util.Iterator;
 import java.util.List;
 
 public class KvStoreIterableSerializer extends Serializer<KvStoreIterable<Object>> {
 
-    public KvStoreIterableSerializer() {
+  public KvStoreIterableSerializer() {
 
-    }
+  }
 
-    @Override
-    public void write(Kryo kryo, Output output, KvStoreIterable<Object> object) {
-        List<Object> values = Lists.newArrayList(object);
-        output.writeInt(values.size(), true);
-        for (Object elm : object) {
-            kryo.writeClassAndObject(output, elm);
-        }
+  @Override
+  public void write(Kryo kryo, Output output, KvStoreIterable<Object> object) {
+    List<Object> values = Lists.newArrayList(object);
+    output.writeInt(values.size(), true);
+    for (Object elm : object) {
+      kryo.writeClassAndObject(output, elm);
     }
-
-    @Override
-    public KvStoreIterable<Object> read(Kryo kryo, Input input, Class<KvStoreIterable<Object>> type) {
-        final int size = input.readInt(true);
-        List<Object> values = Lists.newArrayList();
-        for (int i = 0; i < size; ++i) {
-            values.add(kryo.readClassAndObject(input));
-        }
-
-        return new KvStoreIterable<Object>() {
-            Iterable<Object> values;
-
-            @Override
-            public Iterator<Object> iterator() {
-                return values.iterator();
-            }
-
-            public KvStoreIterable init(Iterable<Object> values) {
-                this.values = values;
-                return this;
-            }
-
-            @Override
-            public String toString() {
-                return values.toString();
-            }
-        }.init(values);
+  }
+
+  @Override
+  public KvStoreIterable<Object> read(Kryo kryo, Input input, Class<KvStoreIterable<Object>> type) {
+    final int size = input.readInt(true);
+    List<Object> values = Lists.newArrayList();
+    for (int i = 0; i < size; ++i) {
+      values.add(kryo.readClassAndObject(input));
     }
+
+    return new KvStoreIterable<Object>() {
+      Iterable<Object> values;
+
+      @Override
+      public Iterator<Object> iterator() {
+        return values.iterator();
+      }
+
+      public KvStoreIterable init(Iterable<Object> values) {
+        this.values = values;
+        return this;
+      }
+
+      @Override
+      public String toString() {
+        return values.toString();
+      }
+    }.init(values);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
index 9bb315b..dd4272c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
@@ -6,73 +6,83 @@ import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
 import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
 import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
 import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import org.apache.beam.sdk.repackaged.com.google.common.collect.*;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.HashBasedTable;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableTable;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Table;
 
 public class SdkRepackImmuListSerializer extends Serializer<ImmutableList<Object>> {
 
-    private static final boolean DOES_NOT_ACCEPT_NULL = false;
-    private static final boolean IMMUTABLE = true;
+  private static final boolean DOES_NOT_ACCEPT_NULL = false;
+  private static final boolean IMMUTABLE = true;
 
-    public SdkRepackImmuListSerializer() {
-        super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
-    }
+  public SdkRepackImmuListSerializer() {
+    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+  }
 
-    @Override
-    public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
-        output.writeInt(object.size(), true);
-        for (Object elm : object) {
-            kryo.writeClassAndObject(output, elm);
-        }
+  @Override
+  public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
+    output.writeInt(object.size(), true);
+    for (Object elm : object) {
+      kryo.writeClassAndObject(output, elm);
     }
+  }
 
-    @Override
-    public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
-        final int size = input.readInt(true);
-        final Object[] list = new Object[size];
-        for (int i = 0; i < size; ++i) {
-            list[i] = kryo.readClassAndObject(input);
-        }
-        return ImmutableList.copyOf(list);
+  @Override
+  public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
+    final int size = input.readInt(true);
+    final Object[] list = new Object[size];
+    for (int i = 0; i < size; ++i) {
+      list[i] = kryo.readClassAndObject(input);
     }
+    return ImmutableList.copyOf(list);
+  }
 
-    /**
-     * Creates a new {@link ImmutableListSerializer} and registers its serializer
-     * for the several ImmutableList related classes.
-     */
-    public static void registerSerializers(Config config) {
+  /**
+   * Creates a new {@link SdkRepackImmuListSerializer} and registers its serializer
+   * for the several ImmutableList related classes.
+   */
+  public static void registerSerializers(Config config) {
 
-        // ImmutableList (abstract class)
-        //  +- RegularImmutableList
-        //  |   RegularImmutableList
-        //  +- SingletonImmutableList
-        //  |   Optimized for List with only 1 element.
-        //  +- SubList
-        //  |   Representation for part of ImmutableList
-        //  +- ReverseImmutableList
-        //  |   For iterating in reverse order
-        //  +- StringAsImmutableList
-        //  |   Used by Lists#charactersOf
-        //  +- Values (ImmutableTable values)
-        //      Used by return value of #values() when there are multiple cells
+    // ImmutableList (abstract class)
+    //  +- RegularImmutableList
+    //  |   RegularImmutableList
+    //  +- SingletonImmutableList
+    //  |   Optimized for List with only 1 element.
+    //  +- SubList
+    //  |   Representation for part of ImmutableList
+    //  +- ReverseImmutableList
+    //  |   For iterating in reverse order
+    //  +- StringAsImmutableList
+    //  |   Used by Lists#charactersOf
+    //  +- Values (ImmutableTable values)
+    //      Used by return value of #values() when there are multiple cells
 
-        config.registerSerialization(ImmutableList.class, SdkRepackImmuListSerializer.class);
+    config.registerSerialization(ImmutableList.class, SdkRepackImmuListSerializer.class);
 
-        // Note:
-        //  Only registering above is good enough for serializing/deserializing.
-        //  but if using Kryo#copy, following is required.
+    // Note:
+    //  Only registering above is good enough for serializing/deserializing.
+    //  but if using Kryo#copy, following is required.
 
-        config.registerSerialization(ImmutableList.of().getClass(), SdkRepackImmuListSerializer.class);
-        config.registerSerialization(ImmutableList.of(1).getClass(), SdkRepackImmuListSerializer.class);
-        config.registerSerialization(ImmutableList.of(1,2,3).subList(1, 2).getClass(), SdkRepackImmuListSerializer.class);
-        config.registerSerialization(ImmutableList.of().reverse().getClass(), SdkRepackImmuListSerializer.class);
+    config.registerSerialization(ImmutableList.of().getClass(), SdkRepackImmuListSerializer.class);
+    config.registerSerialization(ImmutableList.of(1).getClass(), SdkRepackImmuListSerializer.class);
+    config.registerSerialization(
+        ImmutableList.of(1, 2, 3).subList(1, 2).getClass(),
+        SdkRepackImmuListSerializer.class);
+    config.registerSerialization(
+        ImmutableList.of().reverse().getClass(),
+        SdkRepackImmuListSerializer.class);
 
-        config.registerSerialization(Lists.charactersOf("KryoRocks").getClass(), SdkRepackImmuListSerializer.class);
+    config.registerSerialization(
+        Lists.charactersOf("KryoRocks").getClass(),
+        SdkRepackImmuListSerializer.class);
 
-        Table<Integer,Integer,Integer> baseTable = HashBasedTable.create();
-        baseTable.put(1, 2, 3);
-        baseTable.put(4, 5, 6);
-        Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
-        config.registerSerialization(table.values().getClass(), SdkRepackImmuListSerializer.class);
+    Table<Integer, Integer, Integer> baseTable = HashBasedTable.create();
+    baseTable.put(1, 2, 3);
+    baseTable.put(4, 5, 6);
+    Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
+    config.registerSerialization(table.values().getClass(), SdkRepackImmuListSerializer.class);
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
index a514645..6973c82 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
@@ -5,67 +5,71 @@ import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
 import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
 import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
 import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import org.apache.beam.sdk.repackaged.com.google.common.collect.*;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets;
 
 public class SdkRepackImmuSetSerializer extends Serializer<ImmutableSet<Object>> {
 
-    private static final boolean DOES_NOT_ACCEPT_NULL = false;
-    private static final boolean IMMUTABLE = true;
+  private static final boolean DOES_NOT_ACCEPT_NULL = false;
+  private static final boolean IMMUTABLE = true;
 
-    public SdkRepackImmuSetSerializer() {
-        super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
-    }
+  public SdkRepackImmuSetSerializer() {
+    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+  }
 
-    @Override
-    public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
-        output.writeInt(object.size(), true);
-        for (Object elm : object) {
-            kryo.writeClassAndObject(output, elm);
-        }
+  @Override
+  public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
+    output.writeInt(object.size(), true);
+    for (Object elm : object) {
+      kryo.writeClassAndObject(output, elm);
     }
+  }
 
-    @Override
-    public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
-        final int size = input.readInt(true);
-        ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
-        for (int i = 0; i < size; ++i) {
-            builder.add(kryo.readClassAndObject(input));
-        }
-        return builder.build();
+  @Override
+  public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
+    final int size = input.readInt(true);
+    ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
+    for (int i = 0; i < size; ++i) {
+      builder.add(kryo.readClassAndObject(input));
     }
+    return builder.build();
+  }
 
-    /**
-     * Creates a new {@link ImmutableSetSerializer} and registers its serializer
-     * for the several ImmutableSet related classes.
-     */
-    public static void registerSerializers(Config config) {
+  /**
+   * Creates a new {@link SdkRepackImmuSetSerializer} and registers its serializer
+   * for the several ImmutableSet related classes.
+   */
+  public static void registerSerializers(Config config) {
 
-        // ImmutableList (abstract class)
-        //  +- EmptyImmutableSet
-        //  |   EmptyImmutableSet
-        //  +- SingletonImmutableSet
-        //  |   Optimized for Set with only 1 element.
-        //  +- RegularImmutableSet
-        //  |   RegularImmutableList
-        //  +- EnumImmutableSet
-        //  |   EnumImmutableSet
+    // ImmutableList (abstract class)
+    //  +- EmptyImmutableSet
+    //  |   EmptyImmutableSet
+    //  +- SingletonImmutableSet
+    //  |   Optimized for Set with only 1 element.
+    //  +- RegularImmutableSet
+    //  |   RegularImmutableList
+    //  +- EnumImmutableSet
+    //  |   EnumImmutableSet
 
-        config.registerSerialization(ImmutableSet.class, SdkRepackImmuSetSerializer.class);
+    config.registerSerialization(ImmutableSet.class, SdkRepackImmuSetSerializer.class);
 
-        // Note:
-        //  Only registering above is good enough for serializing/deserializing.
-        //  but if using Kryo#copy, following is required.
+    // Note:
+    //  Only registering above is good enough for serializing/deserializing.
+    //  but if using Kryo#copy, following is required.
 
-        config.registerSerialization(ImmutableSet.of().getClass(), SdkRepackImmuSetSerializer.class);
-        config.registerSerialization(ImmutableSet.of(1).getClass(), SdkRepackImmuSetSerializer.class);
-        config.registerSerialization(ImmutableSet.of(1,2,3).getClass(), SdkRepackImmuSetSerializer.class);
+    config.registerSerialization(ImmutableSet.of().getClass(), SdkRepackImmuSetSerializer.class);
+    config.registerSerialization(ImmutableSet.of(1).getClass(), SdkRepackImmuSetSerializer.class);
+    config.registerSerialization(
+        ImmutableSet.of(1, 2, 3).getClass(),
+        SdkRepackImmuSetSerializer.class);
 
-        config.registerSerialization(
-                Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(), SdkRepackImmuSetSerializer.class);
-    }
+    config.registerSerialization(
+        Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(),
+        SdkRepackImmuSetSerializer.class);
+  }
 
-    private enum SomeEnum {
-        A, B, C
-    }
+  private enum SomeEnum {
+    A, B, C
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/aa251a4a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
index c8b0138..bcee778 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
@@ -5,155 +5,177 @@ import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
 import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
 import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
 import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-
 import java.lang.reflect.Field;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
 
 public class UnmodifiableCollectionsSerializer extends Serializer<Object> {
 
-    private static final Field SOURCE_COLLECTION_FIELD;
-    private static final Field SOURCE_MAP_FIELD;
+  private static final Field SOURCE_COLLECTION_FIELD;
+  private static final Field SOURCE_MAP_FIELD;
 
-    static {
-        try {
-            SOURCE_COLLECTION_FIELD = Class.forName("java.util.Collections$UnmodifiableCollection" )
-                    .getDeclaredField( "c" );
-            SOURCE_COLLECTION_FIELD.setAccessible( true );
+  static {
+    try {
+      SOURCE_COLLECTION_FIELD = Class.forName("java.util.Collections$UnmodifiableCollection")
+          .getDeclaredField("c");
+      SOURCE_COLLECTION_FIELD.setAccessible(true);
 
 
-            SOURCE_MAP_FIELD = Class.forName("java.util.Collections$UnmodifiableMap" )
-                    .getDeclaredField( "m" );
-            SOURCE_MAP_FIELD.setAccessible( true );
-        } catch ( final Exception e ) {
-            throw new RuntimeException( "Could not access source collection" +
-                    " field in java.util.Collections$UnmodifiableCollection.", e );
-        }
+      SOURCE_MAP_FIELD = Class.forName("java.util.Collections$UnmodifiableMap")
+          .getDeclaredField("m");
+      SOURCE_MAP_FIELD.setAccessible(true);
+    } catch (final Exception e) {
+      throw new RuntimeException("Could not access source collection"
+          + " field in java.util.Collections$UnmodifiableCollection.", e);
     }
-
-    @Override
-    public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) {
-        final int ordinal = input.readInt( true );
-        final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal];
-        final Object sourceCollection = kryo.readClassAndObject( input );
-        return unmodifiableCollection.create( sourceCollection );
+  }
+
+  @Override
+  public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) {
+    final int ordinal = input.readInt(true);
+    final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal];
+    final Object sourceCollection = kryo.readClassAndObject(input);
+    return unmodifiableCollection.create(sourceCollection);
+  }
+
+  @Override
+  public void write(final Kryo kryo, final Output output, final Object object) {
+    try {
+      final UnmodifiableCollection unmodifiableCollection =
+          UnmodifiableCollection.valueOfType(object.getClass());
+      // the ordinal could be replaced by something else (e.g. an explicitly managed "id")
+      output.writeInt(unmodifiableCollection.ordinal(), true);
+      kryo.writeClassAndObject(output, unmodifiableCollection.sourceCollectionField.get(object));
+    } catch (final RuntimeException e) {
+      // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write...
+      // handles SerializationException specifically (resizing the buffer)...
+      throw e;
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
     }
-
-    @Override
-    public void write(final Kryo kryo, final Output output, final Object object) {
-        try {
-            final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( object.getClass() );
-            // the ordinal could be replaced by s.th. else (e.g. a explicitely managed "id")
-            output.writeInt( unmodifiableCollection.ordinal(), true );
-            kryo.writeClassAndObject( output, unmodifiableCollection.sourceCollectionField.get( object ) );
-        } catch ( final RuntimeException e ) {
-            // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write...
-            // handles SerializationException specifically (resizing the buffer)...
-            throw e;
-        } catch ( final Exception e ) {
-            throw new RuntimeException( e );
-        }
+  }
+
+  @Override
+  public Object copy(Kryo kryo, Object original) {
+    try {
+      final UnmodifiableCollection unmodifiableCollection =
+          UnmodifiableCollection.valueOfType(original.getClass());
+      Object sourceCollectionCopy =
+          kryo.copy(unmodifiableCollection.sourceCollectionField.get(original));
+      return unmodifiableCollection.create(sourceCollectionCopy);
+    } catch (final RuntimeException e) {
+      // Don't eat and wrap RuntimeExceptions
+      throw e;
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
     }
-
-    @Override
-    public Object copy(Kryo kryo, Object original) {
-        try {
-            final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( original.getClass() );
-            Object sourceCollectionCopy = kryo.copy(unmodifiableCollection.sourceCollectionField.get(original));
-            return unmodifiableCollection.create( sourceCollectionCopy );
-        } catch ( final RuntimeException e ) {
-            // Don't eat and wrap RuntimeExceptions
-            throw e;
-        } catch ( final Exception e ) {
-            throw new RuntimeException( e );
-        }
+  }
+
+  private static enum UnmodifiableCollection {
+    COLLECTION(
+        Collections.unmodifiableCollection(Arrays.asList("")).getClass(),
+        SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableCollection((Collection<?>) sourceCollection);
+      }
+    },
+    RANDOM_ACCESS_LIST(
+        Collections.unmodifiableList(new ArrayList<Void>()).getClass(),
+        SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableList((List<?>) sourceCollection);
+      }
+    },
+    LIST(Collections.unmodifiableList(new LinkedList<Void>()).getClass(), SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableList((List<?>) sourceCollection);
+      }
+    },
+    SET(Collections.unmodifiableSet(new HashSet<Void>()).getClass(), SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableSet((Set<?>) sourceCollection);
+      }
+    },
+    SORTED_SET(
+        Collections.unmodifiableSortedSet(new TreeSet<Void>()).getClass(),
+        SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableSortedSet((SortedSet<?>) sourceCollection);
+      }
+    },
+    MAP(Collections.unmodifiableMap(new HashMap<Void, Void>()).getClass(), SOURCE_MAP_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableMap((Map<?, ?>) sourceCollection);
+      }
+
+    },
+    SORTED_MAP(
+        Collections.unmodifiableSortedMap(new TreeMap<Void, Void>()).getClass(),
+        SOURCE_MAP_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableSortedMap((SortedMap<?, ?>) sourceCollection);
+      }
+    };
+
+    private final Class<?> type;
+    private final Field sourceCollectionField;
+
+    private UnmodifiableCollection(final Class<?> type, final Field sourceCollectionField) {
+      this.type = type;
+      this.sourceCollectionField = sourceCollectionField;
     }
 
-    private static enum UnmodifiableCollection {
-        COLLECTION( Collections.unmodifiableCollection( Arrays.asList( "" ) ).getClass(), SOURCE_COLLECTION_FIELD ){
-            @Override
-            public Object create( final Object sourceCollection ) {
-                return Collections.unmodifiableCollection( (Collection<?>) sourceCollection );
-            }
-        },
-        RANDOM_ACCESS_LIST( Collections.unmodifiableList( new ArrayList<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){
-            @Override
-            public Object create( final Object sourceCollection ) {
-                return Collections.unmodifiableList( (List<?>) sourceCollection );
-            }
-        },
-        LIST( Collections.unmodifiableList( new LinkedList<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){
-            @Override
-            public Object create( final Object sourceCollection ) {
-                return Collections.unmodifiableList( (List<?>) sourceCollection );
-            }
-        },
-        SET( Collections.unmodifiableSet( new HashSet<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){
-            @Override
-            public Object create( final Object sourceCollection ) {
-                return Collections.unmodifiableSet( (Set<?>) sourceCollection );
-            }
-        },
-        SORTED_SET( Collections.unmodifiableSortedSet( new TreeSet<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){
-            @Override
-            public Object create( final Object sourceCollection ) {
-                return Collections.unmodifiableSortedSet( (SortedSet<?>) sourceCollection );
-            }
-        },
-        MAP( Collections.unmodifiableMap( new HashMap<Void, Void>() ).getClass(), SOURCE_MAP_FIELD ) {
-
-            @Override
-            public Object create( final Object sourceCollection ) {
-                return Collections.unmodifiableMap( (Map<?, ?>) sourceCollection );
-            }
-
-        },
-        SORTED_MAP( Collections.unmodifiableSortedMap( new TreeMap<Void, Void>() ).getClass(), SOURCE_MAP_FIELD ) {
-            @Override
-            public Object create( final Object sourceCollection ) {
-                return Collections.unmodifiableSortedMap( (SortedMap<?, ?>) sourceCollection );
-            }
-        };
-
-        private final Class<?> type;
-        private final Field sourceCollectionField;
-
-        private UnmodifiableCollection( final Class<?> type, final Field sourceCollectionField ) {
-            this.type = type;
-            this.sourceCollectionField = sourceCollectionField;
-        }
+    /**
+     * @param sourceCollection the source collection or map to wrap in an unmodifiable view
+     */
+    public abstract Object create(Object sourceCollection);
 
-        /**
-         * @param sourceCollection
-         */
-        public abstract Object create( Object sourceCollection );
-
-        static UnmodifiableCollection valueOfType(final Class<?> type ) {
-            for( final UnmodifiableCollection item : values() ) {
-                if ( item.type.equals( type ) ) {
-                    return item;
-                }
-            }
-            throw new IllegalArgumentException( "The type " + type + " is not supported." );
+    static UnmodifiableCollection valueOfType(final Class<?> type) {
+      for (final UnmodifiableCollection item : values()) {
+        if (item.type.equals(type)) {
+          return item;
         }
-
+      }
+      throw new IllegalArgumentException("The type " + type + " is not supported.");
     }
 
-    /**
-     * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer
-     * for the several unmodifiable Collections that can be created via {@link Collections},
-     * including {@link Map}s.
-     *
-     * @see Collections#unmodifiableCollection(Collection)
-     * @see Collections#unmodifiableList(List)
-     * @see Collections#unmodifiableSet(Set)
-     * @see Collections#unmodifiableSortedSet(SortedSet)
-     * @see Collections#unmodifiableMap(Map)
-     * @see Collections#unmodifiableSortedMap(SortedMap)
-     */
-    public static void registerSerializers( Config config ) {
-        UnmodifiableCollection.values();
-        for ( final UnmodifiableCollection item : UnmodifiableCollection.values() ) {
-            config.registerSerialization( item.type, UnmodifiableCollectionsSerializer.class );
-        }
+  }
+
+  /**
+   * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer
+   * for the several unmodifiable Collections that can be created via {@link Collections},
+   * including {@link Map}s.
+   *
+   * @see Collections#unmodifiableCollection(Collection)
+   * @see Collections#unmodifiableList(List)
+   * @see Collections#unmodifiableSet(Set)
+   * @see Collections#unmodifiableSortedSet(SortedSet)
+   * @see Collections#unmodifiableMap(Map)
+   * @see Collections#unmodifiableSortedMap(SortedMap)
+   */
+  public static void registerSerializers(Config config) {
+    UnmodifiableCollection.values();
+    for (final UnmodifiableCollection item : UnmodifiableCollection.values()) {
+      config.registerSerialization(item.type, UnmodifiableCollectionsSerializer.class);
     }
+  }
 }