You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:05 UTC
[05/53] [abbrv] beam git commit: jstorm-runner: rename the package to
org.apache.beam.runners.jstorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
new file mode 100644
index 0000000..aa7d325
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableListSerializer.java
@@ -0,0 +1,92 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import org.apache.beam.runners.jstorm.util.RunnerUtils;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.*;
+
+/**
+ * Kryo {@link Serializer} for Guava's {@link ImmutableList} and the concrete ImmutableList
+ * classes produced by Guava's factory methods.
+ *
+ * <p>A list is written as its size followed by every element (class and value), and is read back
+ * with {@link ImmutableList#copyOf(Object[])}.
+ */
+public class ImmutableListSerializer extends Serializer<ImmutableList<Object>> {
+
+  // Arguments to Serializer(acceptsNull, immutable). With acceptsNull == false Kryo handles null
+  // references itself instead of passing them to write(); immutable == true lets Kryo#copy return
+  // the original instance.
+  private static final boolean DOES_NOT_ACCEPT_NULL = false;
+  private static final boolean IMMUTABLE = true;
+
+  public ImmutableListSerializer() {
+    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
+    output.writeInt(object.size(), true);
+    for (Object elm : object) {
+      kryo.writeClassAndObject(output, elm);
+    }
+  }
+
+  @Override
+  public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
+    final int size = input.readInt(true);
+    final Object[] list = new Object[size];
+    for (int i = 0; i < size; ++i) {
+      list[i] = kryo.readClassAndObject(input);
+    }
+    return ImmutableList.copyOf(list);
+  }
+
+  /**
+   * Registers serializers in {@code config} for {@link ImmutableList} and its concrete
+   * implementation classes.
+   *
+   * <p>Each Guava class is registered with {@link ImmutableListSerializer}. Its counterpart from
+   * the Beam SDK's repackaged Guava is registered with {@link SdkRepackImmuListSerializer}: the
+   * repackaged classes are not {@code com.google.common.collect.ImmutableList}s, so this
+   * serializer can neither write them nor create instances of them.
+   */
+  public static void registerSerializers(Config config) {
+
+    // ImmutableList (abstract class)
+    //  +- RegularImmutableList
+    //  |    RegularImmutableList
+    //  +- SingletonImmutableList
+    //  |    Optimized for List with only 1 element.
+    //  +- SubList
+    //  |    Representation for part of ImmutableList
+    //  +- ReverseImmutableList
+    //  |    For iterating in reverse order
+    //  +- StringAsImmutableList
+    //  |    Used by Lists#charactersOf
+    //  +- Values (ImmutableTable values)
+    //       Used by return value of #values() when there are multiple cells
+
+    register(config, ImmutableList.class);
+
+    // Note:
+    //  Only registering above is good enough for serializing/deserializing.
+    //  but if using Kryo#copy, following is required.
+
+    register(config, ImmutableList.of().getClass());
+    register(config, ImmutableList.of(1).getClass());
+    register(config, ImmutableList.of(1, 2, 3).getClass());
+    // A two-element sub list: Guava may return a singleton list for one-element sub lists.
+    register(config, ImmutableList.of(1, 2, 3, 4).subList(1, 3).getClass());
+    // Two elements: Guava may return the list itself when reversing zero or one element.
+    register(config, ImmutableList.of(1, 2).reverse().getClass());
+
+    register(config, Lists.charactersOf("KryoRocks").getClass());
+
+    Table<Integer, Integer, Integer> baseTable = HashBasedTable.create();
+    baseTable.put(1, 2, 3);
+    baseTable.put(4, 5, 6);
+    Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
+    register(config, table.values().getClass());
+
+  }
+
+  /**
+   * Registers {@code guavaClass} with {@link ImmutableListSerializer} and its Beam SDK
+   * repackaged counterpart with {@link SdkRepackImmuListSerializer}.
+   */
+  private static void register(Config config, Class<?> guavaClass) {
+    config.registerSerialization(guavaClass, ImmutableListSerializer.class);
+    // NOTE(review): assumes getBeamSdkRepackClass maps a Guava class to the class of the same
+    // name under org.apache.beam.sdk.repackaged -- confirm in RunnerUtils.
+    config.registerSerialization(
+        RunnerUtils.getBeamSdkRepackClass(guavaClass), SdkRepackImmuListSerializer.class);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
new file mode 100644
index 0000000..ee8b765
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableMapSerializer.java
@@ -0,0 +1,61 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ImmutableMapSerializer extends Serializer<ImmutableMap<Object, ? extends Object>> {
+
+ private static final boolean DOES_NOT_ACCEPT_NULL = true;
+ private static final boolean IMMUTABLE = true;
+
+ public ImmutableMapSerializer() {
+ super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, ImmutableMap<Object, ? extends Object> immutableMap) {
+ kryo.writeObject(output, Maps.newHashMap(immutableMap));
+ }
+
+ @Override
+ public ImmutableMap<Object, Object> read(Kryo kryo, Input input, Class<ImmutableMap<Object, ? extends Object>> type) {
+ Map map = kryo.readObject(input, HashMap.class);
+ return ImmutableMap.copyOf(map);
+ }
+
+ /**
+ * Creates a new {@link ImmutableMapSerializer} and registers its serializer
+ * for the several ImmutableMap related classes.
+ */
+ public static void registerSerializers(Config config) {
+
+ config.registerSerialization(ImmutableMap.class, ImmutableMapSerializer.class);
+ config.registerSerialization(ImmutableMap.of().getClass(), ImmutableMapSerializer.class);
+
+ Object o1 = new Object();
+ Object o2 = new Object();
+
+ config.registerSerialization(ImmutableMap.of(o1, o1).getClass(), ImmutableMapSerializer.class);
+ config.registerSerialization(ImmutableMap.of(o1, o1, o2, o2).getClass(), ImmutableMapSerializer.class);
+ Map<DummyEnum,Object> enumMap = new EnumMap<DummyEnum, Object>(DummyEnum.class);
+ for (DummyEnum e : DummyEnum.values()) {
+ enumMap.put(e, o1);
+ }
+
+ config.registerSerialization(ImmutableMap.copyOf(enumMap).getClass(), ImmutableMapSerializer.class);
+ }
+
+ private enum DummyEnum {
+ VALUE1,
+ VALUE2
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
new file mode 100644
index 0000000..cdc4382
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/ImmutableSetSerializer.java
@@ -0,0 +1,71 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Kryo {@link Serializer} for Guava's {@link ImmutableSet} and the concrete ImmutableSet classes
+ * produced by Guava's factory methods.
+ *
+ * <p>A set is written as its size followed by every element (class and value), and is rebuilt
+ * on read with an {@link ImmutableSet.Builder}, which keeps the order the elements were written.
+ */
+public class ImmutableSetSerializer extends Serializer<ImmutableSet<Object>> {
+
+  // Arguments to Serializer(acceptsNull, immutable). With acceptsNull == false Kryo handles null
+  // references itself instead of passing them to write(); immutable == true lets Kryo#copy return
+  // the original instance.
+  private static final boolean DOES_NOT_ACCEPT_NULL = false;
+  private static final boolean IMMUTABLE = true;
+
+  public ImmutableSetSerializer() {
+    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
+    output.writeInt(object.size(), true);
+    for (Object elm : object) {
+      kryo.writeClassAndObject(output, elm);
+    }
+  }
+
+  @Override
+  public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
+    final int size = input.readInt(true);
+    ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
+    for (int i = 0; i < size; ++i) {
+      builder.add(kryo.readClassAndObject(input));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Registers {@link ImmutableSetSerializer} in {@code config} for {@link ImmutableSet} and its
+   * concrete implementation classes.
+   */
+  public static void registerSerializers(Config config) {
+
+    // ImmutableSet (abstract class)
+    //  +- EmptyImmutableSet
+    //  |    The empty set
+    //  +- SingletonImmutableSet
+    //  |    Optimized for Set with only 1 element.
+    //  +- RegularImmutableSet
+    //  |    Sets with several elements
+    //  +- ImmutableEnumSet
+    //       Sets of enum values (Sets#immutableEnumSet)
+
+    config.registerSerialization(ImmutableSet.class, ImmutableSetSerializer.class);
+
+    // Note:
+    //  Only registering above is good enough for serializing/deserializing.
+    //  but if using Kryo#copy, following is required.
+
+    config.registerSerialization(ImmutableSet.of().getClass(), ImmutableSetSerializer.class);
+    config.registerSerialization(ImmutableSet.of(1).getClass(), ImmutableSetSerializer.class);
+    config.registerSerialization(ImmutableSet.of(1, 2, 3).getClass(), ImmutableSetSerializer.class);
+
+    config.registerSerialization(
+        Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(),
+        ImmutableSetSerializer.class);
+  }
+
+  /** Enum used only to obtain Guava's ImmutableEnumSet implementation class. */
+  private enum SomeEnum {
+    A, B, C
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
new file mode 100644
index 0000000..decfb3f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/KvStoreIterableSerializer.java
@@ -0,0 +1,55 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import com.alibaba.jstorm.cache.KvStoreIterable;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Lists;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Kryo {@link Serializer} for JStorm {@link KvStoreIterable}s.
+ *
+ * <p>An iterable is written as its element count followed by every element (class and value).
+ * On read the elements are collected into a list and exposed through a list-backed
+ * {@link KvStoreIterable}.
+ */
+public class KvStoreIterableSerializer extends Serializer<KvStoreIterable<Object>> {
+
+  public KvStoreIterableSerializer() {
+
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output, KvStoreIterable<Object> object) {
+    // Snapshot the elements once and write from the snapshot, so the source is iterated a single
+    // time and the written count always matches the number of written elements.
+    List<Object> values = Lists.newArrayList(object);
+    output.writeInt(values.size(), true);
+    for (Object elm : values) {
+      kryo.writeClassAndObject(output, elm);
+    }
+  }
+
+  @Override
+  public KvStoreIterable<Object> read(Kryo kryo, Input input, Class<KvStoreIterable<Object>> type) {
+    final int size = input.readInt(true);
+    final List<Object> values = Lists.newArrayList();
+    for (int i = 0; i < size; ++i) {
+      values.add(kryo.readClassAndObject(input));
+    }
+
+    return new KvStoreIterable<Object>() {
+      @Override
+      public Iterator<Object> iterator() {
+        return values.iterator();
+      }
+
+      @Override
+      public String toString() {
+        return values.toString();
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
new file mode 100644
index 0000000..9bb315b
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuListSerializer.java
@@ -0,0 +1,78 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.*;
+
+/**
+ * Kryo {@link Serializer} for the Beam SDK's repackaged Guava {@link ImmutableList}
+ * ({@code org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList}) and its concrete
+ * implementation classes.
+ *
+ * <p>A list is written as its size followed by every element (class and value), and is read back
+ * with {@link ImmutableList#copyOf(Object[])}.
+ */
+public class SdkRepackImmuListSerializer extends Serializer<ImmutableList<Object>> {
+
+  // Arguments to Serializer(acceptsNull, immutable). With acceptsNull == false Kryo handles null
+  // references itself instead of passing them to write(); immutable == true lets Kryo#copy return
+  // the original instance.
+  private static final boolean DOES_NOT_ACCEPT_NULL = false;
+  private static final boolean IMMUTABLE = true;
+
+  public SdkRepackImmuListSerializer() {
+    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output, ImmutableList<Object> object) {
+    output.writeInt(object.size(), true);
+    for (Object elm : object) {
+      kryo.writeClassAndObject(output, elm);
+    }
+  }
+
+  @Override
+  public ImmutableList<Object> read(Kryo kryo, Input input, Class<ImmutableList<Object>> type) {
+    final int size = input.readInt(true);
+    final Object[] list = new Object[size];
+    for (int i = 0; i < size; ++i) {
+      list[i] = kryo.readClassAndObject(input);
+    }
+    return ImmutableList.copyOf(list);
+  }
+
+  /**
+   * Registers {@link SdkRepackImmuListSerializer} in {@code config} for the repackaged
+   * {@link ImmutableList} and its concrete implementation classes.
+   */
+  public static void registerSerializers(Config config) {
+
+    // ImmutableList (abstract class)
+    //  +- RegularImmutableList
+    //  |    RegularImmutableList
+    //  +- SingletonImmutableList
+    //  |    Optimized for List with only 1 element.
+    //  +- SubList
+    //  |    Representation for part of ImmutableList
+    //  +- ReverseImmutableList
+    //  |    For iterating in reverse order
+    //  +- StringAsImmutableList
+    //  |    Used by Lists#charactersOf
+    //  +- Values (ImmutableTable values)
+    //       Used by return value of #values() when there are multiple cells
+
+    config.registerSerialization(ImmutableList.class, SdkRepackImmuListSerializer.class);
+
+    // Note:
+    //  Only registering above is good enough for serializing/deserializing.
+    //  but if using Kryo#copy, following is required.
+
+    config.registerSerialization(ImmutableList.of().getClass(), SdkRepackImmuListSerializer.class);
+    config.registerSerialization(ImmutableList.of(1).getClass(), SdkRepackImmuListSerializer.class);
+    config.registerSerialization(
+        ImmutableList.of(1, 2, 3).getClass(), SdkRepackImmuListSerializer.class);
+    // A two-element sub list: Guava may return a singleton list for one-element sub lists.
+    config.registerSerialization(
+        ImmutableList.of(1, 2, 3, 4).subList(1, 3).getClass(), SdkRepackImmuListSerializer.class);
+    // Two elements: Guava may return the list itself when reversing zero or one element.
+    config.registerSerialization(
+        ImmutableList.of(1, 2).reverse().getClass(), SdkRepackImmuListSerializer.class);
+
+    config.registerSerialization(
+        Lists.charactersOf("KryoRocks").getClass(), SdkRepackImmuListSerializer.class);
+
+    Table<Integer, Integer, Integer> baseTable = HashBasedTable.create();
+    baseTable.put(1, 2, 3);
+    baseTable.put(4, 5, 6);
+    Table<Integer, Integer, Integer> table = ImmutableTable.copyOf(baseTable);
+    config.registerSerialization(table.values().getClass(), SdkRepackImmuListSerializer.class);
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
new file mode 100644
index 0000000..a514645
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/SdkRepackImmuSetSerializer.java
@@ -0,0 +1,71 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+import org.apache.beam.sdk.repackaged.com.google.common.collect.*;
+
+/**
+ * Kryo {@link Serializer} for the Beam SDK's repackaged Guava {@link ImmutableSet}
+ * ({@code org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableSet}) and its concrete
+ * implementation classes.
+ *
+ * <p>A set is written as its size followed by every element (class and value), and is rebuilt
+ * on read with an {@link ImmutableSet.Builder}, which keeps the order the elements were written.
+ */
+public class SdkRepackImmuSetSerializer extends Serializer<ImmutableSet<Object>> {
+
+  // Arguments to Serializer(acceptsNull, immutable). With acceptsNull == false Kryo handles null
+  // references itself instead of passing them to write(); immutable == true lets Kryo#copy return
+  // the original instance.
+  private static final boolean DOES_NOT_ACCEPT_NULL = false;
+  private static final boolean IMMUTABLE = true;
+
+  public SdkRepackImmuSetSerializer() {
+    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
+  }
+
+  @Override
+  public void write(Kryo kryo, Output output, ImmutableSet<Object> object) {
+    output.writeInt(object.size(), true);
+    for (Object elm : object) {
+      kryo.writeClassAndObject(output, elm);
+    }
+  }
+
+  @Override
+  public ImmutableSet<Object> read(Kryo kryo, Input input, Class<ImmutableSet<Object>> type) {
+    final int size = input.readInt(true);
+    ImmutableSet.Builder<Object> builder = ImmutableSet.builder();
+    for (int i = 0; i < size; ++i) {
+      builder.add(kryo.readClassAndObject(input));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Registers {@link SdkRepackImmuSetSerializer} in {@code config} for the repackaged
+   * {@link ImmutableSet} and its concrete implementation classes.
+   */
+  public static void registerSerializers(Config config) {
+
+    // ImmutableSet (abstract class)
+    //  +- EmptyImmutableSet
+    //  |    The empty set
+    //  +- SingletonImmutableSet
+    //  |    Optimized for Set with only 1 element.
+    //  +- RegularImmutableSet
+    //  |    Sets with several elements
+    //  +- ImmutableEnumSet
+    //       Sets of enum values (Sets#immutableEnumSet)
+
+    config.registerSerialization(ImmutableSet.class, SdkRepackImmuSetSerializer.class);
+
+    // Note:
+    //  Only registering above is good enough for serializing/deserializing.
+    //  but if using Kryo#copy, following is required.
+
+    config.registerSerialization(ImmutableSet.of().getClass(), SdkRepackImmuSetSerializer.class);
+    config.registerSerialization(ImmutableSet.of(1).getClass(), SdkRepackImmuSetSerializer.class);
+    config.registerSerialization(
+        ImmutableSet.of(1, 2, 3).getClass(), SdkRepackImmuSetSerializer.class);
+
+    config.registerSerialization(
+        Sets.immutableEnumSet(SomeEnum.A, SomeEnum.B, SomeEnum.C).getClass(),
+        SdkRepackImmuSetSerializer.class);
+  }
+
+  /** Enum used only to obtain the repackaged ImmutableEnumSet implementation class. */
+  private enum SomeEnum {
+    A, B, C
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
new file mode 100644
index 0000000..c8b0138
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
@@ -0,0 +1,159 @@
+package org.apache.beam.runners.jstorm.serialization;
+
+import backtype.storm.Config;
+import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
+import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
+import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
+
+import java.lang.reflect.Field;
+import java.util.*;
+
+/**
+ * Kryo {@link Serializer} for the unmodifiable wrappers created by {@link Collections}:
+ * {@code unmodifiableCollection}, {@code unmodifiableList} (random-access and sequential),
+ * {@code unmodifiableSet}, {@code unmodifiableSortedSet}, {@code unmodifiableMap} and
+ * {@code unmodifiableSortedMap}.
+ *
+ * <p>A wrapper is written as the ordinal of its wrapper kind followed by the wrapped collection
+ * or map (class and value). The wrapped instance is obtained reflectively from the private field
+ * of the JDK wrapper class, and is re-wrapped with the matching {@link Collections} factory on
+ * read.
+ */
+public class UnmodifiableCollectionsSerializer extends Serializer<Object> {
+
+  /** Field {@code c} of {@code java.util.Collections$UnmodifiableCollection} (wrapped collection). */
+  private static final Field SOURCE_COLLECTION_FIELD =
+      accessibleField("java.util.Collections$UnmodifiableCollection", "c");
+
+  /** Field {@code m} of {@code java.util.Collections$UnmodifiableMap} (wrapped map). */
+  private static final Field SOURCE_MAP_FIELD =
+      accessibleField("java.util.Collections$UnmodifiableMap", "m");
+
+  /**
+   * Looks up a declared field by class and field name and makes it accessible.
+   *
+   * @throws RuntimeException if the class or field cannot be found or made accessible; the
+   *     message names the field and class that failed
+   */
+  private static Field accessibleField(final String className, final String fieldName) {
+    try {
+      final Field field = Class.forName(className).getDeclaredField(fieldName);
+      field.setAccessible(true);
+      return field;
+    } catch (final Exception e) {
+      throw new RuntimeException("Could not access source collection field '" + fieldName
+          + "' in " + className + ".", e);
+    }
+  }
+
+  @Override
+  public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) {
+    final int ordinal = input.readInt(true);
+    final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal];
+    final Object sourceCollection = kryo.readClassAndObject(input);
+    return unmodifiableCollection.create(sourceCollection);
+  }
+
+  @Override
+  public void write(final Kryo kryo, final Output output, final Object object) {
+    try {
+      final UnmodifiableCollection unmodifiableCollection =
+          UnmodifiableCollection.valueOfType(object.getClass());
+      // The ordinal could be replaced by something else (e.g. an explicitly managed "id").
+      output.writeInt(unmodifiableCollection.ordinal(), true);
+      kryo.writeClassAndObject(output, unmodifiableCollection.sourceCollectionField.get(object));
+    } catch (final RuntimeException e) {
+      // Rethrow RuntimeExceptions (including Kryo's own) unchanged instead of wrapping them;
+      // only checked reflection exceptions are wrapped below.
+      throw e;
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Object copy(Kryo kryo, Object original) {
+    try {
+      final UnmodifiableCollection unmodifiableCollection =
+          UnmodifiableCollection.valueOfType(original.getClass());
+      Object sourceCollectionCopy =
+          kryo.copy(unmodifiableCollection.sourceCollectionField.get(original));
+      return unmodifiableCollection.create(sourceCollectionCopy);
+    } catch (final RuntimeException e) {
+      // Don't eat and wrap RuntimeExceptions
+      throw e;
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * The supported wrapper kinds, each identified by its concrete JDK wrapper class.
+   *
+   * <p>The declaration order defines the ordinals written by {@code write}; do not reorder or
+   * insert constants, or previously serialized data will be read back as the wrong kind.
+   */
+  private enum UnmodifiableCollection {
+    COLLECTION(Collections.unmodifiableCollection(Arrays.asList("")).getClass(),
+        SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableCollection((Collection<?>) sourceCollection);
+      }
+    },
+    RANDOM_ACCESS_LIST(Collections.unmodifiableList(new ArrayList<Void>()).getClass(),
+        SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableList((List<?>) sourceCollection);
+      }
+    },
+    LIST(Collections.unmodifiableList(new LinkedList<Void>()).getClass(),
+        SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableList((List<?>) sourceCollection);
+      }
+    },
+    SET(Collections.unmodifiableSet(new HashSet<Void>()).getClass(), SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableSet((Set<?>) sourceCollection);
+      }
+    },
+    SORTED_SET(Collections.unmodifiableSortedSet(new TreeSet<Void>()).getClass(),
+        SOURCE_COLLECTION_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableSortedSet((SortedSet<?>) sourceCollection);
+      }
+    },
+    MAP(Collections.unmodifiableMap(new HashMap<Void, Void>()).getClass(), SOURCE_MAP_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableMap((Map<?, ?>) sourceCollection);
+      }
+    },
+    SORTED_MAP(Collections.unmodifiableSortedMap(new TreeMap<Void, Void>()).getClass(),
+        SOURCE_MAP_FIELD) {
+      @Override
+      public Object create(final Object sourceCollection) {
+        return Collections.unmodifiableSortedMap((SortedMap<?, ?>) sourceCollection);
+      }
+    };
+
+    // Concrete JDK wrapper class handled by this constant.
+    private final Class<?> type;
+    // Private JDK field holding the wrapped collection/map.
+    private final Field sourceCollectionField;
+
+    UnmodifiableCollection(final Class<?> type, final Field sourceCollectionField) {
+      this.type = type;
+      this.sourceCollectionField = sourceCollectionField;
+    }
+
+    /**
+     * Wraps {@code sourceCollection} with the {@link Collections} factory for this kind.
+     *
+     * @param sourceCollection the collection or map to wrap; must have the type the factory expects
+     * @return the unmodifiable wrapper
+     */
+    public abstract Object create(Object sourceCollection);
+
+    /**
+     * Returns the constant whose wrapper class equals {@code type}.
+     *
+     * @throws IllegalArgumentException if {@code type} is not a supported wrapper class
+     */
+    static UnmodifiableCollection valueOfType(final Class<?> type) {
+      for (final UnmodifiableCollection item : values()) {
+        if (item.type.equals(type)) {
+          return item;
+        }
+      }
+      throw new IllegalArgumentException("The type " + type + " is not supported.");
+    }
+
+  }
+
+  /**
+   * Registers {@link UnmodifiableCollectionsSerializer} in {@code config} for each of the
+   * unmodifiable collection and map wrapper classes created via {@link Collections}.
+   *
+   * @see Collections#unmodifiableCollection(Collection)
+   * @see Collections#unmodifiableList(List)
+   * @see Collections#unmodifiableSet(Set)
+   * @see Collections#unmodifiableSortedSet(SortedSet)
+   * @see Collections#unmodifiableMap(Map)
+   * @see Collections#unmodifiableSortedMap(SortedMap)
+   */
+  public static void registerSerializers(Config config) {
+    for (final UnmodifiableCollection item : UnmodifiableCollection.values()) {
+      config.registerSerialization(item.type, UnmodifiableCollectionsSerializer.class);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
new file mode 100644
index 0000000..d907fac
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StormPipelineTranslator.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.util.InstanceBuilder;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
+
+import java.util.List;
+
+/**
+ * Pipeline translator of Storm.
+ *
+ * <p>{@link #translate(Pipeline)} first replaces the {@code View.AsIterable}, {@code View.AsList},
+ * {@code View.AsMap}, {@code View.AsMultimap}, {@code View.AsSingleton} and
+ * {@code Combine.GloballyAsSingletonView} transforms with the corresponding
+ * {@link ViewTranslator} transforms. It then visits the pipeline topologically and translates each
+ * transform with the {@link TransformTranslator} returned by {@code TranslatorRegistry}.
+ */
+public class StormPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
+  private static final Logger LOG = LoggerFactory.getLogger(StormPipelineTranslator.class);
+  private final TranslationContext context;
+  // Current nesting level of composite transforms; only used to indent log output.
+  private int depth = 0;
+
+  public StormPipelineTranslator(TranslationContext context) {
+    this.context = context;
+  }
+
+  /** Applies this runner's transform overrides to {@code pipeline} and translates it. */
+  public void translate(Pipeline pipeline) {
+    List<PTransformOverride> transformOverrides =
+        ImmutableList.<PTransformOverride>builder()
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsIterable.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsIterable.class)))
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsList.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsList.class)))
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMap.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMap.class)))
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsMultimap.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsMultimap.class)))
+            .add(PTransformOverride.of(PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                new ReflectiveOneToOneOverrideFactory(ViewTranslator.ViewAsSingleton.class)))
+            .add(PTransformOverride.of(
+                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                new ReflectiveOneToOneOverrideFactory(
+                    ViewTranslator.CombineGloballyAsSingletonView.class)))
+            .build();
+    pipeline.replaceAll(transformOverrides);
+    pipeline.traverseTopologically(this);
+  }
+
+  /**
+   * Translates a composite transform as a whole when a translator is registered for it and
+   * accepts it; otherwise lets the traversal descend into its sub transforms, which are then
+   * translated in {@link #visitPrimitiveTransform}.
+   */
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    LOG.info("{}enterCompositeTransform- {}", genSpaces(this.depth), node);
+    this.depth++;
+
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null) {
+      TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+
+      if (translator != null && applyCanTranslate(transform, node, translator)) {
+        applyStreamingTransform(transform, node, translator);
+        LOG.info("{}translated-{}", genSpaces(this.depth), node);
+        return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+      }
+    }
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
+    this.depth--;
+    LOG.info("{}leaveCompositeTransform- {}", genSpaces(this.depth), node);
+  }
+
+  /**
+   * Translates a primitive (non-root) transform.
+   *
+   * @throws UnsupportedOperationException if no registered translator can translate it
+   */
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    LOG.info("{}visitPrimitiveTransform- {}", genSpaces(this.depth), node);
+
+    if (!node.isRootNode()) {
+      PTransform<?, ?> transform = node.getTransform();
+      TransformTranslator translator = TranslatorRegistry.getTranslator(transform);
+      if (translator == null || !applyCanTranslate(transform, node, translator)) {
+        LOG.info(transform.getClass().toString());
+        throw new UnsupportedOperationException(
+            "The transform " + transform + " is currently not supported.");
+      }
+      applyStreamingTransform(transform, node, translator);
+    }
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformHierarchy.Node node) {
+    LOG.info("{}visiting value {}", genSpaces(this.depth), value);
+  }
+
+  /**
+   * Makes {@code node} the current transform of the context, runs {@code translator} on it, and
+   * records its tagged outputs (used later for side-input translation).
+   */
+  private <T extends PTransform<?, ?>> void applyStreamingTransform(
+      PTransform<?, ?> transform, TransformHierarchy.Node node, TransformTranslator<?> translator) {
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+    @SuppressWarnings("unchecked")
+    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+
+    context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
+    typedTranslator.translateNode(typedTransform, context);
+
+    // Maintain PValue to TupleTag map for side inputs translation.
+    context.getUserGraphContext().recordOutputTaggedPValue();
+  }
+
+  /**
+   * Makes {@code node} the current transform of the context and asks {@code translator} whether
+   * it can translate {@code transform}.
+   */
+  private <T extends PTransform<?, ?>> boolean applyCanTranslate(
+      PTransform<?, ?> transform, TransformHierarchy.Node node, TransformTranslator<?> translator) {
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+    @SuppressWarnings("unchecked")
+    TransformTranslator<T> typedTranslator = (TransformTranslator<T>) translator;
+
+    context.getUserGraphContext().setCurrentTransform(node.toAppliedPTransform());
+
+    return typedTranslator.canTranslate(typedTransform, context);
+  }
+
+  /**
+   * Utility formatting method for indenting log lines.
+   *
+   * @param n number of indentation levels
+   * @return the string {@code "| "} repeated {@code n} times
+   */
+  protected static String genSpaces(int n) {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < n; i++) {
+      builder.append("| ");
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Override factory that replaces a single-input, single-output transform with an instance of
+   * {@code replacement}, built reflectively by passing the original transform to its constructor.
+   */
+  private static class ReflectiveOneToOneOverrideFactory<
+      InputT extends PValue,
+      OutputT extends PValue,
+      TransformT extends PTransform<InputT, OutputT>>
+      extends SingleInputOutputOverrideFactory<InputT, OutputT, TransformT> {
+    private final Class<PTransform<InputT, OutputT>> replacement;
+
+    private ReflectiveOneToOneOverrideFactory(
+        Class<PTransform<InputT, OutputT>> replacement) {
+      this.replacement = replacement;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public PTransformReplacement<InputT, OutputT> getReplacementTransform(
+        AppliedPTransform<InputT, OutputT, TransformT> appliedPTransform) {
+      PTransform<InputT, OutputT> originalPTransform = appliedPTransform.getTransform();
+      PTransform<InputT, OutputT> replacedPTransform = InstanceBuilder.ofType(replacement)
+          .withArg((Class<PTransform<InputT, OutputT>>) originalPTransform.getClass(),
+              originalPTransform)
+          .build();
+      InputT inputT = (InputT) Iterables.getOnlyElement(appliedPTransform.getInputs().values());
+      return PTransformReplacement.of(inputT, replacedPTransform);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
new file mode 100644
index 0000000..bf4515f
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import avro.shaded.com.google.common.collect.Lists;
+import org.apache.beam.runners.jstorm.translation.translator.Stream;
+import org.apache.beam.runners.jstorm.util.RunnerUtils;
+import com.google.common.base.Strings;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.runtime.AdaptorBasicSpout;
+import org.apache.beam.runners.jstorm.translation.runtime.Executor;
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.PValueBase;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.beam.runners.jstorm.translation.runtime.ExecutorsBolt;
+
+import java.util.*;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Maintains the state necessary during Pipeline translation to build a Storm topology.
+ */
+public class TranslationContext {
+ private static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
+
+ private final UserGraphContext userGraphContext;
+ private final ExecutionGraphContext executionGraphContext;
+
+ public TranslationContext(StormPipelineOptions options) {
+ this.userGraphContext = new UserGraphContext(options);
+ this.executionGraphContext = new ExecutionGraphContext();
+ }
+
+ public ExecutionGraphContext getExecutionGraphContext() {
+ return executionGraphContext;
+ }
+
+ public UserGraphContext getUserGraphContext() {
+ return userGraphContext;
+ }
+
+ private void addStormStreamDef(TaggedPValue input, String destComponentName, Stream.Grouping grouping) {
+ Stream.Producer producer = executionGraphContext.getProducer(input.getValue());
+ if (!producer.getComponentId().equals(destComponentName)) {
+ Stream.Consumer consumer = Stream.Consumer.of(destComponentName, grouping);
+ executionGraphContext.registerStreamConsumer(consumer, producer);
+
+ ExecutorsBolt executorsBolt = executionGraphContext.getBolt(producer.getComponentId());
+ if (executorsBolt != null) {
+ executorsBolt.addExternalOutputTag(input.getTag());
+ }
+ }
+ }
+
+ private String getUpstreamExecutorsBolt() {
+ for (PValue value : userGraphContext.getInputs().values()) {
+ String componentId = executionGraphContext.getProducerComponentId(value);
+ if (componentId != null && executionGraphContext.getBolt(componentId) != null) {
+ return componentId;
+ }
+ }
+ // When upstream component is spout, "null" will be return.
+ return null;
+ }
+
+ /**
+ * check if the current transform is applied to source collection.
+ * @return
+ */
+ private boolean connectedToSource() {
+ for (PValue value : userGraphContext.getInputs().values()) {
+ if (executionGraphContext.producedBySpout(value)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @param upstreamExecutorsBolt
+ * @return true if there is multiple input streams, or upstream executor output the same stream
+ * to different executors
+ */
+ private boolean isMultipleInputOrOutput(ExecutorsBolt upstreamExecutorsBolt, Map<TupleTag<?>, PValue> inputs) {
+ if (inputs.size() > 1) {
+ return true;
+ } else {
+ final Sets.SetView<TupleTag> intersection = Sets.intersection(upstreamExecutorsBolt.getExecutors().keySet(), inputs.keySet());
+ if (!intersection.isEmpty()) {
+ // there is already a different executor consume the same input
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
+ public void addTransformExecutor(Executor executor) {
+ addTransformExecutor(executor, Collections.EMPTY_LIST);
+ }
+
+ public void addTransformExecutor(Executor executor, List<PValue> sideInputs) {
+ addTransformExecutor(executor, userGraphContext.getInputs(), userGraphContext.getOutputs(), sideInputs);
+ }
+
+ public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs) {
+ addTransformExecutor(executor, inputs, outputs, Collections.EMPTY_LIST);
+ }
+
+ public void addTransformExecutor(Executor executor, Map<TupleTag<?>, PValue> inputs, Map<TupleTag<?>, PValue> outputs, List<PValue> sideInputs) {
+ String name = null;
+
+ ExecutorsBolt bolt = null;
+
+ boolean isGBK = false;
+ /**
+ * Check if the transform executor needs to be chained into an existing ExecutorsBolt.
+ * For following cases, a new bolt is created for the specified executor, otherwise the executor
+ * will be added into the bolt contains corresponding upstream executor.
+ * a) it is a GroupByKey executor
+ * b) it is connected to source directly
+ * c) None existing upstream bolt was found
+ * d) For the purpose of performance to reduce the side effects between multiple streams which
+ * is output to same executor, a new bolt will be created.
+ */
+ if (RunnerUtils.isGroupByKeyExecutor(executor)) {
+ bolt = new ExecutorsBolt();
+ name = executionGraphContext.registerBolt(bolt);
+ isGBK = true;
+ } else if (connectedToSource()) {
+ bolt = new ExecutorsBolt();
+ name = executionGraphContext.registerBolt(bolt);
+ } else {
+ name = getUpstreamExecutorsBolt();
+ if (name == null) {
+ bolt = new ExecutorsBolt();
+ name = executionGraphContext.registerBolt(bolt);
+ } else {
+ bolt = executionGraphContext.getBolt(name);
+ if (isMultipleInputOrOutput(bolt, inputs)) {
+ bolt = new ExecutorsBolt();
+ name = executionGraphContext.registerBolt(bolt);
+ }
+ }
+ }
+
+ // update the output tags of current transform into ExecutorsBolt
+ for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
+ TupleTag tag = entry.getKey();
+ PValue value = entry.getValue();
+
+ // use tag of PValueBase
+ if (value instanceof PValueBase) {
+ tag = ((PValueBase) value).expand().keySet().iterator().next();
+ }
+ executionGraphContext.registerStreamProducer(
+ TaggedPValue.of(tag, value),
+ Stream.Producer.of(name, tag.getId(), value.getName()));
+ //bolt.addOutputTags(tag);
+ }
+
+ // add the transform executor into the chain of ExecutorsBolt
+ for (Map.Entry<TupleTag<?>, PValue> entry : inputs.entrySet()) {
+ TupleTag tag = entry.getKey();
+ PValue value = entry.getValue();
+ bolt.addExecutor(tag, executor);
+
+ // filter all connections inside bolt
+ //if (!bolt.getOutputTags().contains(tag)) {
+ Stream.Grouping grouping;
+ if (isGBK) {
+ grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY));
+ } else {
+ grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE);
+ }
+ addStormStreamDef(TaggedPValue.of(tag, value), name, grouping);
+ //}
+ }
+
+ for (PValue sideInput : sideInputs) {
+ TupleTag tag = userGraphContext.findTupleTag(sideInput);
+ bolt.addExecutor(tag, executor);
+ checkState(!bolt.getOutputTags().contains(tag));
+ addStormStreamDef(TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL));
+ }
+
+ bolt.registerExecutor(executor);
+
+ // set parallelismNumber
+ String pTransformfullName = userGraphContext.currentTransform.getFullName();
+ String compositeName = pTransformfullName.split("/")[0];
+ Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap();
+ if (parallelismNumMap.containsKey(compositeName)) {
+ int configNum = (Integer) parallelismNumMap.get(compositeName);
+ int currNum = bolt.getParallelismNum();
+ bolt.setParallelismNum(Math.max(configNum, currNum));
+ }
+ }
+
+ // TODO: add getSideInputs() and getSideOutputs().
+ public static class UserGraphContext {
+ private final StormPipelineOptions options;
+ private final Map<PValue, TupleTag> pValueToTupleTag;
+ private AppliedPTransform<?, ?, ?> currentTransform = null;
+
+ private boolean isWindowed = false;
+
+ public UserGraphContext(StormPipelineOptions options) {
+ this.options = checkNotNull(options, "options");
+ this.pValueToTupleTag = Maps.newHashMap();
+ }
+
+ public StormPipelineOptions getOptions() {
+ return this.options;
+ }
+
+ public void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+ this.currentTransform = transform;
+ }
+
+ public String getStepName() {
+ return currentTransform.getFullName();
+ }
+
+ public <T extends PValue> T getInput() {
+ return (T) currentTransform.getInputs().values().iterator().next();
+ }
+
+ public Map<TupleTag<?>, PValue> getInputs() {
+ return currentTransform.getInputs();
+ }
+
+ public TupleTag<?> getInputTag() {
+ return currentTransform.getInputs().keySet().iterator().next();
+ }
+
+ public List<TupleTag<?>> getInputTags() {
+ return Lists.newArrayList(currentTransform.getInputs().keySet());
+ }
+
+ public <T extends PValue> T getOutput() {
+ return (T) currentTransform.getOutputs().values().iterator().next();
+ }
+
+ public Map<TupleTag<?>, PValue> getOutputs() {
+ return currentTransform.getOutputs();
+ }
+
+ public TupleTag<?> getOutputTag() {
+ return currentTransform.getOutputs().keySet().iterator().next();
+ }
+
+ public List<TupleTag<?>> getOutputTags() {
+ return Lists.newArrayList(currentTransform.getOutputs().keySet());
+ }
+
+ public void recordOutputTaggedPValue() {
+ for (Map.Entry<TupleTag<?>, PValue> entry : getOutputs().entrySet()) {
+ pValueToTupleTag.put(entry.getValue(), entry.getKey());
+ }
+ }
+
+ public <T> TupleTag<T> findTupleTag(PValue pValue) {
+ return pValueToTupleTag.get(checkNotNull(pValue, "pValue"));
+ }
+
+ public void setWindowed() {
+ this.isWindowed = true;
+ }
+
+ public boolean isWindowed() {
+ return this.isWindowed;
+ }
+
+ @Override
+ public String toString() {
+ return Joiner.on('\n').join(FluentIterable.from(pValueToTupleTag.entrySet())
+ .transform(new Function<Map.Entry<PValue,TupleTag>, String>() {
+ @Override
+ public String apply(Map.Entry<PValue, TupleTag> entry) {
+ return String.format("%s == %s", entry.getValue().getId(), entry.getKey().getName());
+ }}));
+ }
+ }
+
+ public static class ExecutionGraphContext {
+
+ private final Map<String, AdaptorBasicSpout> spoutMap = new HashMap<>();
+ private final Map<String, ExecutorsBolt> boltMap = new HashMap<>();
+
+ // One-to-one mapping between Stream.Producer and TaggedPValue (or PValue).
+ private final Map<PValue, Stream.Producer> pValueToProducer = new HashMap<>();
+ private final Map<Stream.Producer, TaggedPValue> producerToTaggedPValue = new HashMap<>();
+
+ private final List<Stream> streams = new ArrayList<>();
+
+ private int id = 1;
+
+ public void registerSpout(AdaptorBasicSpout spout, TaggedPValue output) {
+ checkNotNull(spout, "spout");
+ checkNotNull(output, "output");
+ String name = "spout" + genId();
+ this.spoutMap.put(name, spout);
+ registerStreamProducer(
+ output,
+ Stream.Producer.of(name, output.getTag().getId(), output.getValue().getName()));
+ }
+
+ public AdaptorBasicSpout getSpout(String id) {
+ if (Strings.isNullOrEmpty(id)) {
+ return null;
+ }
+ return this.spoutMap.get(id);
+ }
+
+ public Map<String, AdaptorBasicSpout> getSpouts() {
+ return this.spoutMap;
+ }
+
+ public String registerBolt(ExecutorsBolt bolt) {
+ checkNotNull(bolt, "bolt");
+ String name = "bolt" + genId();
+ this.boltMap.put(name, bolt);
+ return name;
+ }
+
+ public ExecutorsBolt getBolt(String id) {
+ if (Strings.isNullOrEmpty(id)) {
+ return null;
+ }
+ return this.boltMap.get(id);
+ }
+
+ public void registerStreamProducer(TaggedPValue taggedPValue, Stream.Producer producer) {
+ checkNotNull(taggedPValue, "taggedPValue");
+ checkNotNull(producer, "producer");
+ pValueToProducer.put(taggedPValue.getValue(), producer);
+ producerToTaggedPValue.put(producer, taggedPValue);
+ }
+
+ public Stream.Producer getProducer(PValue pValue) {
+ return pValueToProducer.get(checkNotNull(pValue, "pValue"));
+ }
+
+ public String getProducerComponentId(PValue pValue) {
+ Stream.Producer producer = getProducer(pValue);
+ return producer == null ? null : producer.getComponentId();
+ }
+
+ public boolean producedBySpout(PValue pValue) {
+ String componentId = getProducerComponentId(pValue);
+ return getSpout(componentId) != null;
+ }
+
+ public void registerStreamConsumer(Stream.Consumer consumer, Stream.Producer producer) {
+ streams.add(Stream.of(
+ checkNotNull(producer, "producer"),
+ checkNotNull(consumer, "consumer")));
+ }
+
+ public Map<PValue, Stream.Producer> getPValueToProducers() {
+ return pValueToProducer;
+ }
+
+ public Iterable<Stream> getStreams() {
+ return streams;
+ }
+
+ @Override
+ public String toString() {
+ List<String> ret = new ArrayList<>();
+ ret.add("SPOUT");
+ for (Map.Entry<String, AdaptorBasicSpout> entry : spoutMap.entrySet()) {
+ ret.add(entry.getKey() + ": " + entry.getValue().toString());
+ }
+ ret.add("BOLT");
+ for (Map.Entry<String, ExecutorsBolt> entry : boltMap.entrySet()) {
+ ret.add(entry.getKey() + ": " + entry.getValue().toString());
+ }
+ ret.add("STREAM");
+ for (Stream stream : streams) {
+ ret.add(String.format(
+ "%s@@%s ---> %s@@%s",
+ stream.getProducer().getStreamId(),
+ stream.getProducer().getComponentId(),
+ stream.getConsumer().getGrouping(),
+ stream.getConsumer().getComponentId()));
+ }
+ return Joiner.on("\n").join(ret);
+ }
+
+ private synchronized int genId() {
+ return id++;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
new file mode 100644
index 0000000..0f856cf
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation;
+
+import com.alibaba.jstorm.beam.translation.translator.*;
+import org.apache.beam.runners.jstorm.translation.translator.BoundedSourceTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.FlattenTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.GroupByKeyTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.ParDoBoundMultiTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.ParDoBoundTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.TransformTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.UnboundedSourceTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.ViewTranslator;
+import org.apache.beam.runners.jstorm.translation.translator.WindowAssignTranslator;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Lookup table mapping PTransform types to associated TransformTranslator implementations.
+ */
+public class TranslatorRegistry {
+ private static final Logger LOG = LoggerFactory.getLogger(TranslatorRegistry.class);
+
+ private static final Map<Class<? extends PTransform>, TransformTranslator> TRANSLATORS = new HashMap<>();
+
+ static {
+ TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator());
+ TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
+ // TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+ // TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
+
+ TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator());
+ TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator());
+
+ //TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator<>());
+ TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
+
+ TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
+
+ TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
+
+ TRANSLATORS.put(ViewTranslator.CreateJStormPCollectionView.class, new ViewTranslator());
+
+ /**
+ * Currently, empty translation is required for combine and reshuffle. Because, the transforms will be
+ * mapped to GroupByKey and Pardo finally. So we only need to translator the finally transforms.
+ * If any improvement is required, the composite transforms will be translated in the future.
+ */
+ // TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
+ // TRANSLATORS.put(Globally.class, new CombineGloballyTranslator());
+ // TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslator());
+ }
+
+ public static TransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
+ TransformTranslator<?> translator = TRANSLATORS.get(transform.getClass());
+ if (translator == null) {
+ LOG.warn("Unsupported operator={}", transform.getClass().getName());
+ }
+ return translator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
new file mode 100644
index 0000000..b07b426
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AbstractComponent.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.beam.runners.jstorm.translation.util.CommonInstance;
+
+import backtype.storm.topology.IComponent;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+/*
+ * Enable user to add output stream definitions by API, rather than hard-code.
+ */
+public abstract class AbstractComponent implements IComponent {
+ private Map<String, Fields> streamToFields = new HashMap<>();
+ private Map<String, Boolean> keyStreams = new HashMap<>();
+ private int parallelismNum = 0;
+
+ public void addOutputField(String streamId) {
+ addOutputField(streamId, new Fields(CommonInstance.VALUE));
+ }
+
+ public void addOutputField(String streamId, Fields fields) {
+ streamToFields.put(streamId, fields);
+ keyStreams.put(streamId, false);
+ }
+
+ public void addKVOutputField(String streamId) {
+ streamToFields.put(streamId, new Fields(CommonInstance.KEY, CommonInstance.VALUE));
+ keyStreams.put(streamId, true);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ for (Map.Entry<String, Fields> entry : streamToFields.entrySet()) {
+ declarer.declareStream(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public boolean keyedEmit(String streamId) {
+ Boolean isKeyedStream = keyStreams.get(streamId);
+ return isKeyedStream == null ? false : isKeyedStream;
+ }
+
+ public int getParallelismNum() {
+ return parallelismNum;
+ }
+
+ public void setParallelismNum(int num) {
+ parallelismNum = num;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
new file mode 100644
index 0000000..91881f2
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicBolt.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import backtype.storm.topology.IRichBatchBolt;
+
+public abstract class AdaptorBasicBolt extends AbstractComponent implements IRichBatchBolt {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
new file mode 100644
index 0000000..5a0c6ec
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/AdaptorBasicSpout.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import backtype.storm.topology.IRichSpout;
+
+public abstract class AdaptorBasicSpout extends AbstractComponent implements IRichSpout {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
new file mode 100644
index 0000000..2bf3303
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnExecutor.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import java.io.Serializable;
+import java.util.*;
+
+import avro.shaded.com.google.common.collect.Iterables;
+import org.apache.beam.runners.jstorm.translation.runtime.state.JStormStateInternals;
+import org.apache.beam.runners.jstorm.translation.runtime.timer.JStormTimerInternals;
+
+import com.alibaba.jstorm.cache.IKvStoreManager;
+import com.alibaba.jstorm.metric.MetricClient;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.beam.runners.jstorm.StormPipelineOptions;
+import org.apache.beam.runners.jstorm.translation.util.DefaultStepContext;
+import org.apache.beam.runners.jstorm.util.SerializedPipelineOptions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link Executor} that runs a user {@link DoFn} and forwards its outputs to the hosting
+ * {@code ExecutorsBolt}.
+ *
+ * <p>Main-input elements are handed to a {@link DoFnRunner}. When the transform has side
+ * inputs, elements whose side-input windows are not ready yet are pushed back into a bag state
+ * and a watermark hold is kept at the earliest pushed-back timestamp. Pushed-back elements are
+ * retried every time a side-input element arrives, and are flushed unconditionally by
+ * {@link #processAllPushBackElements()}.
+ */
+public class DoFnExecutor<InputT, OutputT> implements Executor {
+  private static final long serialVersionUID = 5297603063991078668L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(DoFnExecutor.class);
+
+  /** Routes every output of the DoFn (main and side outputs) to the enclosing bolt. */
+  public class DoFnExecutorOutputManager implements OutputManager, Serializable {
+    private static final long serialVersionUID = -661113364735206170L;
+
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      executorsBolt.processExecutorElem(tag, output);
+    }
+  }
+
+  // Built in init(); pushbackRunner is only set when the transform has side inputs.
+  protected transient DoFnRunner<InputT, OutputT> runner = null;
+  protected transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackRunner = null;
+
+  protected final String stepName;
+
+  protected int internalDoFnExecutorId;
+
+  protected final String description;
+
+  protected final TupleTag<OutputT> mainTupleTag;
+  protected final List<TupleTag<?>> sideOutputTags;
+
+  protected SerializedPipelineOptions serializedOptions;
+  protected transient StormPipelineOptions pipelineOptions;
+
+  protected DoFn<InputT, OutputT> doFn;
+  protected final Coder<WindowedValue<InputT>> inputCoder;
+  protected DoFnInvoker<InputT, OutputT> doFnInvoker;
+  protected OutputManager outputManager;
+  protected WindowingStrategy<?, ?> windowingStrategy;
+  protected final TupleTag<InputT> mainInputTag;
+  protected Collection<PCollectionView<?>> sideInputs;
+  protected SideInputHandler sideInputHandler;
+  protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
+
+  // Initialize during runtime
+  protected ExecutorContext executorContext;
+  protected ExecutorsBolt executorsBolt;
+  protected TimerInternals timerInternals;
+  protected transient StateInternals pushbackStateInternals;
+  protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
+  protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
+  protected transient IKvStoreManager kvStoreManager;
+  protected DefaultStepContext stepContext;
+  protected transient MetricClient metricClient;
+
+  /**
+   * Creates the executor. Pipeline options are captured in serialized form; the runtime
+   * objects (runners, state, timers, metrics) are only created in {@link #init}.
+   *
+   * @throws NullPointerException if {@code stepName} or {@code description} is null
+   */
+  public DoFnExecutor(
+      String stepName,
+      String description,
+      StormPipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      Coder<WindowedValue<InputT>> inputCoder,
+      WindowingStrategy<?, ?> windowingStrategy,
+      TupleTag<InputT> mainInputTag,
+      Collection<PCollectionView<?>> sideInputs,
+      Map<TupleTag, PCollectionView<?>> sideInputTagToView,
+      TupleTag<OutputT> mainTupleTag,
+      List<TupleTag<?>> sideOutputTags) {
+    this.stepName = checkNotNull(stepName, "stepName");
+    this.description = checkNotNull(description, "description");
+    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.doFn = doFn;
+    this.inputCoder = inputCoder;
+    this.outputManager = new DoFnExecutorOutputManager();
+    this.windowingStrategy = windowingStrategy;
+    this.mainInputTag = mainInputTag;
+    this.sideInputs = sideInputs;
+    this.mainTupleTag = mainTupleTag;
+    this.sideOutputTags = sideOutputTags;
+    this.sideInputTagToView = sideInputTagToView;
+  }
+
+  /**
+   * Builds the simple DoFn runner wrapped with metrics reporting. Uses the side-input handler
+   * as the side-input reader when one exists, otherwise an empty reader.
+   */
+  protected DoFnRunner<InputT, OutputT> getDoFnRunner() {
+    return new DoFnRunnerWithMetrics<>(
+        stepName,
+        DoFnRunners.simpleRunner(
+            this.pipelineOptions,
+            this.doFn,
+            this.sideInputHandler == null ? NullSideInputReader.empty() : sideInputHandler,
+            this.outputManager,
+            this.mainTupleTag,
+            this.sideOutputTags,
+            this.stepContext,
+            this.windowingStrategy),
+        MetricsReporter.create(metricClient));
+  }
+
+  /**
+   * Creates timer internals, the step context (with its state internals) and the metric
+   * client. Everything is taken from {@code context} so the method does not depend on which
+   * fields a caller has already populated.
+   */
+  protected void initService(ExecutorContext context) {
+    // TODO: what should be set for key in here?
+    timerInternals = new JStormTimerInternals(
+        null /* key */, this, context.getExecutorsBolt().timerService());
+    kvStoreManager = context.getKvStoreManager();
+    stepContext = new DefaultStepContext(
+        timerInternals,
+        new JStormStateInternals(
+            null, kvStoreManager, context.getExecutorsBolt().timerService(),
+            internalDoFnExecutorId));
+    metricClient = new MetricClient(context.getTopologyContext());
+  }
+
+  @Override
+  public void init(ExecutorContext context) {
+    this.executorContext = context;
+    this.executorsBolt = context.getExecutorsBolt();
+    this.pipelineOptions =
+        this.serializedOptions.getPipelineOptions().as(StormPipelineOptions.class);
+
+    initService(context);
+
+    if (hasSideInputs()) {
+      // The side-input handler must exist before the runner is built: getDoFnRunner() passes
+      // it to the runner as its side-input reader.
+      pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
+      watermarkHoldTag =
+          StateTags.watermarkStateInternal("hold", TimestampCombiner.EARLIEST);
+      pushbackStateInternals = new JStormStateInternals(
+          null, kvStoreManager, executorsBolt.timerService(), internalDoFnExecutorId);
+      sideInputHandler = new SideInputHandler(sideInputs, pushbackStateInternals);
+      runner = getDoFnRunner();
+      pushbackRunner =
+          SimplePushbackSideInputDoFnRunner.create(runner, sideInputs, sideInputHandler);
+    } else {
+      runner = getDoFnRunner();
+    }
+
+    // Process user's setup
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker.invokeSetup();
+  }
+
+  /** Returns whether this executor was configured with at least one side input. */
+  private boolean hasSideInputs() {
+    return sideInputs != null && !sideInputs.isEmpty();
+  }
+
+  /** Dispatches {@code elem} to the main-input or side-input path based on its tag. */
+  @Override
+  public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
+    // Parameterized logging: the message is only rendered when DEBUG is enabled.
+    LOG.debug("process: elemTag={}, mainInputTag={}, sideInputs={}, elem={}",
+        tag, mainInputTag, sideInputs, elem.getValue());
+    if (mainInputTag.equals(tag)) {
+      processMainInput(elem);
+    } else {
+      processSideInput(tag, elem);
+    }
+  }
+
+  /**
+   * Runs a main-input element. Without side inputs it goes straight to the runner. Otherwise,
+   * windows whose side inputs are not ready are pushed back into state, and the watermark hold
+   * is lowered to the earliest pushed-back timestamp.
+   */
+  @SuppressWarnings("unchecked")
+  protected <T> void processMainInput(WindowedValue<T> elem) {
+    WindowedValue<InputT> input = (WindowedValue<InputT>) elem;
+    if (!hasSideInputs()) {
+      runner.processElement(input);
+      return;
+    }
+
+    Iterable<WindowedValue<InputT>> justPushedBack =
+        pushbackRunner.processElementInReadyWindows(input);
+    BagState<WindowedValue<InputT>> pushedBack =
+        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+    Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+      min = earlier(min, pushedBackValue.getTimestamp());
+      pushedBack.add(pushedBackValue);
+    }
+    pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag).add(min);
+  }
+
+  /**
+   * Records a side-input element and then retries every pushed-back main-input element.
+   * Elements that are still blocked are written back, and the watermark hold is reset to
+   * their earliest timestamp.
+   *
+   * @throws NullPointerException if no side-input view is registered for {@code tag}
+   */
+  protected void processSideInput(TupleTag tag, WindowedValue elem) {
+    LOG.debug("side inputs: {}, {}.", tag, elem);
+
+    PCollectionView<?> sideInputView = checkNotNull(
+        sideInputTagToView.get(tag), "No side input view registered for tag %s", tag);
+    sideInputHandler.addSideInputValue(sideInputView, elem);
+
+    BagState<WindowedValue<InputT>> pushedBack =
+        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+    List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
+
+    Iterable<WindowedValue<InputT>> pushedBackInputs = pushedBack.read();
+    if (pushedBackInputs != null) {
+      for (WindowedValue<InputT> input : pushedBackInputs) {
+        for (WindowedValue<InputT> stillBlocked :
+            pushbackRunner.processElementInReadyWindows(input)) {
+          newPushedBack.add(stillBlocked);
+        }
+      }
+    }
+    pushedBack.clear();
+
+    Instant min = BoundedWindow.TIMESTAMP_MAX_VALUE;
+    for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
+      min = earlier(min, pushedBackValue.getTimestamp());
+      pushedBack.add(pushedBackValue);
+    }
+
+    WatermarkHoldState watermarkHold =
+        pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
+    // TODO: clear-then-add is not thread-safe.
+    watermarkHold.clear();
+    watermarkHold.add(min);
+  }
+
+  /**
+   * Process all pushed back elements when receiving watermark with max timestamp.
+   *
+   * <p>Every pushed-back element is run directly through the runner, the bag is cleared, and
+   * the watermark hold is reset to {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. Does nothing
+   * when the transform has no side inputs.
+   */
+  public void processAllPushBackElements() {
+    if (!hasSideInputs()) {
+      return;
+    }
+    BagState<WindowedValue<InputT>> pushedBackElements =
+        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+    if (pushedBackElements != null) {
+      Iterable<WindowedValue<InputT>> elements = pushedBackElements.read();
+      if (elements != null) {
+        for (WindowedValue<InputT> elem : elements) {
+          // DEBUG rather than INFO: this runs once per buffered element.
+          LOG.debug("Process pushback elem={}", elem);
+          runner.processElement(elem);
+        }
+      }
+      pushedBackElements.clear();
+    }
+
+    WatermarkHoldState watermarkHold =
+        pushbackStateInternals.state(StateNamespaces.global(), watermarkHoldTag);
+    watermarkHold.clear();
+    watermarkHold.add(BoundedWindow.TIMESTAMP_MAX_VALUE);
+  }
+
+  /**
+   * Fires a timer on the pushback runner if one exists, otherwise on the plain runner.
+   *
+   * @throws IllegalArgumentException if the timer's namespace is not a window namespace
+   */
+  public void onTimer(Object key, TimerInternals.TimerData timerData) {
+    StateNamespace namespace = timerData.getNamespace();
+    checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+    BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
+    if (pushbackRunner != null) {
+      pushbackRunner.onTimer(
+          timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+    } else {
+      runner.onTimer(
+          timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+    }
+  }
+
+  /** Invokes the DoFn's teardown; a no-op if {@link #init} never reached the setup call. */
+  @Override
+  public void cleanup() {
+    // doFnInvoker is only assigned at the end of init().
+    if (doFnInvoker != null) {
+      doFnInvoker.invokeTeardown();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return description;
+  }
+
+  /** Returns the earlier of two instants ({@code right} when they are equal). */
+  private Instant earlier(Instant left, Instant right) {
+    return left.isBefore(right) ? left : right;
+  }
+
+  /** Starts a bundle on the pushback runner if one exists, otherwise on the plain runner. */
+  public void startBundle() {
+    if (pushbackRunner != null) {
+      pushbackRunner.startBundle();
+    } else {
+      runner.startBundle();
+    }
+  }
+
+  /** Finishes a bundle on the pushback runner if one exists, otherwise on the plain runner. */
+  public void finishBundle() {
+    if (pushbackRunner != null) {
+      pushbackRunner.finishBundle();
+    } else {
+      runner.finishBundle();
+    }
+  }
+
+  public void setInternalDoFnExecutorId(int id) {
+    this.internalDoFnExecutorId = id;
+  }
+
+  public int getInternalDoFnExecutorId() {
+    return internalDoFnExecutorId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
new file mode 100644
index 0000000..98dbcc5
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/DoFnRunnerWithMetrics.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+
+/**
+ * Decorating {@link DoFnRunner} that runs every delegated call inside a metrics scope bound to
+ * this step's {@link MetricsContainer}, obtained from the {@link MetricsReporter} via
+ * {@link MetricsEnvironment#scopedMetricsContainer}. After a bundle finishes (and its scope
+ * has been closed) it calls {@code metricsReporter.updateMetrics()}.
+ */
+public class DoFnRunnerWithMetrics<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+
+  private final String stepName;
+  private final DoFnRunner<InputT, OutputT> delegate;
+  private final MetricsReporter metricsReporter;
+
+  DoFnRunnerWithMetrics(
+      String stepName,
+      DoFnRunner<InputT, OutputT> delegate,
+      MetricsReporter metricsReporter) {
+    this.stepName = checkNotNull(stepName, "stepName");
+    this.delegate = checkNotNull(delegate, "delegate");
+    this.metricsReporter = checkNotNull(metricsReporter, "metricsReporter");
+  }
+
+  /**
+   * Opens a metrics scope for this step's container. Callers close the returned handle
+   * (try-with-resources) to end the scope.
+   */
+  private Closeable enterStepMetricsScope() {
+    MetricsContainer stepContainer = metricsReporter.getMetricsContainer(stepName);
+    return MetricsEnvironment.scopedMetricsContainer(stepContainer);
+  }
+
+  @Override
+  public void startBundle() {
+    try (Closeable scope = enterStepMetricsScope()) {
+      delegate.startBundle();
+    } catch (IOException closeFailure) {
+      throw new RuntimeException(closeFailure);
+    }
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    try (Closeable scope = enterStepMetricsScope()) {
+      delegate.processElement(elem);
+    } catch (IOException closeFailure) {
+      throw new RuntimeException(closeFailure);
+    }
+  }
+
+  @Override
+  public void onTimer(
+      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+    try (Closeable scope = enterStepMetricsScope()) {
+      delegate.onTimer(timerId, window, timestamp, timeDomain);
+    } catch (IOException closeFailure) {
+      throw new RuntimeException(closeFailure);
+    }
+  }
+
+  /** Finishes the delegate's bundle within the metrics scope, then pushes updated metrics. */
+  @Override
+  public void finishBundle() {
+    try (Closeable scope = enterStepMetricsScope()) {
+      delegate.finishBundle();
+    } catch (IOException closeFailure) {
+      throw new RuntimeException(closeFailure);
+    }
+    metricsReporter.updateMetrics();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/aa654b3f/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
new file mode 100644
index 0000000..d7214db
--- /dev/null
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/runtime/Executor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jstorm.translation.runtime;
+
+import java.io.Serializable;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * A serializable processing unit that consumes {@link WindowedValue}s, each tagged with the
+ * {@link TupleTag} of the input it arrived on.
+ */
+public interface Executor extends Serializable {
+
+  /**
+   * Initialization during runtime, before any element is processed.
+   *
+   * @param context runtime context handed to the executor
+   */
+  void init(ExecutorContext context);
+
+  /**
+   * Handles one element.
+   *
+   * @param tag tag of the input {@code elem} belongs to
+   * @param elem the windowed element
+   * @param <T> element type
+   */
+  <T> void process(TupleTag<T> tag, WindowedValue<T> elem);
+
+  /** Tears the executor down. */
+  void cleanup();
+}
\ No newline at end of file