You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:47 UTC

[47/53] [abbrv] beam git commit: jstorm-runner: 1. Generate execution DAG for runtime 2. Restructure Kryo serializers

jstorm-runner:
1. Generate execution DAG for runtime
2. Restructure Kryo serializers


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6078cbc6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6078cbc6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6078cbc6

Branch: refs/heads/jstorm-runner
Commit: 6078cbc6bd5ca6e48e237c652c532b189acef2b7
Parents: 240f61b
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Wed Aug 9 16:48:42 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:03:00 2017 +0800

----------------------------------------------------------------------
 runners/jstorm/pom.xml                          |   8 +-
 .../beam/runners/jstorm/JStormRunner.java       | 169 +++++++++++--
 .../BeamSdkRepackUtilsSerializer.java           | 253 +++++++++++++++++++
 .../serialization/BeamUtilsSerializer.java      | 114 +++++++++
 .../serialization/CollectionsSerializer.java    |  60 -----
 .../serialization/GuavaUtilsSerializer.java     | 252 ++++++++++++++++++
 .../serialization/ImmutableListSerializer.java  | 106 --------
 .../serialization/ImmutableMapSerializer.java   |  87 -------
 .../serialization/ImmutableSetSerializer.java   |  92 -------
 .../serialization/JStormUtilsSerializer.java    | 126 +++++++++
 .../serialization/JavaUtilsSerializer.java      | 236 +++++++++++++++++
 .../KvStoreIterableSerializer.java              |  74 ------
 .../SdkRepackImmuListSerializer.java            | 107 --------
 .../SdkRepackImmuSetSerializer.java             |  95 -------
 .../SdkRepackImmutableMapSerializer.java        |  90 -------
 .../UnmodifiableCollectionsSerializer.java      | 201 ---------------
 .../translation/BoundedSourceTranslator.java    |   1 +
 .../jstorm/translation/DoFnExecutor.java        |   2 +-
 .../runners/jstorm/translation/Executor.java    |   2 +-
 .../jstorm/translation/ExecutorsBolt.java       |  35 ++-
 .../jstorm/translation/FlattenTranslator.java   |   1 +
 .../translation/JStormStateInternals.java       |  24 +-
 .../jstorm/translation/TranslationContext.java  |  19 +-
 .../translation/UnboundedSourceSpout.java       |  12 +
 .../translation/UnboundedSourceTranslator.java  |   1 +
 25 files changed, 1203 insertions(+), 964 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/pom.xml
----------------------------------------------------------------------
diff --git a/runners/jstorm/pom.xml b/runners/jstorm/pom.xml
index 79634e9..75387ef 100644
--- a/runners/jstorm/pom.xml
+++ b/runners/jstorm/pom.xml
@@ -53,9 +53,6 @@
                   <goal>test</goal>
                 </goals>
                 <configuration>
-                  <!--<includes>
-                    <include>org.apache.beam.sdk.transforms.CombineTest.java</include>
-                  </includes>-->
                   <groups>
                     org.apache.beam.sdk.testing.ValidatesRunner
                   </groups>
@@ -144,6 +141,11 @@
         <groupId>com.google.auto.value</groupId>
         <artifactId>auto-value</artifactId>
     </dependency>
+    <dependency>
+        <groupId>com.googlecode.json-simple</groupId>
+        <artifactId>json-simple</artifactId>
+        <version>1.1</version>
+    </dependency>
 
     <!-- Depend on test jar to scan for ValidatesRunner tests -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index 56db1c6..47de42c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -26,22 +26,25 @@ import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
-import com.alibaba.jstorm.cache.KvStoreIterable;
+import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.cluster.StormConfig;
 import com.alibaba.jstorm.transactional.TransactionTopologyBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.runners.jstorm.serialization.CollectionsSerializer;
-import org.apache.beam.runners.jstorm.serialization.ImmutableListSerializer;
-import org.apache.beam.runners.jstorm.serialization.ImmutableMapSerializer;
-import org.apache.beam.runners.jstorm.serialization.ImmutableSetSerializer;
-import org.apache.beam.runners.jstorm.serialization.KvStoreIterableSerializer;
-import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuListSerializer;
-import org.apache.beam.runners.jstorm.serialization.SdkRepackImmuSetSerializer;
-import org.apache.beam.runners.jstorm.serialization.SdkRepackImmutableMapSerializer;
-import org.apache.beam.runners.jstorm.serialization.UnmodifiableCollectionsSerializer;
+
+import org.apache.beam.runners.jstorm.serialization.BeamSdkRepackUtilsSerializer;
+import org.apache.beam.runners.jstorm.serialization.BeamUtilsSerializer;
+import org.apache.beam.runners.jstorm.serialization.GuavaUtilsSerializer;
+import org.apache.beam.runners.jstorm.serialization.JStormUtilsSerializer;
+import org.apache.beam.runners.jstorm.serialization.JavaUtilsSerializer;
 import org.apache.beam.runners.jstorm.translation.AbstractComponent;
 import org.apache.beam.runners.jstorm.translation.CommonInstance;
+import org.apache.beam.runners.jstorm.translation.Executor;
 import org.apache.beam.runners.jstorm.translation.ExecutorsBolt;
 import org.apache.beam.runners.jstorm.translation.JStormPipelineTranslator;
 import org.apache.beam.runners.jstorm.translation.Stream;
@@ -53,6 +56,10 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,17 +105,12 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
     config.put("worker.external", "beam");
     config.put("topology.acker.executors", 0);
 
-    UnmodifiableCollectionsSerializer.registerSerializers(config);
-    // register classes of guava utils, ImmutableList, ImmutableSet, ImmutableMap
-    ImmutableListSerializer.registerSerializers(config);
-    SdkRepackImmuListSerializer.registerSerializers(config);
-    ImmutableSetSerializer.registerSerializers(config);
-    SdkRepackImmuSetSerializer.registerSerializers(config);
-    ImmutableMapSerializer.registerSerializers(config);
-    SdkRepackImmutableMapSerializer.registerSerializers(config);
-    CollectionsSerializer.registerSerializers(config);
-
-    config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class);
+    // Register serializers of Kryo
+    GuavaUtilsSerializer.registerSerializers(config);
+    BeamUtilsSerializer.registerSerializers(config);
+    BeamSdkRepackUtilsSerializer.registerSerializers(config);
+    JStormUtilsSerializer.registerSerializers(config);
+    JavaUtilsSerializer.registerSerializers(config);
     return config;
   }
 
@@ -128,6 +130,8 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
 
     String topologyName = options.getJobName();
     Config config = convertPipelineOptionsToConfig(options);
+    ConfigExtension.setTopologyComponentSubgraphDefinition(
+        config, getSubGraphDefintions(context));
 
     return runTopology(
         topologyName,
@@ -135,6 +139,129 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
         config);
   }
 
