You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:46 UTC

[46/53] [abbrv] beam git commit: jstorm-runner: 1. Generate execution DAG for runtime 2. Restructure Kryo serializers

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
deleted file mode 100644
index 615ac8b..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.serialization;
-
-import backtype.storm.Config;
-import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
-import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-/**
- * Specific serializer of {@link Kryo} for Unmodifiable Collection.
- */
-public class UnmodifiableCollectionsSerializer extends Serializer<Object> {
-
-  private static final Field SOURCE_COLLECTION_FIELD;
-  private static final Field SOURCE_MAP_FIELD;
-
-  static {
-    try {
-      SOURCE_COLLECTION_FIELD = Class.forName("java.util.Collections$UnmodifiableCollection")
-          .getDeclaredField("c");
-      SOURCE_COLLECTION_FIELD.setAccessible(true);
-
-
-      SOURCE_MAP_FIELD = Class.forName("java.util.Collections$UnmodifiableMap")
-          .getDeclaredField("m");
-      SOURCE_MAP_FIELD.setAccessible(true);
-    } catch (final Exception e) {
-      throw new RuntimeException("Could not access source collection"
-          + " field in java.util.Collections$UnmodifiableCollection.", e);
-    }
-  }
-
-  @Override
-  public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) {
-    final int ordinal = input.readInt(true);
-    final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal];
-    final Object sourceCollection = kryo.readClassAndObject(input);
-    return unmodifiableCollection.create(sourceCollection);
-  }
-
-  @Override
-  public void write(final Kryo kryo, final Output output, final Object object) {
-    try {
-      final UnmodifiableCollection unmodifiableCollection =
-          UnmodifiableCollection.valueOfType(object.getClass());
-      // the ordinal could be replaced by s.th. else (e.g. a explicitely managed "id")
-      output.writeInt(unmodifiableCollection.ordinal(), true);
-      kryo.writeClassAndObject(output, unmodifiableCollection.sourceCollectionField.get(object));
-    } catch (final RuntimeException e) {
-      // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write...
-      // handles SerializationException specifically (resizing the buffer)...
-      throw e;
-    } catch (final Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public Object copy(Kryo kryo, Object original) {
-    try {
-      final UnmodifiableCollection unmodifiableCollection =
-          UnmodifiableCollection.valueOfType(original.getClass());
-      Object sourceCollectionCopy =
-          kryo.copy(unmodifiableCollection.sourceCollectionField.get(original));
-      return unmodifiableCollection.create(sourceCollectionCopy);
-    } catch (final RuntimeException e) {
-      // Don't eat and wrap RuntimeExceptions
-      throw e;
-    } catch (final Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private enum UnmodifiableCollection {
-    COLLECTION(
-        Collections.unmodifiableCollection(Arrays.asList("")).getClass(),
-        SOURCE_COLLECTION_FIELD) {
-      @Override
-      public Object create(final Object sourceCollection) {
-        return Collections.unmodifiableCollection((Collection<?>) sourceCollection);
-      }
-    },
-    RANDOM_ACCESS_LIST(
-        Collections.unmodifiableList(new ArrayList<Void>()).getClass(),
-        SOURCE_COLLECTION_FIELD) {
-      @Override
-      public Object create(final Object sourceCollection) {
-        return Collections.unmodifiableList((List<?>) sourceCollection);
-      }
-    },
-    LIST(Collections.unmodifiableList(new LinkedList<Void>()).getClass(), SOURCE_COLLECTION_FIELD) {
-      @Override
-      public Object create(final Object sourceCollection) {
-        return Collections.unmodifiableList((List<?>) sourceCollection);
-      }
-    },
-    SET(Collections.unmodifiableSet(new HashSet<Void>()).getClass(), SOURCE_COLLECTION_FIELD) {
-      @Override
-      public Object create(final Object sourceCollection) {
-        return Collections.unmodifiableSet((Set<?>) sourceCollection);
-      }
-    },
-    SORTED_SET(
-        Collections.unmodifiableSortedSet(new TreeSet<Void>()).getClass(),
-        SOURCE_COLLECTION_FIELD) {
-      @Override
-      public Object create(final Object sourceCollection) {
-        return Collections.unmodifiableSortedSet((SortedSet<?>) sourceCollection);
-      }
-    },
-    MAP(Collections.unmodifiableMap(new HashMap<Void, Void>()).getClass(), SOURCE_MAP_FIELD) {
-      @Override
-      public Object create(final Object sourceCollection) {
-        return Collections.unmodifiableMap((Map<?, ?>) sourceCollection);
-      }
-
-    },
-    SORTED_MAP(
-        Collections.unmodifiableSortedMap(new TreeMap<Void, Void>()).getClass(),
-        SOURCE_MAP_FIELD) {
-      @Override
-      public Object create(final Object sourceCollection) {
-        return Collections.unmodifiableSortedMap((SortedMap<?, ?>) sourceCollection);
-      }
-    };
-
-    private final Class<?> type;
-    private final Field sourceCollectionField;
-
-    private UnmodifiableCollection(final Class<?> type, final Field sourceCollectionField) {
-      this.type = type;
-      this.sourceCollectionField = sourceCollectionField;
-    }
-
-    /**
-     * @param sourceCollection
-     */
-    public abstract Object create(Object sourceCollection);
-
-    static UnmodifiableCollection valueOfType(final Class<?> type) {
-      for (final UnmodifiableCollection item : values()) {
-        if (item.type.equals(type)) {
-          return item;
-        }
-      }
-      throw new IllegalArgumentException("The type " + type + " is not supported.");
-    }
-
-  }
-
-  /**
-   * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer
-   * for the several unmodifiable Collections that can be created via {@link Collections},
-   * including {@link Map}s.
-   *
-   * @see Collections#unmodifiableCollection(Collection)
-   * @see Collections#unmodifiableList(List)
-   * @see Collections#unmodifiableSet(Set)
-   * @see Collections#unmodifiableSortedSet(SortedSet)
-   * @see Collections#unmodifiableMap(Map)
-   * @see Collections#unmodifiableSortedMap(SortedMap)
-   */
-  public static void registerSerializers(Config config) {
-    UnmodifiableCollection.values();
-    for (final UnmodifiableCollection item : UnmodifiableCollection.values()) {
-      config.registerSerialization(item.type, UnmodifiableCollectionsSerializer.class);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
index f64193e..53555c9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
@@ -39,6 +39,7 @@ class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounde
     TupleTag<?> outputTag = userGraphContext.getOutputTag();
     PValue outputValue = userGraphContext.getOutput();
     UnboundedSourceSpout spout = new UnboundedSourceSpout(
+        userGraphContext.getStepName(),
         description,
         new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()),
         userGraphContext.getOptions(), outputTag);

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
index 6baa944..2148f34 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
@@ -277,7 +277,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
           pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
       if (pushedBackElements != null) {
         for (WindowedValue<InputT> elem : pushedBackElements.read()) {
-          LOG.info("Process pushback elem={}", elem);
+          LOG.debug("Process pushed back elem: {}", elem);
           runner.processElement(elem);
         }
         pushedBackElements.clear();

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
index 145b224..8812988 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.values.TupleTag;
 /**
  * An executor is a basic executable unit in a JStorm task.
  */
-interface Executor extends Serializable {
+public interface Executor extends Serializable {
   /**
    * Initialization during runtime.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
index 33393f2..f8e09be 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
@@ -36,6 +36,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -63,6 +64,8 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
 
   // map from input tag to executor inside bolt
   protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
+  protected final Map<Executor, Collection<TupleTag>> executorToOutputTags = Maps.newHashMap();
+  protected final Map<Executor, String> executorToPTransformName = Maps.newHashMap();
   // set of all output tags that will be emit outside bolt
   protected final Set<TupleTag> outputTags = Sets.newHashSet();
   protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
@@ -84,16 +87,21 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
     isStatefulBolt = isStateful;
   }
 
-  public void addExecutor(TupleTag inputTag, Executor executor) {
+  public void addExecutor(TupleTag inputTag, Executor executor, String name) {
     inputTagToExecutor.put(
         checkNotNull(inputTag, "inputTag"),
         checkNotNull(executor, "executor"));
+    executorToPTransformName.put(executor, name);
   }
 
   public Map<TupleTag, Executor> getExecutors() {
     return inputTagToExecutor;
   }
 
+  public Map<Executor, String> getExecutorNames() {
+    return executorToPTransformName;
+  }
+
   public void registerExecutor(Executor executor) {
     if (executor instanceof DoFnExecutor) {
       DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
@@ -107,14 +115,31 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
     return idToDoFnExecutor;
   }
 
-  public void addOutputTags(TupleTag tag) {
-    outputTags.add(tag);
+  public void addOutputTags(Executor executor, TupleTag outputTag) {
+    Collection<TupleTag> outTags;
+    if (executorToOutputTags.containsKey(executor)) {
+      outTags = executorToOutputTags.get(executor);
+    } else {
+      outTags = Sets.newHashSet();
+      executorToOutputTags.put(executor, outTags);
+    }
+    outTags.add(outputTag);
+
+    outputTags.add(outputTag);
+  }
+
+  public Map<Executor, Collection<TupleTag>> getExecutorToOutputTags() {
+    return executorToOutputTags;
   }
 
   public void addExternalOutputTag(TupleTag<?> tag) {
     externalOutputTags.add(tag);
   }
 
+  public Collection<TupleTag> getExternalOutputTags() {
+    return externalOutputTags;
+  }
+
   public Set<TupleTag> getOutputTags() {
     return outputTags;
   }
@@ -328,6 +353,10 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
     for (Executor executor : inputTagToExecutor.values()) {
       ret.add(executor.toString());
     }
+    ret.add("outputTags");
+    for (TupleTag outputTag : outputTags) {
+      ret.add(outputTag.getId());
+    }
     ret.add("externalOutputTags");
     for (TupleTag output : externalOutputTags) {
       ret.add(output.getId());

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
index b96bc56..ebe8bc3 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
@@ -70,6 +70,7 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti
       PValue output = userGraphContext.getOutput();
 
       UnboundedSourceSpout spout = new UnboundedSourceSpout(
+          userGraphContext.getStepName(),
           description,
           new EmptySource(),
           userGraphContext.getOptions(),

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
index 90ef6d2..292b771 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
+
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
@@ -328,8 +329,8 @@ class JStormStateInternals<K> implements StateInternals {
     public void add(T input) {
       try {
         int elemIndex = getElementIndex();
+        stateInfoKvState.put(getComposedKey(), elemIndex + 1);
         kvState.put(getComposedKey(elemIndex), input);
-        stateInfoKvState.put(getComposedKey(), ++elemIndex);
       } catch (IOException e) {
         throw new RuntimeException(e.getCause());
       }
@@ -381,23 +382,11 @@ class JStormStateInternals<K> implements StateInternals {
     }
 
     private ComposedKey getComposedKey() {
-      return ComposedKey.of(key, namespace);
+      return ComposedKey.of(id, key, namespace);
     }
 
     private ComposedKey getComposedKey(int elemIndex) {
-      return ComposedKey.of(key, namespace, elemIndex);
-    }
-
-    @Override
-    public String toString() {
-      int elemIndex = -1;
-      try {
-        elemIndex = getElementIndex();
-      } catch (IOException e) {
-
-      }
-      return String.format("stateId=%s, key=%s, namespace=%s, elementIndex=%d",
-              id, key, namespace, elemIndex);
+      return ComposedKey.of(id, key, namespace, elemIndex);
     }
 
     @Override
@@ -475,11 +464,6 @@ class JStormStateInternals<K> implements StateInternals {
       public Iterator<T> iterator() {
         return new BagStateIterator();
       }
-
-      @Override
-      public String toString() {
-        return String.format("BagStateIterable: composedKey=%s", getComposedKey());
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index 101921f..0991448 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -186,7 +186,7 @@ public class TranslationContext {
       executionGraphContext.registerStreamProducer(
           TaggedPValue.of(tag, value),
           Stream.Producer.of(name, tag.getId(), value.getName()));
-      //bolt.addOutputTags(tag);
+      bolt.addOutputTags(executor, tag);
     }
 
     // add the transform executor into the chain of ExecutorsBolt
@@ -196,7 +196,7 @@ public class TranslationContext {
       if (userGraphContext.findTupleTag(value) != null) {
         tag = userGraphContext.findTupleTag(value);
       }
-      bolt.addExecutor(tag, executor);
+      bolt.addExecutor(tag, executor, userGraphContext.getStepName());
 
       // filter all connections inside bolt
       //if (!bolt.getOutputTags().contains(tag)) {
@@ -212,7 +212,7 @@ public class TranslationContext {
 
     for (PValue sideInput : sideInputs) {
       TupleTag tag = userGraphContext.findTupleTag(sideInput);
-      bolt.addExecutor(tag, executor);
+      bolt.addExecutor(tag, executor, userGraphContext.getStepName());
       checkState(!bolt.getOutputTags().contains(tag));
       addStormStreamDef(
           TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL));
@@ -304,6 +304,15 @@ public class TranslationContext {
       return pValueToTupleTag.get(checkNotNull(pValue, "pValue"));
     }
 
+    public PValue findPValue(TupleTag tupleTag) {
+      for (Map.Entry<PValue, TupleTag> entry : pValueToTupleTag.entrySet()) {
+        if (entry.getValue().equals(tupleTag)) {
+            return entry.getKey();
+        }
+      }
+      return null;
+    }
+
     public void setWindowed() {
       this.isWindowed = true;
     }
@@ -361,6 +370,10 @@ public class TranslationContext {
       return this.spoutMap;
     }
 
+    public Map<String, ExecutorsBolt> getBolts() {
+      return this.boltMap;
+    }
+
     public String registerBolt(ExecutorsBolt bolt) {
       checkNotNull(bolt, "bolt");
       String name = "bolt" + genId();

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
index dab9518..4ae28e6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
@@ -27,6 +27,7 @@ import com.alibaba.jstorm.utils.KryoSerializer;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.beam.runners.jstorm.JStormPipelineOptions;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
 public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout {
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
 
+  private final String name;
   private final String description;
   private final UnboundedSource source;
   private final SerializedPipelineOptions serializedOptions;
@@ -62,10 +64,12 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
   private long lastWaterMark = 0L;
 
   public UnboundedSourceSpout(
+      String name,
       String description,
       UnboundedSource source,
       JStormPipelineOptions options,
       TupleTag<?> outputTag) {
+    this.name = name;
     this.description = checkNotNull(description, "description");
     this.source = checkNotNull(source, "source");
     this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
@@ -174,6 +178,14 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
     }
   }
 
+  public String getName() {
+    return name;
+  }
+
+  public TupleTag getOutputTag() {
+    return outputTag;
+  }
+
   public UnboundedSource getUnboundedSource() {
     return source;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
index 54c9b94..7cf2469 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
@@ -37,6 +37,7 @@ class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbo
     PValue output = userGraphContext.getOutput();
 
     UnboundedSourceSpout spout = new UnboundedSourceSpout(
+        userGraphContext.getStepName(),
         description,
         transform.getSource(), userGraphContext.getOptions(), tag);
     context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));