You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:05 UTC

[05/53] [abbrv] beam git commit: jstorm-runner: rename the package to org.apache.beam.runners.jstorm.

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
new file mode 100644
index 0000000..aa7d325
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
@@ -0,0 +1,92 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import org.apache.beam.runners.jstorm.util.RunnerUtils;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.*;
+
+public class ImmutableListSerializer extends Serializer<ImmutableList<Object>> {
+
+    private static final boolean DOES_NOT_ACCEPT_NULL = false;
+    private static final boolean IMMUTABLE = true;
+
+    public ImmutableListSerializer() {
+        super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
+        output.writeInt(object.size(), true);
+        for (Object elm : object) {
+            kryo.writeClassAndObject(output, elm);
+        }
+    }
+
+    @Override
+    public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
+        final int size = input.readInt(true);
+        final Object[] list = new Object[size];
+        for (int i = 0; i < size; ++i) {
+            list[i] = kryo.readClassAndObject(input);
+        }
+        return ImmutableList.copyOf(list);
+    }
+
+    /**
+     * Creates a new {@link ImmutableListSerializer} and registers its serializer
+     * for the several ImmutableList related classes.
+     */
+    public static void registerSerializers(Config config) {
+
+        // ImmutableList (abstract class)
+        //  +- RegularImmutableList
+        //  |   RegularImmutableList
+        //  +- SingletonImmutableList
+        //  |   Optimized for List with only 1 element.
+        //  +- SubList
+        //  |   Representation for part of ImmutableList
+        //  +- ReverseImmutableList
+        //  |   For iterating in reverse order
+        //  +- StringAsImmutableList
+        //  |   Used by Lists#charactersOf
+        //  +- Values (ImmutableTable values)
+        //      Used by return value of #values() when there are multiple cells
+
+        config.registerSerialization(ImmutableList.class, ImmutableListSerializer.class);
+        config.registerSerialization(
+                RunnerUtils.getBeamSdkRepackClass(ImmutableList.class), ImmutableListSerializer.class);
+
+        // Note:
+        //  Only registering above is good enough for serializing/deserializing.
+        //  but if using Kryo#copy, following is required.
+
+        config.registerSerialization(ImmutableList.of().getClass(), ImmutableListSerializer.class);
+        config.registerSerialization(
+                RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().getClass()), ImmutableListSerializer.class);
+        config.registerSerialization(ImmutableList.of(1).getClass(), ImmutableListSerializer.class);
+        config.registerSerialization(
+                RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1).getClass()), ImmutableListSerializer.class);
+        config.registerSerialization(ImmutableList.of(1,2,3).subList(1, 2).getClass(), ImmutableListSerializer.class);
+        config.registerSerialization(
+                RunnerUtils.getBeamSdkRepackClass(ImmutableList.of(1,2,3).subList(1, 2).getClass()), ImmutableListSerializer.class);
+        config.registerSerialization(ImmutableList.of().reverse().getClass(), ImmutableListSerializer.class);
+        config.registerSerialization(
+                RunnerUtils.getBeamSdkRepackClass(ImmutableList.of().reverse().getClass()), ImmutableListSerializer.class);
+
+        config.registerSerialization(Lists.charactersOf("KryoRocks").getClass(), ImmutableListSerializer.class);
+        config.registerSerialization(
+                RunnerUtils.getBeamSdkRepackClass(Lists.charactersOf("KryoRocks").getClass()), ImmutableListSerializer.class);
+
+        Table<Integer,Integer,Integer> baseTable = HashBasedTable.create();
+        baseTable.put(1, 2, 3);
+        baseTable.put(4, 5, 6);
+        Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
+        config.registerSerialization(table.values().getClass(), ImmutableListSerializer.class);
+        config.registerSerialization(
+                RunnerUtils.getBeamSdkRepackClass(table.values().getClass()), ImmutableListSerializer.class);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
new file mode 100644
index 0000000..ee8b765
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
@@ -0,0 +1,61 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ImmutableMapSerializer extends Serializer<ImmutableMap<Object, ? extends Object>> {
+
+    private static final boolean DOES_NOT_ACCEPT_NULL = true;
+    private static final boolean IMMUTABLE = true;
+
+    public ImmutableMapSerializer() {
+        super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) {
+        kryo.writeObject(output, Maps.newHashMap(immutableMap));
+    }
+
+    @Override
+    public ImmutableMap<Object, Object> read(Kryo kryo, Input input, Class<ImmutableMap<Object, ? extends Object>> type) {
+        Map map = kryo.readObject(input, HashMap.class);
+        return ImmutableMap.copyOf(map);
+    }
+
+    /**
+     * Creates a new {@link ImmutableMapSerializer} and registers its serializer
+     * for the several ImmutableMap related classes.
+     */
+    public static void registerSerializers(Config config) {
+
+        config.registerSerialization(ImmutableMap.class, ImmutableMapSerializer.class);
+        config.registerSerialization(ImmutableMap.of().getClass(), ImmutableMapSerializer.class);
+
+        Object o1 = new Object();
+        Object o2 = new Object();
+
+        config.registerSerialization(ImmutableMap.of(o1, o1).getClass(), ImmutableMapSerializer.class);
+        config.registerSerialization(ImmutableMap.of(o1, o1, o2, o2).getClass(), ImmutableMapSerializer.class);
+        Map<DummyEnum,Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class);
+        for (DummyEnum e : DummyEnum.values()) {
+            enumMap.put(e, o1);
+        }
+
+        config.registerSerialization(ImmutableMap.copyOf(enumMap).getClass(), ImmutableMapSerializer.class);
+    }
+
+    private enum DummyEnum {
+        VALUE1,
+        VALUE2
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
new file mode 100644
index 0000000..cdc4382
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
@@ -0,0 +1,71 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class ImmutableSetSerializer extends Serializer<ImmutableSet<Object>> {
+
+    private static final boolean DOES_NOT_ACCEPT_NULL = false;
+    private static final boolean IMMUTABLE = true;
+
+    public ImmutableSetSerializer() {
+        super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
+        output.writeInt(object.size(), true);
+        for (Object elm : object) {
+            kryo.writeClassAndObject(output, elm);
+        }
+    }
+
+    @Override
+    public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
+        final int size = input.readInt(true);
+        ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
+        for (int i = 0; i < size; ++i) {
+            builder.add(kryo.readClassAndObject(input));
+        }
+        return builder.build();
+    }
+
+    /**
+     * Creates a new {@link ImmutableSetSerializer} and registers its serializer
+     * for the several ImmutableSet related classes.
+     */
+    public static void registerSerializers(Config config) {
+
+        // ImmutableList (abstract class)
+        //  +- EmptyImmutableSet
+        //  |   EmptyImmutableSet
+        //  +- SingletonImmutableSet
+        //  |   Optimized for Set with only 1 element.
+        //  +- RegularImmutableSet
+        //  |   RegularImmutableList
+        //  +- EnumImmutableSet
+        //  |   EnumImmutableSet
+
+        config.registerSerialization(ImmutableSet.class, ImmutableSetSerializer.class);
+
+        // Note:
+        //  Only registering above is good enough for serializing/deserializing.
+        //  but if using Kryo#copy, following is required.
+
+        config.registerSerialization(ImmutableSet.of().getClass(), ImmutableSetSerializer.class);
+        config.registerSerialization(ImmutableSet.of(1).getClass(), ImmutableSetSerializer.class);
+        config.registerSerialization(ImmutableSet.of(1,2,3).getClass(), ImmutableSetSerializer.class);
+
+        config.registerSerialization(
+                Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(), ImmutableSetSerializer.class);
+    }
+
+    private enum SomeEnum {
+        A, B, C
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
new file mode 100644
index 0000000..decfb3f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
@@ -0,0 +1,55 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import com.alibaba.jstorm.cache.KvStoreIterable;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+
+public class KvStoreIterableSerializer extends Serializer<KvStoreIterable<Object>> {
+
+    public KvStoreIterableSerializer() {
+
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, KvStoreIterable<Object> object) {
+        List<Object> values = Lists.newArrayList(object);
+        output.writeInt(values.size(), true);
+        for (Object elm : object) {
+            kryo.writeClassAndObject(output, elm);
+        }
+    }
+
+    @Override
+    public KvStoreIterable<Object> read(Kryo kryo, Input input, Class<KvStoreIterable<Object>> type) {
+        final int size = input.readInt(true);
+        List<Object> values = Lists.newArrayList();
+        for (int i = 0; i < size; ++i) {
+            values.add(kryo.readClassAndObject(input));
+        }
+
+        return new KvStoreIterable<Object>() {
+            Iterable<Object> values;
+
+            @Override
+            public Iterator<Object> iterator() {
+                return values.iterator();
+            }
+
+            public KvStoreIterable init(Iterable<Object> values) {
+                this.values = values;
+                return this;
+            }
+
+            @Override
+            public String toString() {
+                return values.toString();
+            }
+        }.init(values);
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
new file mode 100644
index 0000000..9bb315b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
@@ -0,0 +1,78 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.*;
+
+public class SdkRepackImmuListSerializer extends Serializer<ImmutableList<Object>> {
+
+    private static final boolean DOES_NOT_ACCEPT_NULL = false;
+    private static final boolean IMMUTABLE = true;
+
+    public SdkRepackImmuListSerializer() {
+        super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
+        output.writeInt(object.size(), true);
+        for (Object elm : object) {
+            kryo.writeClassAndObject(output, elm);
+        }
+    }
+
+    @Override
+    public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
+        final int size = input.readInt(true);
+        final Object[] list = new Object[size];
+        for (int i = 0; i < size; ++i) {
+            list[i] = kryo.readClassAndObject(input);
+        }
+        return ImmutableList.copyOf(list);
+    }
+
+    /**
+     * Creates a new {@link ImmutableListSerializer} and registers its serializer
+     * for the several ImmutableList related classes.
+     */
+    public static void registerSerializers(Config config) {
+
+        // ImmutableList (abstract class)
+        //  +- RegularImmutableList
+        //  |   RegularImmutableList
+        //  +- SingletonImmutableList
+        //  |   Optimized for List with only 1 element.
+        //  +- SubList
+        //  |   Representation for part of ImmutableList
+        //  +- ReverseImmutableList
+        //  |   For iterating in reverse order
+        //  +- StringAsImmutableList
+        //  |   Used by Lists#charactersOf
+        //  +- Values (ImmutableTable values)
+        //      Used by return value of #values() when there are multiple cells
+
+        config.registerSerialization(ImmutableList.class, SdkRepackImmuListSerializer.class);
+
+        // Note:
+        //  Only registering above is good enough for serializing/deserializing.
+        //  but if using Kryo#copy, following is required.
+
+        config.registerSerialization(ImmutableList.of().getClass(), SdkRepackImmuListSerializer.class);
+        config.registerSerialization(ImmutableList.of(1).getClass(), SdkRepackImmuListSerializer.class);
+        config.registerSerialization(ImmutableList.of(1,2,3).subList(1, 2).getClass(), SdkRepackImmuListSerializer.class);
+        config.registerSerialization(ImmutableList.of().reverse().getClass(), SdkRepackImmuListSerializer.class);
+
+        config.registerSerialization(Lists.charactersOf("KryoRocks").getClass(), SdkRepackImmuListSerializer.class);
+
+        Table<Integer,Integer,Integer> baseTable = HashBasedTable.create();
+        baseTable.put(1, 2, 3);
+        baseTable.put(4, 5, 6);
+        Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
+        config.registerSerialization(table.values().getClass(), SdkRepackImmuListSerializer.class);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
new file mode 100644
index 0000000..a514645
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
@@ -0,0 +1,71 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.*;
+
+public class SdkRepackImmuSetSerializer extends Serializer<ImmutableSet<Object>> {
+
+    private static final boolean DOES_NOT_ACCEPT_NULL = false;
+    private static final boolean IMMUTABLE = true;
+
+    public SdkRepackImmuSetSerializer() {
+        super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
+        output.writeInt(object.size(), true);
+        for (Object elm : object) {
+            kryo.writeClassAndObject(output, elm);
+        }
+    }
+
+    @Override
+    public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
+        final int size = input.readInt(true);
+        ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
+        for (int i = 0; i < size; ++i) {
+            builder.add(kryo.readClassAndObject(input));
+        }
+        return builder.build();
+    }
+
+    /**
+     * Creates a new {@link ImmutableSetSerializer} and registers its serializer
+     * for the several ImmutableSet related classes.
+     */
+    public static void registerSerializers(Config config) {
+
+        // ImmutableList (abstract class)
+        //  +- EmptyImmutableSet
+        //  |   EmptyImmutableSet
+        //  +- SingletonImmutableSet
+        //  |   Optimized for Set with only 1 element.
+        //  +- RegularImmutableSet
+        //  |   RegularImmutableList
+        //  +- EnumImmutableSet
+        //  |   EnumImmutableSet
+
+        config.registerSerialization(ImmutableSet.class, SdkRepackImmuSetSerializer.class);
+
+        // Note:
+        //  Only registering above is good enough for serializing/deserializing.
+        //  but if using Kryo#copy, following is required.
+
+        config.registerSerialization(ImmutableSet.of().getClass(), SdkRepackImmuSetSerializer.class);
+        config.registerSerialization(ImmutableSet.of(1).getClass(), SdkRepackImmuSetSerializer.class);
+        config.registerSerialization(ImmutableSet.of(1,2,3).getClass(), SdkRepackImmuSetSerializer.class);
+
+        config.registerSerialization(
+                Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(), SdkRepackImmuSetSerializer.class);
+    }
+
+    private enum SomeEnum {
+        A, B, C
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
new file mode 100644
index 0000000..c8b0138
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
@@ -0,0 +1,159 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+
+import java.lang.reflect.Field;
+import java.util.*;
+
+public class UnmodifiableCollectionsSerializer extends Serializer<Object> {
+
+    private static final Field SOURCE_COLLECTION_FIELD;
+    private static final Field SOURCE_MAP_FIELD;
+
+    static {
+        try {
+            SOURCE_COLLECTION_FIELD = Class.forName("java.util.Collections$UnmodifiableCollection" )
+                    .getDeclaredField( "c" );
+            SOURCE_COLLECTION_FIELD.setAccessible( true );
+
+
+            SOURCE_MAP_FIELD = Class.forName("java.util.Collections$UnmodifiableMap" )
+                    .getDeclaredField( "m" );
+            SOURCE_MAP_FIELD.setAccessible( true );
+        } catch ( final Exception e ) {
+            throw new RuntimeException( "Could not access source collection" +
+                    " field in java.util.Collections$UnmodifiableCollection.", e );
+        }
+    }
+
+    @Override
+    public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) {
+        final int ordinal = input.readInt( true );
+        final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal];
+        final Object sourceCollection = kryo.readClassAndObject( input );
+        return unmodifiableCollection.create( sourceCollection );
+    }
+
+    @Override
+    public void write(final Kryo kryo, final Output output, final Object object) {
+        try {
+            final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( object.getClass() );
+            // the ordinal could be replaced by s.th. else (e.g. a explicitely managed "id")
+            output.writeInt( unmodifiableCollection.ordinal(), true );
+            kryo.writeClassAndObject( output, unmodifiableCollection.sourceCollectionField.get( object ) );
+        } catch ( final RuntimeException e ) {
+            // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write...
+            // handles SerializationException specifically (resizing the buffer)...
+            throw e;
+        } catch ( final Exception e ) {
+            throw new RuntimeException( e );
+        }
+    }
+
+    @Override
+    public Object copy(Kryo kryo, Object original) {
+        try {
+            final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.valueOfType( original.getClass() );
+            Object sourceCollectionCopy = kryo.copy(unmodifiableCollection.sourceCollectionField.get(original));
+            return unmodifiableCollection.create( sourceCollectionCopy );
+        } catch ( final RuntimeException e ) {
+            // Don't eat and wrap RuntimeExceptions
+            throw e;
+        } catch ( final Exception e ) {
+            throw new RuntimeException( e );
+        }
+    }
+
+    private static enum UnmodifiableCollection {
+        COLLECTION( Collections.unmodifiableCollection( Arrays.asList( "" ) ).getClass(), SOURCE_COLLECTION_FIELD ){
+            @Override
+            public Object create( final Object sourceCollection ) {
+                return Collections.unmodifiableCollection( (Collection<?>) sourceCollection );
+            }
+        },
+        RANDOM_ACCESS_LIST( Collections.unmodifiableList( new ArrayList<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){
+            @Override
+            public Object create( final Object sourceCollection ) {
+                return Collections.unmodifiableList( (List<?>) sourceCollection );
+            }
+        },
+        LIST( Collections.unmodifiableList( new LinkedList<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){
+            @Override
+            public Object create( final Object sourceCollection ) {
+                return Collections.unmodifiableList( (List<?>) sourceCollection );
+            }
+        },
+        SET( Collections.unmodifiableSet( new HashSet<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){
+            @Override
+            public Object create( final Object sourceCollection ) {
+                return Collections.unmodifiableSet( (Set<?>) sourceCollection );
+            }
+        },
+        SORTED_SET( Collections.unmodifiableSortedSet( new TreeSet<Void>() ).getClass(), SOURCE_COLLECTION_FIELD ){
+            @Override
+            public Object create( final Object sourceCollection ) {
+                return Collections.unmodifiableSortedSet( (SortedSet<?>) sourceCollection );
+            }
+        },
+        MAP( Collections.unmodifiableMap( new HashMap<Void, Void>() ).getClass(), SOURCE_MAP_FIELD ) {
+
+            @Override
+            public Object create( final Object sourceCollection ) {
+                return Collections.unmodifiableMap( (Map<?, ?>) sourceCollection );
+            }
+
+        },
+        SORTED_MAP( Collections.unmodifiableSortedMap( new TreeMap<Void, Void>() ).getClass(), SOURCE_MAP_FIELD ) {
+            @Override
+            public Object create( final Object sourceCollection ) {
+                return Collections.unmodifiableSortedMap( (SortedMap<?, ?>) sourceCollection );
+            }
+        };
+
+        private final Class<?> type;
+        private final Field sourceCollectionField;
+
+        private UnmodifiableCollection( final Class<?> type, final Field sourceCollectionField ) {
+            this.type = type;
+            this.sourceCollectionField = sourceCollectionField;
+        }
+
+        /**
+         * @param sourceCollection
+         */
+        public abstract Object create( Object sourceCollection );
+
+        static UnmodifiableCollection valueOfType(final Class<?> type ) {
+            for( final UnmodifiableCollection item : values() ) {
+                if ( item.type.equals( type ) ) {
+                    return item;
+                }
+            }
+            throw new IllegalArgumentException( "The type " + type + " is not supported." );
+        }
+
+    }
+
+    /**
+     * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer
+     * for the several unmodifiable Collections that can be created via {@link Collections},
+     * including {@link Map}s.
+     *
+     * @see Collections#unmodifiableCollection(Collection)
+     * @see Collections#unmodifiableList(List)
+     * @see Collections#unmodifiableSet(Set)
+     * @see Collections#unmodifiableSortedSet(SortedSet)
+     * @see Collections#unmodifiableMap(Map)
+     * @see Collections#unmodifiableSortedMap(SortedMap)
+     */
+    public static void registerSerializers( Config config ) {
+        UnmodifiableCollection.values();
+        for ( final UnmodifiableCollection item : UnmodifiableCollection.values() ) {
+            config.registerSerialization( item.type, UnmodifiableCollectionsSerializer.class );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
new file mode 100644
index 0000000..d907fac
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
+
+import java.util.List;
+
+/**
+ * Pipleline translator of Storm
+ */
+public class StormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
+    private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class);
+    private TranslationContext context;
+    private int depth = 0;
+
+    public StormPipelineTranslator(TranslationContext context) {
+        this.context = context;
+    }
+
+    public void translate(Pipeline pipeline) {
+        List<PTransformOverride> transformOverrides =
+                ImmutableList.<PTransformOverride>builder()
+                        .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class),
+                                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class)))
+                        .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class),
+                                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class)))
+                        .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class),
+                                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class)))
+                        .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class)))
+                        .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class)))
+                        .add(PTransformOverride.of(PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                               new ReflectiveOneToOneOverrideFactory((ViewTranslator.CombineGloballyAsSingletonView.class))))
+                        .build();
+        pipeline.replaceAll(transformOverrides);
+        pipeline.traverseTopologically(this);
+    }
+
+    @Override
+    public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+        LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + node);
+        this.depth++;
+
+        // check if current composite transforms need to be translated. 
+        // If not, all sub transforms will be translated in visitPrimitiveTransform.
+        PTransform<?, ?> transform = node.getTransform();
+        if (transform != null) {
+            TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+
+            if (translator != null && applyCanTranslate(transform, node, translator)) {
+                applyStreamingTransform(transform, node, translator);
+                LOG.info(genSpaces(this.depth) + "translated-" + node);
+                return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+            }
+        }
+        return CompositeBehavior.ENTER_TRANSFORM;
+    }
+
+    public void leaveCompositeTransform(TransformHierarchy.Node node) {
+        this.depth--;
+        LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + node);
+    }
+
+    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+        LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + node);
+
+        if (!node.isRootNode()) {
+            PTransform<?, ?> transform = node.getTransform();
+            TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+            if (translator == null || !applyCanTranslate(transform, node, translator)) {
+                LOG.info(node.getTransform().getClass().toString());
+                throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
+            }
+            applyStreamingTransform(transform, node, translator);
+        }
+    }
+
+    public void visitValue(PValue value, TransformHierarchy.Node node) {
+        LOG.info(genSpaces(this.depth) + "visiting value {}", value);
+    }
+
+    private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformHierarchy.Node node,
+            TransformTranslator<?> translator) {
+        @SuppressWarnings("unchecked")
+        T typedTransform = (T) transform;
+        @SuppressWarnings("unchecked")
+        TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+
+        context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
+        typedTranslator.translateNode(typedTransform, context);
+
+        // Maintain PValue to TupleTag map for side inputs translation.
+        context.getUserGraphContext().recordOutputTaggedPValue();
+    }
+
+    private <T extends PTransform<?, ?>> boolean applyCanTranslate(PTransform<?, ?> transform, TransformHierarchy.Node node, TransformTranslator<?> translator) {
+        @SuppressWarnings("unchecked")
+        T typedTransform = (T) transform;
+        @SuppressWarnings("unchecked")
+        TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+
+        context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
+
+        return typedTranslator.canTranslate(typedTransform, context);
+    }
+
+    /**
+     * Utility formatting method.
+     * 
+     * @param n number of spaces to generate
+     * @return String with "|" followed by n spaces
+     */
+    protected static String genSpaces(int n) {
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < n; i++) {
+            builder.append("|   ");
+        }
+        return builder.toString();
+    }
+
+    private static class ReflectiveOneToOneOverrideFactory<
+            InputT extends PValue,
+            OutputT extends PValue,
+            TransformT extends PTransform<InputT, OutputT>>
+            extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
+        private final Class<PTransform<InputT, OutputT>> replacement;
+
+        private ReflectiveOneToOneOverrideFactory(
+                Class<PTransform<InputT, OutputT>> replacement) {
+            this.replacement = replacement;
+        }
+
+        @Override
+        public PTransformReplacement<InputT, OutputT> getReplacementTransform(AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) {
+            PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform();
+            PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement)
+                    .withArg((Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(), originalPTransform)
+                    .build();
+            InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values());
+            return PTransformReplacement.of(inputT, replacedPTransform);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
new file mode 100644
index 0000000..bf4515f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import avro.shaded.com.google.common.collect.Lists;
+import org.apache.beam.runners.jstorm.translation.translator.Stream;
+import org.apache.beam.runners.jstorm.util.RunnerUtils;
+import com.google.common.base.Strings;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout;
+import org.apache.beam.runners.jstorm.translation.runtime.Executor;
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
+
+import java.util.*;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Maintains the state necessary during Pipeline translation to build a Storm topology.
+ */
+public class TranslationContext {
+    private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
+
+    private final UserGraphContext userGraphContext;
+    private final ExecutionGraphContext executionGraphContext;
+
+    public TranslationContext(StormPipelineOptions options) {
+        this.userGraphContext = new UserGraphContext(options);
+        this.executionGraphContext = new ExecutionGraphContext();
+    }
+
+    public ExecutionGraphContext getExecutionGraphContext() {
+        return executionGraphContext;
+    }
+
+    public UserGraphContext getUserGraphContext() {
+        return userGraphContext;
+    }
+
+    private void addStormStreamDef(TaggedPValue input, String destComponentName, Stream.Grouping grouping) {
+        Stream.Producer producer = executionGraphContext.getProducer(input.getValue());
+        if (!producer.getComponentId().equals(destComponentName)) {
+            Stream.Consumer consumer = Stream.Consumer.of(destComponentName, grouping);
+            executionGraphContext.registerStreamConsumer(consumer, producer);
+
+            ExecutorsBolt executorsBolt = executionGraphContext.getBolt(producer.getComponentId());
+            if (executorsBolt != null) {
+                executorsBolt.addExternalOutputTag(input.getTag());
+            }
+        }
+    }
+
+    private String getUpstreamExecutorsBolt() {
+        for (PValue value : userGraphContext.getInputs().values()) {
+            String componentId = executionGraphContext.getProducerComponentId(value);
+            if (componentId != null && executionGraphContext.getBolt(componentId) != null) {
+                return componentId;
+            }
+        }
+        // When upstream component is spout, "null" will be return.
+        return null;
+    }
+
+    /**
+     * check if the current transform is applied to source collection.
+     * @return
+     */
+    private boolean connectedToSource() {
+        for (PValue value : userGraphContext.getInputs().values()) {
+            if (executionGraphContext.producedBySpout(value)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * @param upstreamExecutorsBolt
+     * @return true if there is multiple input streams, or upstream executor output the same stream
+     *          to different executors
+     */
+    private boolean isMultipleInputOrOutput(ExecutorsBolt upstreamExecutorsBolt, Map<TupleTag<?>, PValue> inputs) {
+        if (inputs.size() > 1) {
+            return true;
+        } else {
+            final Sets.SetView<TupleTag> intersection = Sets.intersection(upstreamExecutorsBolt.getExecutors().keySet(), inputs.keySet());
+            if (!intersection.isEmpty()) {
+                // there is already a different executor consume the same input
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    public void addTransformExecutor(Executor executor) {
+        addTransformExecutor(executor, Collections.EMPTY_LIST);
+    }
+
+    public void addTransformExecutor(Executor executor, List<PValue> sideInputs) {
+        addTransformExecutor(executor, userGraphContext.getInputs(), userGraphContext.getOutputs(), sideInputs);
+    }
+
+    public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs) {
+        addTransformExecutor(executor, inputs, outputs, Collections.EMPTY_LIST);
+    }
+
+    public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs, List<PValue> sideInputs) {
+        String name = null;
+
+        ExecutorsBolt bolt = null;
+
+        boolean isGBK = false;
+        /**
+         * Check if the transform executor needs to be chained into an existing ExecutorsBolt.
+         * For following cases, a new bolt is created for the specified executor, otherwise the executor
+         * will be added into the bolt contains corresponding upstream executor.
+         * a) it is a GroupByKey executor
+         * b) it is connected to source directly
+         * c) None existing upstream bolt was found
+         * d) For the purpose of performance to reduce the side effects between multiple streams which
+         *    is output to same executor, a new bolt will be created.
+         */
+        if (RunnerUtils.isGroupByKeyExecutor(executor)) {
+            bolt = new ExecutorsBolt();
+            name = executionGraphContext.registerBolt(bolt);
+            isGBK = true;
+        } else if (connectedToSource()) {
+            bolt = new ExecutorsBolt();
+            name = executionGraphContext.registerBolt(bolt);
+        } else {
+            name = getUpstreamExecutorsBolt();
+            if (name == null) {
+                bolt = new ExecutorsBolt();
+                name = executionGraphContext.registerBolt(bolt);
+            } else {
+                bolt = executionGraphContext.getBolt(name);
+                if (isMultipleInputOrOutput(bolt, inputs)) {
+                    bolt = new ExecutorsBolt();
+                    name = executionGraphContext.registerBolt(bolt);
+                }
+            }
+        }
+
+        // update the output tags of current transform into ExecutorsBolt
+        for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
+            TupleTag tag = entry.getKey();
+            PValue value = entry.getValue();
+
+            // use tag of PValueBase
+            if (value instanceof PValueBase) {
+                tag = ((PValueBase) value).expand().keySet().iterator().next();
+            }
+            executionGraphContext.registerStreamProducer(
+                    TaggedPValue.of(tag, value),
+                    Stream.Producer.of(name, tag.getId(), value.getName()));
+            //bolt.addOutputTags(tag);
+        }
+
+        // add the transform executor into the chain of ExecutorsBolt
+        for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) {
+            TupleTag tag = entry.getKey();
+            PValue value = entry.getValue();
+            bolt.addExecutor(tag, executor);
+
+            // filter all connections inside bolt
+            //if (!bolt.getOutputTags().contains(tag)) {
+                Stream.Grouping grouping;
+                if (isGBK) {
+                    grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY));
+                } else {
+                    grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE);
+                }
+                addStormStreamDef(TaggedPValue.of(tag, value), name, grouping);
+            //}
+        }
+
+        for (PValue sideInput : sideInputs) {
+            TupleTag tag = userGraphContext.findTupleTag(sideInput);
+            bolt.addExecutor(tag, executor);
+            checkState(!bolt.getOutputTags().contains(tag));
+            addStormStreamDef(TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL));
+        }
+
+        bolt.registerExecutor(executor);
+
+        // set parallelismNumber
+        String pTransformfullName = userGraphContext.currentTransform.getFullName();
+        String compositeName = pTransformfullName.split("/")[0];
+        Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap();
+        if (parallelismNumMap.containsKey(compositeName)) {
+            int configNum = (Integer) parallelismNumMap.get(compositeName);
+            int currNum = bolt.getParallelismNum();
+            bolt.setParallelismNum(Math.max(configNum, currNum));
+        }
+    }
+
+    // TODO: add getSideInputs() and getSideOutputs().
+    public static class UserGraphContext {
+        private final StormPipelineOptions options;
+        private final Map<PValue, TupleTag> pValueToTupleTag;
+        private AppliedPTransform<?, ?, ?> currentTransform = null;
+
+        private boolean isWindowed = false;
+
+        public UserGraphContext(StormPipelineOptions options) {
+            this.options = checkNotNull(options, "options");
+            this.pValueToTupleTag = Maps.newHashMap();
+        }
+
+        public StormPipelineOptions getOptions() {
+            return this.options;
+        }
+
+        public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+            this.currentTransform = transform;
+        }
+
+        public String getStepName() {
+            return currentTransform.getFullName();
+        }
+
+        public <T extends PValue> T getInput() {
+            return (T) currentTransform.getInputs().values().iterator().next();
+        }
+
+        public Map<TupleTag<?>, PValue> getInputs() {
+            return currentTransform.getInputs();
+        }
+
+        public TupleTag<?> getInputTag() {
+            return currentTransform.getInputs().keySet().iterator().next();
+        }
+
+        public List<TupleTag<?>> getInputTags() {
+            return Lists.newArrayList(currentTransform.getInputs().keySet());
+        }
+
+        public <T extends PValue> T getOutput() {
+            return (T) currentTransform.getOutputs().values().iterator().next();
+        }
+
+        public Map<TupleTag<?>, PValue> getOutputs() {
+            return currentTransform.getOutputs();
+        }
+
+        public TupleTag<?> getOutputTag() {
+            return currentTransform.getOutputs().keySet().iterator().next();
+        }
+
+        public List<TupleTag<?>> getOutputTags() {
+            return Lists.newArrayList(currentTransform.getOutputs().keySet());
+        }
+
+        public void recordOutputTaggedPValue() {
+            for (Map.Entry<TupleTag<?>, PValue> entry : getOutputs().entrySet()) {
+                pValueToTupleTag.put(entry.getValue(), entry.getKey());
+            }
+        }
+
+        public <T> TupleTag<T> findTupleTag(PValue pValue) {
+            return pValueToTupleTag.get(checkNotNull(pValue, "pValue"));
+        }
+
+        public void setWindowed() {
+            this.isWindowed = true;
+        }
+
+        public boolean isWindowed() {
+            return this.isWindowed;
+        }
+
+        @Override
+        public String toString() {
+            return Joiner.on('\n').join(FluentIterable.from(pValueToTupleTag.entrySet())
+                    .transform(new Function<Map.Entry<PValue,TupleTag>, String>() {
+                        @Override
+                        public String apply(Map.Entry<PValue, TupleTag> entry) {
+                            return String.format("%s == %s", entry.getValue().getId(), entry.getKey().getName());
+                        }}));
+        }
+    }
+
+    public static class ExecutionGraphContext {
+
+        private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>();
+        private final Map<String, ExecutorsBolt> boltMap = new HashMap<>();
+
+        // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue).
+        private final Map<PValue, Stream.Producer> pValueToProducer = new HashMap<>();
+        private final Map<Stream.Producer, TaggedPValue> producerToTaggedPValue = new HashMap<>();
+
+        private final List<Stream> streams = new ArrayList<>();
+
+        private int id = 1;
+
+        public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) {
+            checkNotNull(spout, "spout");
+            checkNotNull(output, "output");
+            String name = "spout" + genId();
+            this.spoutMap.put(name, spout);
+            registerStreamProducer(
+                    output,
+                    Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName()));
+        }
+
+        public AdaptorBasicSpout getSpout(String id) {
+            if (Strings.isNullOrEmpty(id)) {
+                return null;
+            }
+            return this.spoutMap.get(id);
+        }
+
+        public Map<String, AdaptorBasicSpout> getSpouts() {
+            return this.spoutMap;
+        }
+
+        public String registerBolt(ExecutorsBolt bolt) {
+            checkNotNull(bolt, "bolt");
+            String name = "bolt" + genId();
+            this.boltMap.put(name, bolt);
+            return name;
+        }
+
+        public ExecutorsBolt getBolt(String id) {
+            if (Strings.isNullOrEmpty(id)) {
+                return null;
+            }
+            return this.boltMap.get(id);
+        }
+
+        public void registerStreamProducer(TaggedPValue taggedPValue, Stream.Producer producer) {
+            checkNotNull(taggedPValue, "taggedPValue");
+            checkNotNull(producer, "producer");
+            pValueToProducer.put(taggedPValue.getValue(), producer);
+            producerToTaggedPValue.put(producer, taggedPValue);
+        }
+
+        public Stream.Producer getProducer(PValue pValue) {
+            return pValueToProducer.get(checkNotNull(pValue, "pValue"));
+        }
+
+        public String getProducerComponentId(PValue pValue) {
+            Stream.Producer producer = getProducer(pValue);
+            return producer == null ? null : producer.getComponentId();
+        }
+
+        public boolean producedBySpout(PValue pValue) {
+            String componentId = getProducerComponentId(pValue);
+            return getSpout(componentId) != null;
+        }
+
+        public void registerStreamConsumer(Stream.Consumer consumer, Stream.Producer producer) {
+            streams.add(Stream.of(
+                    checkNotNull(producer, "producer"),
+                    checkNotNull(consumer, "consumer")));
+        }
+
+        public Map<PValue, Stream.Producer> getPValueToProducers() {
+            return pValueToProducer;
+        }
+
+        public Iterable<Stream> getStreams() {
+            return streams;
+        }
+
+        @Override
+        public String toString() {
+            List<String> ret = new ArrayList<>();
+            ret.add("SPOUT");
+            for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) {
+                ret.add(entry.getKey() + ": " + entry.getValue().toString());
+            }
+            ret.add("BOLT");
+            for (Map.Entry<String, ExecutorsBolt> entry : boltMap.entrySet()) {
+                ret.add(entry.getKey() + ": " + entry.getValue().toString());
+            }
+            ret.add("STREAM");
+            for (Stream stream : streams) {
+                ret.add(String.format(
+                        "%s@@%s ---> %s@@%s",
+                        stream.getProducer().getStreamId(),
+                        stream.getProducer().getComponentId(),
+                        stream.getConsumer().getGrouping(),
+                        stream.getConsumer().getComponentId()));
+            }
+            return Joiner.on("\n").join(ret);
+        }
+
+        private synchronized int genId() {
+            return id++;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
new file mode 100644
index 0000000..0f856cf
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.alibaba.jstorm.beam.translation.translator.*;
+import org.apache.beam.runners.jstorm.translation.translator.BoundedSourceTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.FlattenTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.GroupByKeyTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.ParDoBoundMultiTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.ParDoBoundTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.UnboundedSourceTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.WindowAssignTranslator;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Lookup table mapping PTransform types to associated TransformTranslator implementations.
+ */
+public class TranslatorRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class);
+
+    private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS = new HashMap<>();
+
+    static {
+        TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator());
+        TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
+        // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+        // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
+
+        TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator());
+        TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator());
+
+        //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>());
+        TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
+
+        TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
+
+        TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
+
+        TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator());
+
+        /**
+         * Currently, empty translation is required for combine and reshuffle. Because, the transforms will be 
+         * mapped to GroupByKey and Pardo finally. So we only need to translator the finally transforms.
+         * If any improvement is required, the composite transforms will be translated in the future.
+         */
+        // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
+        // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator());
+        // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator());
+    }
+
+    public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+        TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass());
+        if (translator == null) {
+            LOG.warn("Unsupported operator={}", transform.getClass().getName());
+        }
+        return translator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
new file mode 100644
index 0000000..b07b426
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+
+import backtype.storm.topology.IComponent;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+/*
+ * Enable user to add output stream definitions by API, rather than hard-code.
+ */
+public abstract class AbstractComponent implements IComponent {
+    private Map<String, Fields> streamToFields = new HashMap<>();
+    private Map<String, Boolean> keyStreams = new HashMap<>();
+    private int parallelismNum = 0;
+
+    public void addOutputField(String streamId) {
+        addOutputField(streamId, new Fields(CommonInstance.VALUE));
+    }
+
+    public void addOutputField(String streamId, Fields fields) {
+        streamToFields.put(streamId, fields);
+        keyStreams.put(streamId, false);
+    }
+
+    public void addKVOutputField(String streamId) {
+        streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE));
+        keyStreams.put(streamId, true);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) {
+            declarer.declareStream(entry.getKey(), entry.getValue());
+        }
+    }
+
+    public boolean keyedEmit(String streamId) {
+        Boolean isKeyedStream = keyStreams.get(streamId);
+        return isKeyedStream == null ? false : isKeyedStream;
+    }
+
+    public int getParallelismNum() {
+        return parallelismNum;
+    }
+
+    public void setParallelismNum(int num) {
+        parallelismNum = num;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
new file mode 100644
index 0000000..91881f2
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import backtype.storm.topology.IRichBatchBolt;
+
+public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt {
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
new file mode 100644
index 0000000..5a0c6ec
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import backtype.storm.topology.IRichSpout;
+
+public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout {
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
new file mode 100644
index 0000000..2bf3303
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import java.io.Serializable;
+import java.util.*;
+
+import avro.shaded.com.google.common.collect.Iterables;
+import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
+import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
+
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.metric.MetricClient;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.util.DefaultStepContext;
+import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class DoFnExecutor<InputT, OutputT> implements Executor {
+    private static final long serialVersionUID = 5297603063991078668L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
+
+    public class DoFnExecutorOutputManager implements OutputManager, Serializable {
+        private static final long serialVersionUID = -661113364735206170L;
+
+        @Override
+        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+            executorsBolt.processExecutorElem(tag, output);
+        }
+    }
+
+    protected transient DoFnRunner<InputT, OutputT> runner = null;
+    protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null;
+
+    protected final String stepName;
+
+    protected int internalDoFnExecutorId;
+
+    protected final String description;
+
+    protected final TupleTag<OutputT> mainTupleTag;
+    protected final List<TupleTag<?>> sideOutputTags;
+
+    protected SerializedPipelineOptions serializedOptions;
+    protected transient StormPipelineOptions pipelineOptions;
+
+    protected DoFn<InputT, OutputT> doFn;
+    protected final Coder<WindowedValue<InputT>> inputCoder;
+    protected DoFnInvoker<InputT, OutputT> doFnInvoker;
+    protected OutputManager outputManager;
+    protected WindowingStrategy<?, ?> windowingStrategy;
+    protected final TupleTag<InputT> mainInputTag;
+    protected Collection<PCollectionView<?>> sideInputs;
+    protected SideInputHandler sideInputHandler;
+    protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
+
+    // Initialize during runtime
+    protected ExecutorContext executorContext;
+    protected ExecutorsBolt executorsBolt;
+    protected TimerInternals timerInternals;
+    protected transient StateInternals pushbackStateInternals;
+    protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
+    protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
+    protected transient IKvStoreManager kvStoreManager;
+    protected DefaultStepContext stepContext;
+    protected transient MetricClient metricClient;
+
+    public DoFnExecutor(
+            String stepName,
+            String description,
+            StormPipelineOptions pipelineOptions,
+            DoFn<InputT, OutputT> doFn,
+            Coder<WindowedValue<InputT>> inputCoder,
+            WindowingStrategy<?, ?> windowingStrategy,
+            TupleTag<InputT> mainInputTag,
+            Collection<PCollectionView<?>> sideInputs,
+            Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+            TupleTag<OutputT> mainTupleTag,
+            List<TupleTag<?>> sideOutputTags) {
+        this.stepName = checkNotNull(stepName, "stepName");
+        this.description = checkNotNull(description, "description");
+        this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+        this.doFn = doFn;
+        this.inputCoder = inputCoder;
+        this.outputManager = new DoFnExecutorOutputManager();
+        this.windowingStrategy = windowingStrategy;
+        this.mainInputTag = mainInputTag;
+        this.sideInputs = sideInputs;
+        this.mainTupleTag = mainTupleTag;
+        this.sideOutputTags = sideOutputTags;
+        this.sideInputTagToView = sideInputTagToView;
+    }
+
+    protected DoFnRunner<InputT, OutputT> getDoFnRunner() {
+        return new DoFnRunnerWithMetrics<>(
+            stepName,
+            DoFnRunners.simpleRunner(
+                this.pipelineOptions,
+                this.doFn,
+                this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler,
+                this.outputManager,
+                this.mainTupleTag,
+                this.sideOutputTags,
+                this.stepContext,
+                this.windowingStrategy),
+            MetricsReporter.create(metricClient));
+    }
+
+    protected void initService(ExecutorContext context) {
+        // TODO: what should be set for key in here?
+        timerInternals = new JStormTimerInternals(null /* key */, this, context.getExecutorsBolt().timerService());
+        kvStoreManager = context.getKvStoreManager();
+        stepContext = new DefaultStepContext(timerInternals,
+                new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId));
+        metricClient = new MetricClient(executorContext.getTopologyContext());
+    }
+
+    @Override
+    public void init(ExecutorContext context) {
+        this.executorContext = context;
+        this.executorsBolt = context.getExecutorsBolt();
+        this.pipelineOptions = this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);
+
+        initService(context);
+
+        // Side inputs setup
+        if (sideInputs != null && sideInputs.isEmpty() == false) {
+            pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
+            watermarkHoldTag =
+                    StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
+            pushbackStateInternals = new JStormStateInternals(null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
+            sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals);
+            runner = getDoFnRunner();
+            pushbackRunner = SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler);
+        } else {
+            runner = getDoFnRunner();
+        }
+
+        // Process user's setup
+        doFnInvoker = DoFnInvokers.invokerFor(doFn);
+        doFnInvoker.invokeSetup();
+    }
+
+    @Override
+    public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+        LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}",
+                tag, mainInputTag, sideInputs, elem.getValue()));
+        if (mainInputTag.equals(tag)) {
+            processMainInput(elem);
+        } else {
+            processSideInput(tag, elem);
+        }
+    }
+
+    protected <T> void processMainInput(WindowedValue<T> elem) {
+       if (sideInputs.isEmpty()) {
+           runner.processElement((WindowedValue<InputT>) elem);
+       } else {
+           Iterable<WindowedValue<InputT>> justPushedBack =
+               pushbackRunner.processElementInReadyWindows((WindowedValue<InputT>) elem);
+           BagState<WindowedValue<InputT>> pushedBack =
+                   pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+           Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
+           for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+               if (pushedBackValue.getTimestamp().isBefore(min)) {
+                   min = pushedBackValue.getTimestamp();
+               }
+               min = earlier(min, pushedBackValue.getTimestamp());
+               pushedBack.add(pushedBackValue);
+           }
+           pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min);
+       }
+    }
+
+    protected void processSideInput(TupleTag tag, WindowedValue elem) {
+        LOG.debug(String.format("side inputs: %s, %s.", tag, elem));
+
+        PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
+        sideInputHandler.addSideInputValue(sideInputView, elem);
+
+        BagState<WindowedValue<InputT>> pushedBack =
+                pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+        List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
+
+        Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read();
+        if (pushedBackInputs != null) {
+            for (WindowedValue<InputT> input : pushedBackInputs) {
+
+                Iterable<WindowedValue<InputT>> justPushedBack =
+                        pushbackRunner.processElementInReadyWindows(input);
+                Iterables.addAll(newPushedBack, justPushedBack);
+            }
+        }
+        pushedBack.clear();
+
+        Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
+        for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
+            min = earlier(min, pushedBackValue.getTimestamp());
+            pushedBack.add(pushedBackValue);
+        }
+
+        WatermarkHoldState watermarkHold =
+                pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
+        // TODO: clear-then-add is not thread-safe.
+        watermarkHold.clear();
+        watermarkHold.add(min);
+    }
+
+    /**
+     * Process all pushed back elements when receiving watermark with max timestamp
+     */
+    public void processAllPushBackElements() {
+        if (sideInputs != null && sideInputs.isEmpty() == false) {
+            BagState<WindowedValue<InputT>> pushedBackElements =
+                    pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+            if (pushedBackElements != null) {
+                for (WindowedValue<InputT> elem : pushedBackElements.read()) {
+                    LOG.info("Process pushback elem={}", elem);
+                    runner.processElement(elem);
+                }
+                pushedBackElements.clear();
+            }
+
+            WatermarkHoldState watermarkHold =
+                    pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
+            watermarkHold.clear();
+            watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE);
+        }
+    }
+
+    public void onTimer(Object key, TimerInternals.TimerData timerData) {
+        StateNamespace namespace = timerData.getNamespace();
+        checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+        BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
+        if (pushbackRunner != null) {
+            pushbackRunner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+        } else {
+            runner.onTimer(timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        doFnInvoker.invokeTeardown();
+    }
+
+    @Override
+    public String toString() {
+        return description;
+    }
+
+    private Instant earlier(Instant left, Instant right) {
+        return left.isBefore(right) ? left : right;
+    }
+
+    public void startBundle() {
+        if (pushbackRunner != null) {
+            pushbackRunner.startBundle();
+        } else {
+            runner.startBundle();
+        }
+    }
+
+    public void finishBundle() {
+        if (pushbackRunner != null) {
+            pushbackRunner.finishBundle();
+        } else {
+            runner.finishBundle();
+        }
+    }
+
+    public void setInternalDoFnExecutorId(int id) {
+        this.internalDoFnExecutorId = id;
+    }
+
+    public int getInternalDoFnExecutorId() {
+        return internalDoFnExecutorId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
new file mode 100644
index 0000000..98dbcc5
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/**
+ * DoFnRunner decorator which registers {@link MetricsContainer}.
+ */
+public class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+  private final String stepName;
+  private final DoFnRunner<InputT, OutputT> delegate;
+  private final MetricsReporter metricsReporter;
+
+  DoFnRunnerWithMetrics(
+      String stepName,
+      DoFnRunner<InputT, OutputT> delegate,
+      MetricsReporter metricsReporter) {
+    this.stepName = checkNotNull(stepName, "stepName");
+    this.delegate = checkNotNull(delegate, "delegate");
+    this.metricsReporter = checkNotNull(metricsReporter, "metricsReporter");
+  }
+
+  @Override
+  public void startBundle() {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      delegate.startBundle();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      delegate.processElement(elem);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      delegate.onTimer(timerId, window, timestamp, timeDomain);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void finishBundle() {
+    try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(
+        metricsReporter.getMetricsContainer(stepName))) {
+      delegate.finishBundle();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    metricsReporter.updateMetrics();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
new file mode 100644
index 0000000..d7214db
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import java.io.Serializable;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+public interface Executor extends Serializable {
+    /**
+     * Initialization during runtime
+     */
+    void init(ExecutorContext context);
+
+    <T> void  process(TupleTag<T> tag, WindowedValue<T> elem);
+
+    void cleanup();
+}
\ No newline at end of file