+  private JSONObject buildNode(String name, String type) {
+    // Node: {name:name, type:tag/transform}
+    JSONObject jsonNode = new JSONObject();
+    jsonNode.put("name", name);
+    jsonNode.put("type", type);
+    return jsonNode;
+  }
+
+  private JSONArray buildEdge(Integer sourceId, Integer targetId) {
+    JSONArray edge = new JSONArray();
+    edge.addAll(Lists.newArrayList(sourceId, targetId));
+    return edge;
+  }
+
+  private String getPValueName(TranslationContext.UserGraphContext userGraphContext,
+                               TupleTag tupleTag) {
+    PValue pValue = userGraphContext.findPValue(tupleTag);
+    int index = pValue.getName().lastIndexOf("/");
+    return pValue.getName().substring(index + 1);
+  }
+
+  private String getSubGraphDefintions(TranslationContext context) {
+    TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
+    TranslationContext.ExecutionGraphContext executionGraphContext =
+        context.getExecutionGraphContext();
+    JSONObject graph = new JSONObject();
+
+    // One sub-graph per spout, stored in the result under the spout's map key
+    for (Map.Entry<String, UnboundedSourceSpout> entry :
+        executionGraphContext.getSpouts().entrySet()) {
+      JSONObject subGraph = new JSONObject();
+
+      // Nodes: id 1 is the spout transform, id 2 is its output PValue (short name)
+      JSONObject nodes = new JSONObject();
+      nodes.put(1, buildNode(entry.getValue().getName(), "transform"));
+      nodes.put(2, buildNode(
+          getPValueName(userGraphContext, entry.getValue().getOutputTag()), "tag"));
+      subGraph.put("nodes", nodes);
+
+      // Edges: a single edge from the transform (1) to its output tag (2)
+      JSONArray edges = new JSONArray();
+      edges.add(buildEdge(1, 2));
+      subGraph.put("edges", edges);
+
+      graph.put(entry.getKey(), subGraph);
+    }
+
+    // One sub-graph per bolt: its executors plus the external tags they read and write
+    for (Map.Entry<String, ExecutorsBolt> entry : executionGraphContext.getBolts().entrySet()) {
+      ExecutorsBolt executorsBolt = entry.getValue();
+      Map<Executor, String> executorNames = executorsBolt.getExecutorNames();
+      Map<TupleTag, Executor> inputTagToExecutors = executorsBolt.getExecutors();
+
+      // Sub-Graph
+      JSONObject subGraph = new JSONObject();
+
+      // Nodes: executors get ids first; tag nodes are added to this same object further down
+      JSONObject nodes = new JSONObject();
+      Map<String, Integer> nodeNameToId = Maps.newHashMap();
+      int id = 1;
+      for (Map.Entry<Executor, Collection<TupleTag>> entry1 :
+          executorsBolt.getExecutorToOutputTags().entrySet()) {
+        Executor executor = entry1.getKey();
+        nodes.put(id, buildNode(executorNames.get(executor), "transform"));
+        nodeNameToId.put(executorNames.get(executor), id);
+        id++;
+      }
+      subGraph.put("nodes", nodes);
+
+      Collection<TupleTag> externalOutputTags = executorsBolt.getExternalOutputTags();
+      for (TupleTag outputTag : externalOutputTags) {
+        String name = getPValueName(userGraphContext, outputTag);
+        nodes.put(id, buildNode(name, "tag"));
+        nodeNameToId.put(outputTag.getId(), id);
+        id++;
+      }
+
+      Collection<TupleTag> externalInputTags = Sets.newHashSet(inputTagToExecutors.keySet());
+      externalInputTags.removeAll(executorsBolt.getOutputTags());
+      for (TupleTag inputTag : externalInputTags) {
+        String name = getPValueName(userGraphContext, inputTag);
+        nodes.put(id, buildNode(name, "tag"));
+        nodeNameToId.put(inputTag.getId(), id);
+        id++;
+      }
+
+      // Edges: executor->executor, executor->external output tag, external input tag->executor
+      JSONArray edges = new JSONArray();
+      for (Map.Entry<Executor, Collection<TupleTag>> entry1 :
+          executorsBolt.getExecutorToOutputTags().entrySet()) {
+        Executor sourceExecutor = entry1.getKey();
+        Collection<TupleTag> outputTags = entry1.getValue();
+        for (TupleTag tag : outputTags) {
+          if (inputTagToExecutors.containsKey(tag)) {
+            Executor targetExecutor = inputTagToExecutors.get(tag);
+            if (executorNames.containsKey(targetExecutor)) {
+              edges.add(buildEdge(nodeNameToId.get(executorNames.get(sourceExecutor)),
+                  nodeNameToId.get(executorNames.get(targetExecutor))));
+            }
+          }
+          if (externalOutputTags.contains(tag)) {
+            edges.add(buildEdge(nodeNameToId.get(executorNames.get(sourceExecutor)),
+                nodeNameToId.get(tag.getId())));
+          }
+        }
+      }
+      for (TupleTag tag : externalInputTags) {
+        if (inputTagToExecutors.containsKey(tag)) {
+          Executor targetExecutor = inputTagToExecutors.get(tag);
+          if (executorNames.containsKey(targetExecutor)) {
+            edges.add(buildEdge(nodeNameToId.get(tag.getId()),
+                nodeNameToId.get(executorNames.get(targetExecutor))));
+          }
+        }
+      }
+      subGraph.put("edges", edges);
+
+      graph.put(entry.getKey(), subGraph);
+    }
+
+    return graph.toJSONString();
+  }
+
   private JStormRunnerResult runTopology(
       String topologyName,
       StormTopology topology,

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java
new file mode 100644
index 0000000..4ae47eb
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamSdkRepackUtilsSerializer.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.HashBasedTable;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableTable;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Maps;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.Table;
+
+/**
+ * Specific serializer of {@link Kryo} for Beam SDK repackaged class,
+ * e.g. ImmutableList, ImmutableMap...
+ */
+public class BeamSdkRepackUtilsSerializer {
+
+  /**
+   * Specific serializer of {@link Kryo} for ImmutableList.
+   */
+  public static class ImmutableListSerializer extends Serializer<ImmutableList<Object>> {
+
+    private static final boolean DOES_NOT_ACCEPT_NULL = false;
+    private static final boolean IMMUTABLE = true;
+
+    public ImmutableListSerializer() {
+      super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
+      output.writeInt(object.size(), true);
+      for (Object elm : object) {
+        kryo.writeClassAndObject(output, elm);
+      }
+    }
+
+    @Override
+    public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
+      final int size = input.readInt(true);
+      final Object[] list = new Object[size];
+      for (int i = 0; i < size; ++i) {
+        list[i] = kryo.readClassAndObject(input);
+      }
+      return ImmutableList.copyOf(list);
+    }
+  }
+
+  /**
+   * Registers {@link ImmutableListSerializer} for ImmutableList and its concrete subclasses.
+   */
+  private static void registerImmutableListSerializers(Config config) {
+
+    // ImmutableList (abstract class)
+    //  +- RegularImmutableList
+    //  |   RegularImmutableList
+    //  +- SingletonImmutableList
+    //  |   Optimized for List with only 1 element.
+    //  +- SubList
+    //  |   Representation for part of ImmutableList
+    //  +- ReverseImmutableList
+    //  |   For iterating in reverse order
+    //  +- StringAsImmutableList
+    //  |   Used by Lists#charactersOf
+    //  +- Values (ImmutableTable values)
+    //      Used by return value of #values() when there are multiple cells
+
+    config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class);
+
+    // Note:
+    //  Registering the base class above is good enough for serializing/deserializing,
+    //  but if using Kryo#copy, the following registrations are also required.
+
+    config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class);
+    config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class);
+    config.registerSerialization(
+        ImmutableList.of(1, 2, 3).subList(1, 2).getClass(),
+        ImmutableListSerializer.class);
+    config.registerSerialization(
+        ImmutableList.of().reverse().getClass(),
+        ImmutableListSerializer.class);
+
+    config.registerSerialization(
+        Lists.charactersOf("KryoRocks").getClass(),
+        ImmutableListSerializer.class);
+
+    Table<Integer, Integer, Integer> baseTable = HashBasedTable.create();
+    baseTable.put(1, 2, 3);
+    baseTable.put(4, 5, 6);
+    Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
+    config.registerSerialization(table.values().getClass(), ImmutableListSerializer.class);
+  }
+
+
+  /**
+   * Specific serializer of {@link Kryo} for ImmutableMap, written and read back via a HashMap.
+   */
+  public static class ImmutableMapSerializer extends
+      Serializer<ImmutableMap<Object, ? extends Object>> {
+    // Passed as acceptsNull: false lets Kryo handle null maps so write() never receives null.
+    private static final boolean DOES_NOT_ACCEPT_NULL = false;
+    private static final boolean IMMUTABLE = true;
+
+    public ImmutableMapSerializer() {
+      super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output,
+                      ImmutableMap<Object, ? extends Object> immutableMap) {
+      kryo.writeObject(output, Maps.newHashMap(immutableMap));
+    }
+
+    @Override
+    public ImmutableMap<Object, Object> read(
+        Kryo kryo,
+        Input input,
+        Class<ImmutableMap<Object, ? extends Object>> type) {
+      Map map = kryo.readObject(input, HashMap.class);
+      return ImmutableMap.copyOf(map);
+    }
+  }
+
+  private enum DummyEnum {
+    VALUE1,
+    VALUE2
+  }
+
+  /**
+   * Registers {@link ImmutableMapSerializer} for ImmutableMap and the concrete classes
+   * returned by its factory methods (empty, one entry, two entries, copy of an EnumMap).
+   */
+  private static void registerImmutableMapSerializers(Config config) {
+
+    config.registerSerialization(ImmutableMap.class, ImmutableMapSerializer.class);
+    config.registerSerialization(ImmutableMap.of().getClass(), ImmutableMapSerializer.class);
+
+    Object o1 = new Object();
+    Object o2 = new Object();
+
+    config.registerSerialization(ImmutableMap.of(o1, o1).getClass(), ImmutableMapSerializer.class);
+    config.registerSerialization(
+        ImmutableMap.of(o1, o1, o2, o2).getClass(),
+        ImmutableMapSerializer.class);
+    Map<DummyEnum, Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class);
+    for (DummyEnum e : DummyEnum.values()) {
+      enumMap.put(e, o1);
+    }
+
+    config.registerSerialization(
+        ImmutableMap.copyOf(enumMap).getClass(),
+        ImmutableMapSerializer.class);
+  }
+
+  /**
+   * Specific serializer of {@link Kryo} for ImmutableSet.
+   */
+  public static class ImmutableSetSerializer extends Serializer<ImmutableSet<Object>> {
+
+    private static final boolean DOES_NOT_ACCEPT_NULL = false;
+    private static final boolean IMMUTABLE = true;
+
+    public ImmutableSetSerializer() {
+      super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
+      output.writeInt(object.size(), true);
+      for (Object elm : object) {
+        kryo.writeClassAndObject(output, elm);
+      }
+    }
+
+    @Override
+    public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
+      final int size = input.readInt(true);
+      ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
+      for (int i = 0; i < size; ++i) {
+        builder.add(kryo.readClassAndObject(input));
+      }
+      return builder.build();
+    }
+  }
+
+  private enum SomeEnum {
+    A, B, C
+  }
+
+  /**
+   * Registers {@link ImmutableSetSerializer} for ImmutableSet and the concrete classes
+   * returned by its factory methods (empty, one element, several elements, enum set).
+   */
+  private static void registerImmutableSetSerializers(Config config) {
+
+    // ImmutableSet (abstract class)
+    //  +- EmptyImmutableSet
+    //  |   EmptyImmutableSet
+    //  +- SingletonImmutableSet
+    //  |   Optimized for Set with only 1 element.
+    //  +- RegularImmutableSet
+    //  |   RegularImmutableSet
+    //  +- EnumImmutableSet
+    //  |   EnumImmutableSet
+
+    config.registerSerialization(ImmutableSet.class, ImmutableSetSerializer.class);
+
+    // Note:
+    //  Registering the base class above is good enough for serializing/deserializing,
+    //  but if using Kryo#copy, the following registrations are also required.
+
+    config.registerSerialization(ImmutableSet.of().getClass(), ImmutableSetSerializer.class);
+    config.registerSerialization(ImmutableSet.of(1).getClass(), ImmutableSetSerializer.class);
+    config.registerSerialization(ImmutableSet.of(1, 2, 3).getClass(), ImmutableSetSerializer.class);
+
+    config.registerSerialization(
+        Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(),
+        ImmutableSetSerializer.class);
+  }
+
+  public static void registerSerializers(Config config) {
+    registerImmutableListSerializers(config);
+    registerImmutableMapSerializers(config);
+    registerImmutableSetSerializers(config);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java
new file mode 100644
index 0000000..db1f037
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/**
+ * Specific serializer of {@link Kryo} for Beam classes.
+ */
+public class BeamUtilsSerializer {
+
+  /**
+   * Serializer for {@link KV}.
+   */
+  public static class KvSerializer extends Serializer<KV> {
+
+    @Override
+    public void write(Kryo kryo, Output output, KV object) {
+      kryo.writeClassAndObject(output, object.getKey());
+      kryo.writeClassAndObject(output, object.getValue());
+    }
+
+    @Override
+    public KV read(Kryo kryo, Input input, Class<KV> type) {
+      return KV.of(kryo.readClassAndObject(input), kryo.readClassAndObject(input));
+    }
+  }
+
+  /**
+   * Serializer for {@link Instant}.
+   */
+  public static class InstantSerializer extends Serializer<Instant> {
+    @Override
+    public void write(Kryo kryo, Output output, Instant object) {
+      output.writeLong(object.getMillis(), true);
+    }
+
+    @Override
+    public Instant read(Kryo kryo, Input input, Class<Instant> type) {
+      return new Instant(input.readLong(true));
+    }
+  }
+
+  /**
+   * Serializer for {@link IntervalWindow}.
+   */
+  public static class IntervalWindowSerializer extends Serializer<IntervalWindow> {
+
+    @Override
+    public void write(Kryo kryo, Output output, IntervalWindow object) {
+      kryo.writeObject(output, object.start());
+      kryo.writeObject(output, object.end());
+    }
+
+    @Override
+    public IntervalWindow read(Kryo kryo, Input input, Class<IntervalWindow> type) {
+      Instant start = kryo.readObject(input, Instant.class);
+      Instant end = kryo.readObject(input, Instant.class);
+      return new IntervalWindow(start, end);
+    }
+  }
+
+  public static void registerSerializers(Config config) {
+    // Register classes with serializers
+    config.registerSerialization(KV.class, KvSerializer.class);
+    config.registerSerialization(IntervalWindow.class, IntervalWindowSerializer.class);
+
+    // Register classes with default serializer
+    config.registerSerialization(PaneInfo.class);
+    config.registerSerialization(StateNamespaces.WindowAndTriggerNamespace.class);
+    config.registerSerialization(StateNamespaces.WindowNamespace.class);
+    config.registerSerialization(StateNamespaces.GlobalNamespace.class);
+    config.registerSerialization(IntervalWindow.IntervalWindowCoder.class);
+    // Register classes of WindowedValue
+    config.registerSerialization(WindowedValue.valueInGlobalWindow(null).getClass());
+    config.registerSerialization(
+        WindowedValue.timestampedValueInGlobalWindow(null, Instant.now()).getClass());
+    config.registerSerialization(WindowedValue.of(null, BoundedWindow.TIMESTAMP_MIN_VALUE,
+        Lists.<BoundedWindow>newArrayList(), PaneInfo.NO_FIRING).getClass());
+    IntervalWindow w1 = new IntervalWindow(new Instant(1), new Instant(2));
+    IntervalWindow w2 = new IntervalWindow(new Instant(2), new Instant(3));
+    config.registerSerialization(WindowedValue.of(null, Instant.now(),
+        Lists.<BoundedWindow>newArrayList(w1), PaneInfo.NO_FIRING).getClass());
+    config.registerSerialization(WindowedValue.of(null, Instant.now(),
+        Lists.<BoundedWindow>newArrayList(w1, w2), PaneInfo.NO_FIRING).getClass());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java
deleted file mode 100644
index 1c8053e..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/CollectionsSerializer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.serialization;
-
-import backtype.storm.Config;
-import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
-import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-
-import java.util.Collections;
-import java.util.List;
-
-
-/**
- * Specific serializer of {@link Kryo} for Collections.
- */
-public class CollectionsSerializer {
-
-  /**
-   * Specific {@link Kryo} serializer for {@link java.util.Collections.SingletonList}.
-   */
-  public static class CollectionsSingletonListSerializer extends Serializer<List<?>> {
-    public CollectionsSingletonListSerializer() {
-      setImmutable(true);
-    }
-
-    @Override
-    public List<?> read(final Kryo kryo, final Input input, final Class<List<?>> type) {
-      final Object obj = kryo.readClassAndObject(input);
-      return Collections.singletonList(obj);
-    }
-
-    @Override
-    public void write(final Kryo kryo, final Output output, final List<?> list) {
-      kryo.writeClassAndObject(output, list.get(0));
-    }
-
-  }
-
-  public static void registerSerializers(Config config) {
-    config.registerSerialization(Collections.singletonList("").getClass(),
-            CollectionsSingletonListSerializer.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java
new file mode 100644
index 0000000..e6f750c
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/GuavaUtilsSerializer.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableTable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
+
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Specific serializer of {@link Kryo} for Guava utils class, e.g. ImmutableList, ImmutableMap...
+ */
+public class GuavaUtilsSerializer {
+
+  /**
+   * {@link Kryo} serializer for Guava's {@link ImmutableList}.
+   *
+   * <p>The wire format is the element count (variable-length int) followed by every element
+   * written together with its class; reading rebuilds the list in the same order.
+   */
+  public static class ImmutableListSerializer extends Serializer<ImmutableList<Object>> {
+
+    private static final boolean DOES_NOT_ACCEPT_NULL = false;
+    private static final boolean IMMUTABLE = true;
+
+    public ImmutableListSerializer() {
+      super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
+      final int count = object.size();
+      output.writeInt(count, true);
+      for (int idx = 0; idx < count; idx++) {
+        kryo.writeClassAndObject(output, object.get(idx));
+      }
+    }
+
+    @Override
+    public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
+      // The count written by write() sizes the buffer that is then filled element by element.
+      final Object[] elements = new Object[input.readInt(true)];
+      for (int idx = 0; idx < elements.length; idx++) {
+        elements[idx] = kryo.readClassAndObject(input);
+      }
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  /**
+   * registers its serializer for the several ImmutableList related classes.
+   */
+  private static void registerImmutableListSerializers(Config config) {
+
+    // ImmutableList (abstract class)
+    //  +- RegularImmutableList
+    //  |   RegularImmutableList
+    //  +- SingletonImmutableList
+    //  |   Optimized for List with only 1 element.
+    //  +- SubList
+    //  |   Representation for part of ImmutableList
+    //  +- ReverseImmutableList
+    //  |   For iterating in reverse order
+    //  +- StringAsImmutableList
+    //  |   Used by Lists#charactersOf
+    //  +- Values (ImmutableTable values)
+    //      Used by return value of #values() when there are multiple cells
+
+    config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class);
+
+    // Note:
+    //  Only registering above is good enough for serializing/deserializing.
+    //  but if using Kryo#copy, following is required.
+
+    config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class);
+    config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class);
+    config.registerSerialization(
+        ImmutableList.of(1, 2, 3).subList(1, 2).getClass(),
+        ImmutableListSerializer.class);
+    config.registerSerialization(
+        ImmutableList.of().reverse().getClass(),
+        ImmutableListSerializer.class);
+
+    config.registerSerialization(
+        Lists.charactersOf("KryoRocks").getClass(),
+        ImmutableListSerializer.class);
+
+    Table<Integer, Integer, Integer> baseTable = HashBasedTable.create();
+    baseTable.put(1, 2, 3);
+    baseTable.put(4, 5, 6);
+    Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
+    config.registerSerialization(table.values().getClass(), ImmutableListSerializer.class);
+  }
+
+
+  /**
+   * Specific serializer of {@link Kryo} for ImmutableMap.
+   *
+   * <p>The map is written as a {@link HashMap} copy and rebuilt with
+   * {@code ImmutableMap.copyOf} on read. Because the data travels through a {@link HashMap},
+   * the iteration order of the original map is not guaranteed to survive a round trip.
+   */
+  public static class ImmutableMapSerializer extends
+      Serializer<ImmutableMap<Object, ? extends Object>> {
+
+    // Passed as the "acceptsNull" constructor argument. It has to be false: write() copies the
+    // map with Maps.newHashMap and would fail with a NullPointerException if handed a null.
+    private static final boolean DOES_NOT_ACCEPT_NULL = false;
+    private static final boolean IMMUTABLE = true;
+
+    public ImmutableMapSerializer() {
+      super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output,
+                      ImmutableMap<Object, ? extends Object> immutableMap) {
+      kryo.writeObject(output, Maps.newHashMap(immutableMap));
+    }
+
+    @Override
+    public ImmutableMap<Object, Object> read(
+        Kryo kryo,
+        Input input,
+        Class<ImmutableMap<Object, ? extends Object>> type) {
+      // Wildcard type instead of a raw Map; the explicit type witness keeps copyOf well-typed.
+      Map<?, ?> map = kryo.readObject(input, HashMap.class);
+      return ImmutableMap.<Object, Object>copyOf(map);
+    }
+  }
+
+  /**
+   * Placeholder enum; its only use here is as the key type of the {@link EnumMap} built in
+   * {@code registerImmutableMapSerializers} to obtain the class of an enum-keyed ImmutableMap.
+   */
+  private enum DummyEnum {
+    VALUE1,
+    VALUE2
+  }
+
+  /**
+   * Creates a new {@link ImmutableMapSerializer} and registers its serializer
+   * for the several ImmutableMap related classes.
+   */
+  private static void registerImmutableMapSerializers(Config config) {
+
+    config.registerSerialization(ImmutableMap.class, ImmutableMapSerializer.class);
+    config.registerSerialization(ImmutableMap.of().getClass(), ImmutableMapSerializer.class);
+
+    Object o1 = new Object();
+    Object o2 = new Object();
+
+    config.registerSerialization(ImmutableMap.of(o1, o1).getClass(), ImmutableMapSerializer.class);
+    config.registerSerialization(
+        ImmutableMap.of(o1, o1, o2, o2).getClass(),
+        ImmutableMapSerializer.class);
+    Map<DummyEnum, Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class);
+    for (DummyEnum e : DummyEnum.values()) {
+      enumMap.put(e, o1);
+    }
+
+    config.registerSerialization(
+        ImmutableMap.copyOf(enumMap).getClass(),
+        ImmutableMapSerializer.class);
+  }
+
+  /**
+   * Specific serializer of {@link Kryo} for ImmutableSet.
+   */
+  public static class ImmutableSetSerializer extends Serializer<ImmutableSet<Object>> {
+
+    private static final boolean DOES_NOT_ACCEPT_NULL = false;
+    private static final boolean IMMUTABLE = true;
+
+    public ImmutableSetSerializer() {
+      super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
+      output.writeInt(object.size(), true);
+      for (Object elm : object) {
+        kryo.writeClassAndObject(output, elm);
+      }
+    }
+
+    @Override
+    public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
+      final int size = input.readInt(true);
+      ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
+      for (int i = 0; i < size; ++i) {
+        builder.add(kryo.readClassAndObject(input));
+      }
+      return builder.build();
+    }
+  }
+
+  /**
+   * Placeholder enum; its only use here is in {@code registerImmutableSetSerializers}, where
+   * its constants are passed to {@code Sets.immutableEnumSet} to obtain the enum set class.
+   */
+  private enum SomeEnum {
+    A, B, C
+  }
+
+  /**
+   * Creates a new {@link ImmutableSetSerializer} and registers its serializer
+   * for the several ImmutableSet related classes.
+   */
+  private static void registerImmutableSetSerializers(Config config) {
+
+    // ImmutableList (abstract class)
+    //  +- EmptyImmutableSet
+    //  |   EmptyImmutableSet
+    //  +- SingletonImmutableSet
+    //  |   Optimized for Set with only 1 element.
+    //  +- RegularImmutableSet
+    //  |   RegularImmutableList
+    //  +- EnumImmutableSet
+    //  |   EnumImmutableSet
+
+    config.registerSerialization(ImmutableSet.class, ImmutableSetSerializer.class);
+
+    // Note:
+    //  Only registering above is good enough for serializing/deserializing.
+    //  but if using Kryo#copy, following is required.
+
+    config.registerSerialization(ImmutableSet.of().getClass(), ImmutableSetSerializer.class);
+    config.registerSerialization(ImmutableSet.of(1).getClass(), ImmutableSetSerializer.class);
+    config.registerSerialization(ImmutableSet.of(1, 2, 3).getClass(), ImmutableSetSerializer.class);
+
+    config.registerSerialization(
+        Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(),
+        ImmutableSetSerializer.class);
+  }
+
+  /**
+   * Registers the ImmutableList, ImmutableMap and ImmutableSet serializers of this class
+   * with the given {@link Config}, in that order.
+   */
+  public static void registerSerializers(Config config) {
+    registerImmutableListSerializers(config);
+    registerImmutableMapSerializers(config);
+    registerImmutableSetSerializers(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
deleted file mode 100644
index 215ccf1..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.serialization;
-
-import backtype.storm.Config;
-import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
-import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableTable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Table;
-
-/**
- * Specific serializer of {@link Kryo} for ImmutableList.
- */
-public class ImmutableListSerializer extends Serializer<ImmutableList<Object>> {
-
-  private static final boolean DOES_NOT_ACCEPT_NULL = false;
-  private static final boolean IMMUTABLE = true;
-
-  public ImmutableListSerializer() {
-    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
-  }
-
-  @Override
-  public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
-    output.writeInt(object.size(), true);
-    for (Object elm : object) {
-      kryo.writeClassAndObject(output, elm);
-    }
-  }
-
-  @Override
-  public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
-    final int size = input.readInt(true);
-    final Object[] list = new Object[size];
-    for (int i = 0; i < size; ++i) {
-      list[i] = kryo.readClassAndObject(input);
-    }
-    return ImmutableList.copyOf(list);
-  }
-
-  /**
-   * Creates a new {@link ImmutableListSerializer} and registers its serializer
-   * for the several ImmutableList related classes.
-   */
-  public static void registerSerializers(Config config) {
-
-    // ImmutableList (abstract class)
-    //  +- RegularImmutableList
-    //  |   RegularImmutableList
-    //  +- SingletonImmutableList
-    //  |   Optimized for List with only 1 element.
-    //  +- SubList
-    //  |   Representation for part of ImmutableList
-    //  +- ReverseImmutableList
-    //  |   For iterating in reverse order
-    //  +- StringAsImmutableList
-    //  |   Used by Lists#charactersOf
-    //  +- Values (ImmutableTable values)
-    //      Used by return value of #values() when there are multiple cells
-
-    config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class);
-
-    // Note:
-    //  Only registering above is good enough for serializing/deserializing.
-    //  but if using Kryo#copy, following is required.
-
-    config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class);
-    config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class);
-    config.registerSerialization(
-        ImmutableList.of(1, 2, 3).subList(1, 2).getClass(),
-        ImmutableListSerializer.class);
-    config.registerSerialization(
-        ImmutableList.of().reverse().getClass(),
-        ImmutableListSerializer.class);
-
-    config.registerSerialization(
-        Lists.charactersOf("KryoRocks").getClass(),
-        ImmutableListSerializer.class);
-
-    Table<Integer, Integer, Integer> baseTable = HashBasedTable.create();
-    baseTable.put(1, 2, 3);
-    baseTable.put(4, 5, 6);
-    Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
-    config.registerSerialization(table.values().getClass(), ImmutableListSerializer.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
deleted file mode 100644
index 6fe3f59..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.serialization;
-
-import backtype.storm.Config;
-import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
-import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Specific serializer of {@link Kryo} for ImmutableMap.
- */
-public class ImmutableMapSerializer extends Serializer<ImmutableMap<Object, ? extends Object>> {
-
-  private static final boolean DOES_NOT_ACCEPT_NULL = true;
-  private static final boolean IMMUTABLE = true;
-
-  public ImmutableMapSerializer() {
-    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
-  }
-
-  @Override
-  public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) {
-    kryo.writeObject(output, Maps.newHashMap(immutableMap));
-  }
-
-  @Override
-  public ImmutableMap<Object, Object> read(
-      Kryo kryo,
-      Input input,
-      Class<ImmutableMap<Object, ? extends Object>> type) {
-    Map map = kryo.readObject(input, HashMap.class);
-    return ImmutableMap.copyOf(map);
-  }
-
-  /**
-   * Creates a new {@link ImmutableMapSerializer} and registers its serializer
-   * for the several ImmutableMap related classes.
-   */
-  public static void registerSerializers(Config config) {
-
-    config.registerSerialization(ImmutableMap.class, ImmutableMapSerializer.class);
-    config.registerSerialization(ImmutableMap.of().getClass(), ImmutableMapSerializer.class);
-
-    Object o1 = new Object();
-    Object o2 = new Object();
-
-    config.registerSerialization(ImmutableMap.of(o1, o1).getClass(), ImmutableMapSerializer.class);
-    config.registerSerialization(
-        ImmutableMap.of(o1, o1, o2, o2).getClass(),
-        ImmutableMapSerializer.class);
-    Map<DummyEnum, Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class);
-    for (DummyEnum e : DummyEnum.values()) {
-      enumMap.put(e, o1);
-    }
-
-    config.registerSerialization(
-        ImmutableMap.copyOf(enumMap).getClass(),
-        ImmutableMapSerializer.class);
-  }
-
-  private enum DummyEnum {
-    VALUE1,
-    VALUE2
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
deleted file mode 100644
index 625a32c..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.serialization;
-
-import backtype.storm.Config;
-import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
-import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-
-/**
- * Specific serializer of {@link Kryo} for ImmutableSet.
- */
-public class ImmutableSetSerializer extends Serializer<ImmutableSet<Object>> {
-
-  private static final boolean DOES_NOT_ACCEPT_NULL = false;
-  private static final boolean IMMUTABLE = true;
-
-  public ImmutableSetSerializer() {
-    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
-  }
-
-  @Override
-  public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
-    output.writeInt(object.size(), true);
-    for (Object elm : object) {
-      kryo.writeClassAndObject(output, elm);
-    }
-  }
-
-  @Override
-  public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
-    final int size = input.readInt(true);
-    ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
-    for (int i = 0; i < size; ++i) {
-      builder.add(kryo.readClassAndObject(input));
-    }
-    return builder.build();
-  }
-
-  /**
-   * Creates a new {@link ImmutableSetSerializer} and registers its serializer
-   * for the several ImmutableSet related classes.
-   */
-  public static void registerSerializers(Config config) {
-
-    // ImmutableList (abstract class)
-    //  +- EmptyImmutableSet
-    //  |   EmptyImmutableSet
-    //  +- SingletonImmutableSet
-    //  |   Optimized for Set with only 1 element.
-    //  +- RegularImmutableSet
-    //  |   RegularImmutableList
-    //  +- EnumImmutableSet
-    //  |   EnumImmutableSet
-
-    config.registerSerialization(ImmutableSet.class, ImmutableSetSerializer.class);
-
-    // Note:
-    //  Only registering above is good enough for serializing/deserializing.
-    //  but if using Kryo#copy, following is required.
-
-    config.registerSerialization(ImmutableSet.of().getClass(), ImmutableSetSerializer.class);
-    config.registerSerialization(ImmutableSet.of(1).getClass(), ImmutableSetSerializer.class);
-    config.registerSerialization(ImmutableSet.of(1, 2, 3).getClass(), ImmutableSetSerializer.class);
-
-    config.registerSerialization(
-        Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(),
-        ImmutableSetSerializer.class);
-  }
-
-  private enum SomeEnum {
-    A, B, C
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JStormUtilsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JStormUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JStormUtilsSerializer.java
new file mode 100644
index 0000000..69cfe24
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JStormUtilsSerializer.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.cache.ComposedKey;
+import com.alibaba.jstorm.cache.KvStoreIterable;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Specific serializer of {@link Kryo} for the Utils of JStorm Runner.
+ */
+public class JStormUtilsSerializer {
+
+  /**
+   * {@link Kryo} serializer for {@link ComposedKey}: writes the number of key parts followed
+   * by each part with its class, and re-adds the parts in the same order on read.
+   */
+  public static class ComposedKeySerializer extends Serializer<ComposedKey> {
+    public ComposedKeySerializer() {
+      setImmutable(true);
+    }
+
+    @Override
+    public void write(final Kryo kryo, final Output output, final ComposedKey object) {
+      output.writeInt(object.size(), true);
+      for (Object part : object) {
+        kryo.writeClassAndObject(output, part);
+      }
+    }
+
+    @Override
+    public ComposedKey read(final Kryo kryo, final Input input, final Class<ComposedKey> type) {
+      // Create the key first, then read the part count, mirroring the order used by write().
+      final ComposedKey key = ComposedKey.of();
+      int remaining = input.readInt(true);
+      while (remaining > 0) {
+        key.add(kryo.readClassAndObject(input));
+        remaining--;
+      }
+      return key;
+    }
+  }
+
+  /**
+   * Registers {@link ComposedKeySerializer} as the serializer for {@link ComposedKey}.
+   */
+  private static void registerComposedKeySerializers(Config config) {
+    config.registerSerialization(ComposedKey.class, ComposedKeySerializer.class);
+  }
+
+  /**
+   * Specific serializer of {@link Kryo} for KvStoreIterable.
+   *
+   * <p>The wire format is the element count followed by every element with its class. Reading
+   * produces an in-memory {@link KvStoreIterable} backed by a list of the decoded elements.
+   */
+  public static class KvStoreIterableSerializer extends Serializer<KvStoreIterable<Object>> {
+
+    public KvStoreIterableSerializer() {
+
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, KvStoreIterable<Object> object) {
+      // Walk the iterable exactly once. Counting it first and iterating it a second time would
+      // double the reads and could write a count that disagrees with the elements that follow
+      // if the underlying data changed between the two passes.
+      final List<Object> elements = Lists.newArrayList();
+      Iterables.addAll(elements, object);
+
+      output.writeInt(elements.size(), true);
+      for (Object elem : elements) {
+        kryo.writeClassAndObject(output, elem);
+      }
+    }
+
+    @Override
+    public KvStoreIterable<Object> read(Kryo kryo, Input input,
+                                        Class<KvStoreIterable<Object>> type) {
+      final int size = input.readInt(true);
+      final List<Object> values = Lists.newArrayList();
+      for (int i = 0; i < size; ++i) {
+        values.add(kryo.readClassAndObject(input));
+      }
+
+      // The anonymous class captures the decoded list directly; no raw-typed init() is needed.
+      return new KvStoreIterable<Object>() {
+        @Override
+        public Iterator<Object> iterator() {
+          return values.iterator();
+        }
+
+        @Override
+        public String toString() {
+          return values.toString();
+        }
+      };
+    }
+  }
+
+  /**
+   * Registers the serializers of this class with the given {@link Config}:
+   * {@link ComposedKeySerializer} for {@link ComposedKey}, and
+   * {@link KvStoreIterableSerializer} as a default serializer for {@link KvStoreIterable}.
+   */
+  public static void registerSerializers(Config config) {
+    registerComposedKeySerializers(config);
+    // "Serailizer" is the spelling of the Config method being called; it is not a typo here.
+    config.registerDefaultSerailizer(KvStoreIterable.class, KvStoreIterableSerializer.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java
new file mode 100644
index 0000000..5df686c
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Specific serializer of {@link Kryo} for Java Utils, e.g. Collections.SingletonList...
+ */
+public class JavaUtilsSerializer {
+
+  /**
+   * {@link Kryo} serializer for the list returned by {@link Collections#singletonList(Object)}.
+   * Only the single element is written (with its class); reading wraps the decoded element
+   * in a new singleton list.
+   */
+  public static class CollectionsSingletonListSerializer extends Serializer<List<?>> {
+    public CollectionsSingletonListSerializer() {
+      setImmutable(true);
+    }
+
+    @Override
+    public void write(final Kryo kryo, final Output output, final List<?> list) {
+      final Object onlyElement = list.get(0);
+      kryo.writeClassAndObject(output, onlyElement);
+    }
+
+    @Override
+    public List<?> read(final Kryo kryo, final Input input, final Class<List<?>> type) {
+      return Collections.singletonList(kryo.readClassAndObject(input));
+    }
+
+  }
+
+  /**
+   * {@link Kryo} serializer for the unmodifiable wrappers described by the
+   * {@code UnmodifiableCollection} enum of the enclosing class.
+   *
+   * <p>The wire format is the ordinal of the matching enum constant followed by the wrapped
+   * source collection, which is read reflectively from the wrapper. Because the ordinal is
+   * written, reordering the enum constants changes the format.
+   */
+  public static class UnmodifiableCollectionsSerializer extends Serializer<Object> {
+
+    @Override
+    public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) {
+      final UnmodifiableCollection kind = UnmodifiableCollection.values()[input.readInt(true)];
+      final Object source = kryo.readClassAndObject(input);
+      return kind.create(source);
+    }
+
+    @Override
+    public void write(final Kryo kryo, final Output output, final Object object) {
+      try {
+        final UnmodifiableCollection kind = UnmodifiableCollection.valueOfType(object.getClass());
+        // The ordinal could be replaced by something else (e.g. an explicitly managed "id").
+        output.writeInt(kind.ordinal(), true);
+        final Object source = kind.sourceCollectionField.get(object);
+        kryo.writeClassAndObject(output, source);
+      } catch (final RuntimeException unchecked) {
+        // Runtime exceptions are passed through unwrapped: per the original note,
+        // ObjectBuffer.write handles SerializationException specifically (resizing the buffer).
+        throw unchecked;
+      } catch (final Exception checked) {
+        throw new RuntimeException(checked);
+      }
+    }
+
+    @Override
+    public Object copy(Kryo kryo, Object original) {
+      try {
+        final UnmodifiableCollection kind =
+            UnmodifiableCollection.valueOfType(original.getClass());
+        final Object source = kind.sourceCollectionField.get(original);
+        // Copy the wrapped collection, then wrap the copy with the same kind of wrapper.
+        return kind.create(kryo.copy(source));
+      } catch (final RuntimeException unchecked) {
+        // Runtime exceptions are passed through unwrapped.
+        throw unchecked;
+      } catch (final Exception checked) {
+        throw new RuntimeException(checked);
+      }
+    }
+  }
+
+  // Field "c" of java.util.Collections$UnmodifiableCollection, made accessible for reflection.
+  private static final Field SOURCE_COLLECTION_FIELD =
+      accessibleField("java.util.Collections$UnmodifiableCollection", "c");
+  // Field "m" of java.util.Collections$UnmodifiableMap, made accessible for reflection.
+  private static final Field SOURCE_MAP_FIELD =
+      accessibleField("java.util.Collections$UnmodifiableMap", "m");
+
+  /**
+   * Looks up a declared field by class name and field name and makes it accessible.
+   *
+   * @param className binary name of the class declaring the field
+   * @param fieldName name of the declared field
+   * @return the field, with {@code setAccessible(true)} applied
+   * @throws RuntimeException wrapping the cause if the class or field cannot be found or
+   *     cannot be made accessible; the message names the class and field that failed
+   */
+  private static Field accessibleField(String className, String fieldName) {
+    try {
+      final Field field = Class.forName(className).getDeclaredField(fieldName);
+      field.setAccessible(true);
+      return field;
+    } catch (final Exception e) {
+      throw new RuntimeException(
+          "Could not access source field '" + fieldName + "' in " + className + ".", e);
+    }
+  }
+
+  private enum UnmodifiableCollection {
+    COLLECTION(
+        Collections.unmodifiableCollection(Arrays.asList("")).getClass(),
+        SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableCollection((Collection<?>) sourceCollection);
+      }
+    },
+    RANDOM_ACCESS_LIST(
+        Collections.unmodifiableList(new ArrayList<Void>()).getClass(),
+        SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableList((List<?>) sourceCollection);
+      }
+    },
+    LIST(Collections.unmodifiableList(new LinkedList<Void>()).getClass(), SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableList((List<?>) sourceCollection);
+      }
+    },
+    SET(Collections.unmodifiableSet(new HashSet<Void>()).getClass(), SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableSet((Set<?>) sourceCollection);
+      }
+    },
+    SORTED_SET(
+        Collections.unmodifiableSortedSet(new TreeSet<Void>()).getClass(),
+        SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableSortedSet((SortedSet<?>) sourceCollection);
+      }
+    },
+    MAP(Collections.unmodifiableMap(new HashMap<Void, Void>()).getClass(), SOURCE_MAP_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableMap((Map<?, ?>) sourceCollection);
+      }
+
+    },
+    SORTED_MAP(
+        Collections.unmodifiableSortedMap(new TreeMap<Void, Void>()).getClass(),
+        SOURCE_MAP_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableSortedMap((SortedMap<?, ?>) sourceCollection);
+      }
+    };
+
+    private final Class<?> type;
+    private final Field sourceCollectionField;
+
+    private UnmodifiableCollection(final Class<?> type, final Field sourceCollectionField) {
+      this.type = type;
+      this.sourceCollectionField = sourceCollectionField;
+    }
+
+    /**
+     * @param sourceCollection the backing collection or map to wrap in an unmodifiable view
+     */
+    public abstract Object create(Object sourceCollection);
+
+    static UnmodifiableCollection valueOfType(final Class<?> type) {
+      for (final UnmodifiableCollection item : values()) {
+        if (item.type.equals(type)) {
+          return item;
+        }
+      }
+      throw new IllegalArgumentException("The type " + type + " is not supported.");
+    }
+
+  }
+
+  /**
+   * Registers {@link UnmodifiableCollectionsSerializer} in the given config as the serializer
+   * for the several unmodifiable collections that can be created via {@link Collections},
+   * including {@link Map}s.
+   *
+   * @see Collections#unmodifiableCollection(Collection)
+   * @see Collections#unmodifiableList(List)
+   * @see Collections#unmodifiableSet(Set)
+   * @see Collections#unmodifiableSortedSet(SortedSet)
+   * @see Collections#unmodifiableMap(Map)
+   * @see Collections#unmodifiableSortedMap(SortedMap)
+   */
+  private static void registerUnmodifableCollectionSerializers(Config config) {
+    UnmodifiableCollection.values();
+    for (final UnmodifiableCollection item : UnmodifiableCollection.values()) {
+      config.registerSerialization(item.type, UnmodifiableCollectionsSerializer.class);
+    }
+  }
+
+  public static void registerSerializers(Config config) {
+    config.registerSerialization(Collections.singletonList("").getClass(),
+        CollectionsSingletonListSerializer.class);
+    registerUnmodifableCollectionSerializers(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
deleted file mode 100644
index 44a5378..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.serialization;
-
-import com.alibaba.jstorm.cache.KvStoreIterable;
-import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
-import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import com.google.common.collect.Lists;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Specific serializer of {@link Kryo} for KvStoreIterable.
- */
-public class KvStoreIterableSerializer extends Serializer<KvStoreIterable<Object>> {
-
-  public KvStoreIterableSerializer() {
-
-  }
-
-  @Override
-  public void write(Kryo kryo, Output output, KvStoreIterable<Object> object) {
-    List<Object> values = Lists.newArrayList(object);
-    output.writeInt(values.size(), true);
-    for (Object elm : object) {
-      kryo.writeClassAndObject(output, elm);
-    }
-  }
-
-  @Override
-  public KvStoreIterable<Object> read(Kryo kryo, Input input, Class<KvStoreIterable<Object>> type) {
-    final int size = input.readInt(true);
-    List<Object> values = Lists.newArrayList();
-    for (int i = 0; i < size; ++i) {
-      values.add(kryo.readClassAndObject(input));
-    }
-
-    return new KvStoreIterable<Object>() {
-      Iterable<Object> values;
-
-      @Override
-      public Iterator<Object> iterator() {
-        return values.iterator();
-      }
-
-      public KvStoreIterable init(Iterable<Object> values) {
-        this.values = values;
-        return this;
-      }
-
-      @Override
-      public String toString() {
-        return values.toString();
-      }
-    }.init(values);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
deleted file mode 100644
index e4e0e12..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.serialization;
-
-import backtype.storm.Config;
-import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
-import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import org.apache.beam.sdk.repackaged.com.google.common.collect.HashBasedTable;
-import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableTable;
-import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
-import org.apache.beam.sdk.repackaged.com.google.common.collect.Table;
-
-/**
- * Specific serializer of {@link Kryo} for Beam SDK repackaged ImmutableList.
- */
-public class SdkRepackImmuListSerializer extends Serializer<ImmutableList<Object>> {
-
-  private static final boolean DOES_NOT_ACCEPT_NULL = false;
-  private static final boolean IMMUTABLE = true;
-
-  public SdkRepackImmuListSerializer() {
-    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
-  }
-
-  @Override
-  public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
-    output.writeInt(object.size(), true);
-    for (Object elm : object) {
-      kryo.writeClassAndObject(output, elm);
-    }
-  }
-
-  @Override
-  public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
-    final int size = input.readInt(true);
-    final Object[] list = new Object[size];
-    for (int i = 0; i < size; ++i) {
-      list[i] = kryo.readClassAndObject(input);
-    }
-    return ImmutableList.copyOf(list);
-  }
-
-  /**
-   * Creates a new {@link ImmutableListSerializer} and registers its serializer
-   * for the several ImmutableList related classes.
-   */
-  public static void registerSerializers(Config config) {
-
-    // ImmutableList (abstract class)
-    //  +- RegularImmutableList
-    //  |   RegularImmutableList
-    //  +- SingletonImmutableList
-    //  |   Optimized for List with only 1 element.
-    //  +- SubList
-    //  |   Representation for part of ImmutableList
-    //  +- ReverseImmutableList
-    //  |   For iterating in reverse order
-    //  +- StringAsImmutableList
-    //  |   Used by Lists#charactersOf
-    //  +- Values (ImmutableTable values)
-    //      Used by return value of #values() when there are multiple cells
-
-    config.registerSerialization(ImmutableList.class, SdkRepackImmuListSerializer.class);
-
-    // Note:
-    //  Only registering above is good enough for serializing/deserializing.
-    //  but if using Kryo#copy, following is required.
-
-    config.registerSerialization(ImmutableList.of().getClass(), SdkRepackImmuListSerializer.class);
-    config.registerSerialization(ImmutableList.of(1).getClass(), SdkRepackImmuListSerializer.class);
-    config.registerSerialization(
-        ImmutableList.of(1, 2, 3).subList(1, 2).getClass(),
-        SdkRepackImmuListSerializer.class);
-    config.registerSerialization(
-        ImmutableList.of().reverse().getClass(),
-        SdkRepackImmuListSerializer.class);
-
-    config.registerSerialization(
-        Lists.charactersOf("KryoRocks").getClass(),
-        SdkRepackImmuListSerializer.class);
-
-    Table<Integer, Integer, Integer> baseTable = HashBasedTable.create();
-    baseTable.put(1, 2, 3);
-    baseTable.put(4, 5, 6);
-    Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
-    config.registerSerialization(table.values().getClass(), SdkRepackImmuListSerializer.class);
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
deleted file mode 100644
index 3cb60e8..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.serialization;
-
-import backtype.storm.Config;
-import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
-import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet;
-import org.apache.beam.sdk.repackaged.com.google.common.collect.Sets;
-
-/**
- * Specific serializer of {@link Kryo} for Beam SDK repackaged ImmutableSet.
- */
-public class SdkRepackImmuSetSerializer extends Serializer<ImmutableSet<Object>> {
-
-  private static final boolean DOES_NOT_ACCEPT_NULL = false;
-  private static final boolean IMMUTABLE = true;
-
-  public SdkRepackImmuSetSerializer() {
-    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
-  }
-
-  @Override
-  public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
-    output.writeInt(object.size(), true);
-    for (Object elm : object) {
-      kryo.writeClassAndObject(output, elm);
-    }
-  }
-
-  @Override
-  public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
-    final int size = input.readInt(true);
-    ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
-    for (int i = 0; i < size; ++i) {
-      builder.add(kryo.readClassAndObject(input));
-    }
-    return builder.build();
-  }
-
-  /**
-   * Creates a new {@link ImmutableSetSerializer} and registers its serializer
-   * for the several ImmutableSet related classes.
-   */
-  public static void registerSerializers(Config config) {
-
-    // ImmutableList (abstract class)
-    //  +- EmptyImmutableSet
-    //  |   EmptyImmutableSet
-    //  +- SingletonImmutableSet
-    //  |   Optimized for Set with only 1 element.
-    //  +- RegularImmutableSet
-    //  |   RegularImmutableList
-    //  +- EnumImmutableSet
-    //  |   EnumImmutableSet
-
-    config.registerSerialization(ImmutableSet.class, SdkRepackImmuSetSerializer.class);
-
-    // Note:
-    //  Only registering above is good enough for serializing/deserializing.
-    //  but if using Kryo#copy, following is required.
-
-    config.registerSerialization(ImmutableSet.of().getClass(), SdkRepackImmuSetSerializer.class);
-    config.registerSerialization(ImmutableSet.of(1).getClass(), SdkRepackImmuSetSerializer.class);
-    config.registerSerialization(
-        ImmutableSet.of(1, 2, 3).getClass(),
-        SdkRepackImmuSetSerializer.class);
-
-    config.registerSerialization(
-        Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(),
-        SdkRepackImmuSetSerializer.class);
-  }
-
-  private enum SomeEnum {
-    A, B, C
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java
deleted file mode 100644
index 1a0d902..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmutableMapSerializer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.serialization;
-
-import backtype.storm.Config;
-import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
-import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap;
-import org.apache.beam.sdk.repackaged.com.google.common.collect.Maps;
-
-/**
- * Specific serializer of {@link Kryo} for ImmutableMap.
- */
-public class SdkRepackImmutableMapSerializer
-    extends Serializer<ImmutableMap<Object, ? extends Object>> {
-
-  private static final boolean DOES_NOT_ACCEPT_NULL = true;
-  private static final boolean IMMUTABLE = true;
-
-  public SdkRepackImmutableMapSerializer() {
-    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
-  }
-
-  @Override
-  public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) {
-    kryo.writeObject(output, Maps.newHashMap(immutableMap));
-  }
-
-  @Override
-  public ImmutableMap<Object, Object> read(
-      Kryo kryo,
-      Input input,
-      Class<ImmutableMap<Object, ? extends Object>> type) {
-    Map map = kryo.readObject(input, HashMap.class);
-    return ImmutableMap.copyOf(map);
-  }
-
-  /**
-   * Creates a new {@link SdkRepackImmutableMapSerializer} and registers its serializer
-   * for the several ImmutableMap related classes.
-   */
-  public static void registerSerializers(Config config) {
-
-    config.registerSerialization(ImmutableMap.class, SdkRepackImmutableMapSerializer.class);
-    config.registerSerialization(
-        ImmutableMap.of().getClass(), SdkRepackImmutableMapSerializer.class);
-
-    Object o1 = new Object();
-    Object o2 = new Object();
-
-    config.registerSerialization(
-        ImmutableMap.of(o1, o1).getClass(), SdkRepackImmutableMapSerializer.class);
-    config.registerSerialization(
-        ImmutableMap.of(o1, o1, o2, o2).getClass(),
-        SdkRepackImmutableMapSerializer.class);
-    Map<DummyEnum, Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class);
-    for (DummyEnum e : DummyEnum.values()) {
-      enumMap.put(e, o1);
-    }
-
-    config.registerSerialization(
-        ImmutableMap.copyOf(enumMap).getClass(),
-        SdkRepackImmutableMapSerializer.class);
-  }
-
-  private enum DummyEnum {
-    VALUE1,
-    VALUE2
-  }
-}