You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:46 UTC
[46/53] [abbrv] beam git commit: jstorm-runner: 1. Generate execution
DAG for runtime 2. Restructure Kryo serializers
http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
deleted file mode 100644
index 615ac8b..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/UnmodifiableCollectionsSerializer.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.serialization;
-
-import backtype.storm.Config;
-import com.alibaba.jstorm.esotericsoftware.kryo.Kryo;
-import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Input;
-import com.alibaba.jstorm.esotericsoftware.kryo.io.Output;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-/**
- * Specific serializer of {@link Kryo} for Unmodifiable Collection.
- */
-public class UnmodifiableCollectionsSerializer extends Serializer<Object> {
-
- private static final Field SOURCE_COLLECTION_FIELD;
- private static final Field SOURCE_MAP_FIELD;
-
- static {
- try {
- SOURCE_COLLECTION_FIELD = Class.forName("java.util.Collections$UnmodifiableCollection")
- .getDeclaredField("c");
- SOURCE_COLLECTION_FIELD.setAccessible(true);
-
-
- SOURCE_MAP_FIELD = Class.forName("java.util.Collections$UnmodifiableMap")
- .getDeclaredField("m");
- SOURCE_MAP_FIELD.setAccessible(true);
- } catch (final Exception e) {
- throw new RuntimeException("Could not access source collection"
- + " field in java.util.Collections$UnmodifiableCollection.", e);
- }
- }
-
- @Override
- public Object read(final Kryo kryo, final Input input, final Class<Object> clazz) {
- final int ordinal = input.readInt(true);
- final UnmodifiableCollection unmodifiableCollection = UnmodifiableCollection.values()[ordinal];
- final Object sourceCollection = kryo.readClassAndObject(input);
- return unmodifiableCollection.create(sourceCollection);
- }
-
- @Override
- public void write(final Kryo kryo, final Output output, final Object object) {
- try {
- final UnmodifiableCollection unmodifiableCollection =
- UnmodifiableCollection.valueOfType(object.getClass());
- // the ordinal could be replaced by something else (e.g. an explicitly managed "id")
- output.writeInt(unmodifiableCollection.ordinal(), true);
- kryo.writeClassAndObject(output, unmodifiableCollection.sourceCollectionField.get(object));
- } catch (final RuntimeException e) {
- // Don't eat and wrap RuntimeExceptions because the ObjectBuffer.write...
- // handles SerializationException specifically (resizing the buffer)...
- throw e;
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Object copy(Kryo kryo, Object original) {
- try {
- final UnmodifiableCollection unmodifiableCollection =
- UnmodifiableCollection.valueOfType(original.getClass());
- Object sourceCollectionCopy =
- kryo.copy(unmodifiableCollection.sourceCollectionField.get(original));
- return unmodifiableCollection.create(sourceCollectionCopy);
- } catch (final RuntimeException e) {
- // Don't eat and wrap RuntimeExceptions
- throw e;
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private enum UnmodifiableCollection {
- COLLECTION(
- Collections.unmodifiableCollection(Arrays.asList("")).getClass(),
- SOURCE_COLLECTION_FIELD) {
- @Override
- public Object create(final Object sourceCollection) {
- return Collections.unmodifiableCollection((Collection<?>) sourceCollection);
- }
- },
- RANDOM_ACCESS_LIST(
- Collections.unmodifiableList(new ArrayList<Void>()).getClass(),
- SOURCE_COLLECTION_FIELD) {
- @Override
- public Object create(final Object sourceCollection) {
- return Collections.unmodifiableList((List<?>) sourceCollection);
- }
- },
- LIST(Collections.unmodifiableList(new LinkedList<Void>()).getClass(), SOURCE_COLLECTION_FIELD) {
- @Override
- public Object create(final Object sourceCollection) {
- return Collections.unmodifiableList((List<?>) sourceCollection);
- }
- },
- SET(Collections.unmodifiableSet(new HashSet<Void>()).getClass(), SOURCE_COLLECTION_FIELD) {
- @Override
- public Object create(final Object sourceCollection) {
- return Collections.unmodifiableSet((Set<?>) sourceCollection);
- }
- },
- SORTED_SET(
- Collections.unmodifiableSortedSet(new TreeSet<Void>()).getClass(),
- SOURCE_COLLECTION_FIELD) {
- @Override
- public Object create(final Object sourceCollection) {
- return Collections.unmodifiableSortedSet((SortedSet<?>) sourceCollection);
- }
- },
- MAP(Collections.unmodifiableMap(new HashMap<Void, Void>()).getClass(), SOURCE_MAP_FIELD) {
- @Override
- public Object create(final Object sourceCollection) {
- return Collections.unmodifiableMap((Map<?, ?>) sourceCollection);
- }
-
- },
- SORTED_MAP(
- Collections.unmodifiableSortedMap(new TreeMap<Void, Void>()).getClass(),
- SOURCE_MAP_FIELD) {
- @Override
- public Object create(final Object sourceCollection) {
- return Collections.unmodifiableSortedMap((SortedMap<?, ?>) sourceCollection);
- }
- };
-
- private final Class<?> type;
- private final Field sourceCollectionField;
-
- private UnmodifiableCollection(final Class<?> type, final Field sourceCollectionField) {
- this.type = type;
- this.sourceCollectionField = sourceCollectionField;
- }
-
- /**
- * @param sourceCollection
- */
- public abstract Object create(Object sourceCollection);
-
- static UnmodifiableCollection valueOfType(final Class<?> type) {
- for (final UnmodifiableCollection item : values()) {
- if (item.type.equals(type)) {
- return item;
- }
- }
- throw new IllegalArgumentException("The type " + type + " is not supported.");
- }
-
- }
-
- /**
- * Creates a new {@link UnmodifiableCollectionsSerializer} and registers its serializer
- * for the several unmodifiable Collections that can be created via {@link Collections},
- * including {@link Map}s.
- *
- * @see Collections#unmodifiableCollection(Collection)
- * @see Collections#unmodifiableList(List)
- * @see Collections#unmodifiableSet(Set)
- * @see Collections#unmodifiableSortedSet(SortedSet)
- * @see Collections#unmodifiableMap(Map)
- * @see Collections#unmodifiableSortedMap(SortedMap)
- */
- public static void registerSerializers(Config config) {
- UnmodifiableCollection.values();
- for (final UnmodifiableCollection item : UnmodifiableCollection.values()) {
- config.registerSerialization(item.type, UnmodifiableCollectionsSerializer.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
index f64193e..53555c9 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
@@ -39,6 +39,7 @@ class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounde
TupleTag<?> outputTag = userGraphContext.getOutputTag();
PValue outputValue = userGraphContext.getOutput();
UnboundedSourceSpout spout = new UnboundedSourceSpout(
+ userGraphContext.getStepName(),
description,
new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter(transform.getSource()),
userGraphContext.getOptions(), outputTag);
http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
index 6baa944..2148f34 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
@@ -277,7 +277,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
if (pushedBackElements != null) {
for (WindowedValue<InputT> elem : pushedBackElements.read()) {
- LOG.info("Process pushback elem={}", elem);
+ LOG.debug("Process pushed back elem: {}", elem);
runner.processElement(elem);
}
pushedBackElements.clear();
http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
index 145b224..8812988 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.values.TupleTag;
/**
* An executor is a basic executable unit in a JStorm task.
*/
-interface Executor extends Serializable {
+public interface Executor extends Serializable {
/**
* Initialization during runtime.
*/
http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
index 33393f2..f8e09be 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
@@ -36,6 +36,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -63,6 +64,8 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
// map from input tag to executor inside bolt
protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
+ protected final Map<Executor, Collection<TupleTag>> executorToOutputTags = Maps.newHashMap();
+ protected final Map<Executor, String> executorToPTransformName = Maps.newHashMap();
// set of all output tags that will be emitted outside the bolt
protected final Set<TupleTag> outputTags = Sets.newHashSet();
protected final Set<TupleTag> externalOutputTags = Sets.newHashSet();
@@ -84,16 +87,21 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
isStatefulBolt = isStateful;
}
- public void addExecutor(TupleTag inputTag, Executor executor) {
+ public void addExecutor(TupleTag inputTag, Executor executor, String name) {
inputTagToExecutor.put(
checkNotNull(inputTag, "inputTag"),
checkNotNull(executor, "executor"));
+ executorToPTransformName.put(executor, name);
}
public Map<TupleTag, Executor> getExecutors() {
return inputTagToExecutor;
}
+ public Map<Executor, String> getExecutorNames() {
+ return executorToPTransformName;
+ }
+
public void registerExecutor(Executor executor) {
if (executor instanceof DoFnExecutor) {
DoFnExecutor doFnExecutor = (DoFnExecutor) executor;
@@ -107,14 +115,31 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
return idToDoFnExecutor;
}
- public void addOutputTags(TupleTag tag) {
- outputTags.add(tag);
+ public void addOutputTags(Executor executor, TupleTag outputTag) {
+ Collection<TupleTag> outTags;
+ if (executorToOutputTags.containsKey(executor)) {
+ outTags = executorToOutputTags.get(executor);
+ } else {
+ outTags = Sets.newHashSet();
+ executorToOutputTags.put(executor, outTags);
+ }
+ outTags.add(outputTag);
+
+ outputTags.add(outputTag);
+ }
+
+ public Map<Executor, Collection<TupleTag>> getExecutorToOutputTags() {
+ return executorToOutputTags;
}
public void addExternalOutputTag(TupleTag<?> tag) {
externalOutputTags.add(tag);
}
+ public Collection<TupleTag> getExternalOutputTags() {
+ return externalOutputTags;
+ }
+
public Set<TupleTag> getOutputTags() {
return outputTags;
}
@@ -328,6 +353,10 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
for (Executor executor : inputTagToExecutor.values()) {
ret.add(executor.toString());
}
+ ret.add("outputTags");
+ for (TupleTag outputTag : outputTags) {
+ ret.add(outputTag.getId());
+ }
ret.add("externalOutputTags");
for (TupleTag output : externalOutputTags) {
ret.add(output.getId());
http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
index b96bc56..ebe8bc3 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
@@ -70,6 +70,7 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti
PValue output = userGraphContext.getOutput();
UnboundedSourceSpout spout = new UnboundedSourceSpout(
+ userGraphContext.getStepName(),
description,
new EmptySource(),
userGraphContext.getOptions(),
http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
index 90ef6d2..292b771 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
+
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
@@ -328,8 +329,8 @@ class JStormStateInternals<K> implements StateInternals {
public void add(T input) {
try {
int elemIndex = getElementIndex();
+ stateInfoKvState.put(getComposedKey(), elemIndex + 1);
kvState.put(getComposedKey(elemIndex), input);
- stateInfoKvState.put(getComposedKey(), ++elemIndex);
} catch (IOException e) {
throw new RuntimeException(e.getCause());
}
@@ -381,23 +382,11 @@ class JStormStateInternals<K> implements StateInternals {
}
private ComposedKey getComposedKey() {
- return ComposedKey.of(key, namespace);
+ return ComposedKey.of(id, key, namespace);
}
private ComposedKey getComposedKey(int elemIndex) {
- return ComposedKey.of(key, namespace, elemIndex);
- }
-
- @Override
- public String toString() {
- int elemIndex = -1;
- try {
- elemIndex = getElementIndex();
- } catch (IOException e) {
-
- }
- return String.format("stateId=%s, key=%s, namespace=%s, elementIndex=%d",
- id, key, namespace, elemIndex);
+ return ComposedKey.of(id, key, namespace, elemIndex);
}
@Override
@@ -475,11 +464,6 @@ class JStormStateInternals<K> implements StateInternals {
public Iterator<T> iterator() {
return new BagStateIterator();
}
-
- @Override
- public String toString() {
- return String.format("BagStateIterable: composedKey=%s", getComposedKey());
- }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index 101921f..0991448 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -186,7 +186,7 @@ public class TranslationContext {
executionGraphContext.registerStreamProducer(
TaggedPValue.of(tag, value),
Stream.Producer.of(name, tag.getId(), value.getName()));
- //bolt.addOutputTags(tag);
+ bolt.addOutputTags(executor, tag);
}
// add the transform executor into the chain of ExecutorsBolt
@@ -196,7 +196,7 @@ public class TranslationContext {
if (userGraphContext.findTupleTag(value) != null) {
tag = userGraphContext.findTupleTag(value);
}
- bolt.addExecutor(tag, executor);
+ bolt.addExecutor(tag, executor, userGraphContext.getStepName());
// filter all connections inside bolt
//if (!bolt.getOutputTags().contains(tag)) {
@@ -212,7 +212,7 @@ public class TranslationContext {
for (PValue sideInput : sideInputs) {
TupleTag tag = userGraphContext.findTupleTag(sideInput);
- bolt.addExecutor(tag, executor);
+ bolt.addExecutor(tag, executor, userGraphContext.getStepName());
checkState(!bolt.getOutputTags().contains(tag));
addStormStreamDef(
TaggedPValue.of(tag, sideInput), name, Stream.Grouping.of(Stream.Grouping.Type.ALL));
@@ -304,6 +304,15 @@ public class TranslationContext {
return pValueToTupleTag.get(checkNotNull(pValue, "pValue"));
}
+ public PValue findPValue(TupleTag tupleTag) {
+ for (Map.Entry<PValue, TupleTag> entry : pValueToTupleTag.entrySet()) {
+ if (entry.getValue().equals(tupleTag)) {
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
+
public void setWindowed() {
this.isWindowed = true;
}
@@ -361,6 +370,10 @@ public class TranslationContext {
return this.spoutMap;
}
+ public Map<String, ExecutorsBolt> getBolts() {
+ return this.boltMap;
+ }
+
public String registerBolt(ExecutorsBolt bolt) {
checkNotNull(bolt, "bolt");
String name = "bolt" + genId();
http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
index dab9518..4ae28e6 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
@@ -27,6 +27,7 @@ import com.alibaba.jstorm.utils.KryoSerializer;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.beam.runners.jstorm.JStormPipelineOptions;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -45,6 +46,7 @@ import org.slf4j.LoggerFactory;
public class UnboundedSourceSpout extends AbstractComponent implements IRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceSpout.class);
+ private final String name;
private final String description;
private final UnboundedSource source;
private final SerializedPipelineOptions serializedOptions;
@@ -62,10 +64,12 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
private long lastWaterMark = 0L;
public UnboundedSourceSpout(
+ String name,
String description,
UnboundedSource source,
JStormPipelineOptions options,
TupleTag<?> outputTag) {
+ this.name = name;
this.description = checkNotNull(description, "description");
this.source = checkNotNull(source, "source");
this.serializedOptions = new SerializedPipelineOptions(checkNotNull(options, "options"));
@@ -174,6 +178,14 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
}
}
+ public String getName() {
+ return name;
+ }
+
+ public TupleTag getOutputTag() {
+ return outputTag;
+ }
+
public UnboundedSource getUnboundedSource() {
return source;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6078cbc6/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
index 54c9b94..7cf2469 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceTranslator.java
@@ -37,6 +37,7 @@ class UnboundedSourceTranslator<T> extends TransformTranslator.Default<Read.Unbo
PValue output = userGraphContext.getOutput();
UnboundedSourceSpout spout = new UnboundedSourceSpout(
+ userGraphContext.getStepName(),
description,
transform.getSource(), userGraphContext.getOptions(), tag);
context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));