You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:41 UTC
[41/53] [abbrv] beam git commit: jstorm-runner: Fixup for review comments
jstorm-runner: Fixup for review comments
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/90ed2ef3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/90ed2ef3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/90ed2ef3
Branch: refs/heads/jstorm-runner
Commit: 90ed2ef344d19ca730429e9eb7c71779f995fc47
Parents: 6078cbc
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Mon Aug 14 16:20:03 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:03:00 2017 +0800
----------------------------------------------------------------------
.../runners/jstorm/JStormPipelineOptions.java | 12 +--
.../beam/runners/jstorm/JStormRunner.java | 21 ++--
.../beam/runners/jstorm/JStormRunnerResult.java | 21 ++--
.../beam/runners/jstorm/TestJStormRunner.java | 19 +++-
.../serialization/JavaUtilsSerializer.java | 3 +-
.../translation/BoundedSourceTranslator.java | 4 +-
.../jstorm/translation/DoFnExecutor.java | 27 +++--
.../runners/jstorm/translation/Executor.java | 6 ++
.../jstorm/translation/ExecutorsBolt.java | 8 +-
.../jstorm/translation/FlattenExecutor.java | 1 -
.../jstorm/translation/FlattenTranslator.java | 23 ++--
.../translation/GroupByKeyTranslator.java | 12 ---
.../translation/GroupByWindowExecutor.java | 12 ---
.../translation/JStormStateInternals.java | 29 +++--
.../jstorm/translation/MetricsReporter.java | 2 -
.../translation/MultiOutputDoFnExecutor.java | 22 +---
.../translation/MultiStatefulDoFnExecutor.java | 5 +-
.../translation/ParDoBoundMultiTranslator.java | 14 +--
.../translation/ParDoBoundTranslator.java | 108 -------------------
.../translation/StatefulDoFnExecutor.java | 1 -
.../jstorm/translation/TimerService.java | 2 +-
.../jstorm/translation/TimerServiceImpl.java | 8 +-
.../jstorm/translation/TransformTranslator.java | 16 ++-
.../jstorm/translation/TranslationContext.java | 18 +++-
.../jstorm/translation/TranslatorRegistry.java | 1 -
.../translation/UnboundedSourceSpout.java | 8 +-
.../jstorm/translation/ViewTranslator.java | 4 +-
.../translation/WindowAssignExecutor.java | 2 -
.../translation/JStormStateInternalsTest.java | 2 +-
29 files changed, 141 insertions(+), 270 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
index 114877a..e494757 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormPipelineOptions.java
@@ -36,8 +36,8 @@ public interface JStormPipelineOptions extends PipelineOptions {
@Description("Executing time(sec) of topology on local mode. Default is 1min.")
@Default.Long(60)
- Long getLocalModeExecuteTime();
- void setLocalModeExecuteTime(Long time);
+ Long getLocalModeExecuteTimeSec();
+ void setLocalModeExecuteTimeSec(Long time);
@Description("Worker number of topology")
@Default.Integer(1)
@@ -46,8 +46,8 @@ public interface JStormPipelineOptions extends PipelineOptions {
@Description("Global parallelism number of a component")
@Default.Integer(1)
- Integer getParallelismNumber();
- void setParallelismNumber(Integer number);
+ Integer getParallelism();
+ void setParallelism(Integer number);
@Description("System topology config of JStorm")
@Default.InstanceFactory(DefaultMapValueFactory.class)
@@ -61,8 +61,8 @@ public interface JStormPipelineOptions extends PipelineOptions {
@Description("Parallelism number of a specified composite PTransform")
@Default.InstanceFactory(DefaultMapValueFactory.class)
- Map getParallelismNumMap();
- void setParallelismNumMap(Map parallelismNumMap);
+ Map getParallelismMap();
+ void setParallelismMap(Map parallelismNumMap);
/**
* Default value factory for topology configuration of JStorm.
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
index 47de42c..21a8fae 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunner.java
@@ -79,15 +79,15 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
}
public static JStormRunner fromOptions(PipelineOptions options) {
- JStormPipelineOptions pipelineOptions = PipelineOptionsValidator.validate(
- JStormPipelineOptions.class, options);
+ JStormPipelineOptions pipelineOptions =
+ PipelineOptionsValidator.validate(JStormPipelineOptions.class, options);
return new JStormRunner(pipelineOptions);
}
/**
- * convert pipeline options to storm configuration format.
+ * Convert pipeline options to JStorm configuration format.
* @param options
- * @return
+ * @return JStorm configuration
*/
private Config convertPipelineOptionsToConfig(JStormPipelineOptions options) {
Config config = new Config();
@@ -103,6 +103,8 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
// Setup config for runtime env
config.put("worker.external", "beam");
+ // We use "com.alibaba.jstorm.transactional" API for "at least once" and "exactly once",
+ // so we don't need acker task for beam job any more, and set related number to 0.
config.put("topology.acker.executors", 0);
// Register serializers of Kryo
@@ -271,7 +273,7 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
LocalCluster localCluster = LocalCluster.getInstance();
localCluster.submitTopology(topologyName, config, topology);
return JStormRunnerResult.local(
- topologyName, config, localCluster, options.getLocalModeExecuteTime());
+ topologyName, config, localCluster, options.getLocalModeExecuteTimeSec());
} else {
StormSubmitter.submitTopology(topologyName, config, topology);
return null;
@@ -298,11 +300,12 @@ public class JStormRunner extends PipelineRunner<JStormRunnerResult> {
TopologyBuilder builder =
isExactlyOnce ? new TransactionTopologyBuilder() : new TopologyBuilder();
- int parallelismNumber = options.getParallelismNumber();
+ int parallelismNumber = options.getParallelism();
Map<String, UnboundedSourceSpout> spouts = context.getSpouts();
- for (String id : spouts.keySet()) {
- IRichSpout spout = getSpout(isExactlyOnce, spouts.get(id));
- builder.setSpout(id, spout, getParallelismNum(spouts.get(id), parallelismNumber));
+ for (Map.Entry<String, UnboundedSourceSpout> entry : spouts.entrySet()) {
+ IRichSpout spout = getSpout(isExactlyOnce, entry.getValue());
+ builder.setSpout(
+ entry.getKey(), spout, getParallelismNum(entry.getValue(), parallelismNumber));
}
HashMap<String, BoltDeclarer> declarers = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
index 797c899..4b1850e 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java
@@ -38,7 +38,7 @@ public abstract class JStormRunnerResult implements PipelineResult {
Config config,
LocalCluster localCluster,
long localModeExecuteTimeSecs) {
- return new LocalStormPipelineResult(
+ return new LocalJStormPipelineResult(
topologyName, config, localCluster, localModeExecuteTimeSecs);
}
@@ -62,12 +62,12 @@ public abstract class JStormRunnerResult implements PipelineResult {
return topologyName;
}
- private static class LocalStormPipelineResult extends JStormRunnerResult {
+ private static class LocalJStormPipelineResult extends JStormRunnerResult {
private LocalCluster localCluster;
private long localModeExecuteTimeSecs;
- LocalStormPipelineResult(
+ LocalJStormPipelineResult(
String topologyName,
Config config,
LocalCluster localCluster,
@@ -78,7 +78,6 @@ public abstract class JStormRunnerResult implements PipelineResult {
@Override
public State cancel() throws IOException {
- //localCluster.deactivate(getTopologyName());
localCluster.killTopology(getTopologyName());
localCluster.shutdown();
JStormUtils.sleepMs(1000);
@@ -87,12 +86,7 @@ public abstract class JStormRunnerResult implements PipelineResult {
@Override
public State waitUntilFinish(Duration duration) {
- return waitUntilFinish();
- }
-
- @Override
- public State waitUntilFinish() {
- JStormUtils.sleepMs(localModeExecuteTimeSecs * 1000);
+ JStormUtils.sleepMs(duration.getMillis());
try {
return cancel();
} catch (IOException e) {
@@ -101,8 +95,13 @@ public abstract class JStormRunnerResult implements PipelineResult {
}
@Override
+ public State waitUntilFinish() {
+ return waitUntilFinish(Duration.standardSeconds(localModeExecuteTimeSecs));
+ }
+
+ @Override
public MetricResults metrics() {
- return null;
+ throw new UnsupportedOperationException("This method is not yet supported.");
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
index 21a58e3..c9990e4 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java
@@ -50,13 +50,21 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
return new TestJStormRunner(options.as(JStormPipelineOptions.class));
}
+ // Waiting time for a job with assertions.
+ private static final int ASSERTION_WAITING_TIME_MS = 20 * 1000;
+ // Waiting time for a job without assertions.
+ private static final int RESULT_WAITING_TIME_MS = 5 * 1000;
+ private static final int RESULT_CHECK_INTERVAL_MS = 500;
+
private final JStormRunner stormRunner;
private final JStormPipelineOptions options;
private TestJStormRunner(JStormPipelineOptions options) {
this.options = options;
Map conf = Maps.newHashMap();
- //conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString());
+ // Default state backend is RocksDB, for the users who could not run RocksDB on local testing
+ // env, following config is used to configure state backend to memory.
+ // conf.put(ConfigExtension.KV_STORE_TYPE, KvStoreManagerFactory.KvStoreType.memory.toString());
options.setTopologyConfig(conf);
options.setLocalMode(true);
stormRunner = JStormRunner.fromOptions(checkNotNull(options, "options"));
@@ -73,8 +81,9 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
LOG.info("Running JStorm job {} with {} expected assertions.",
result.getTopologyName(), numberOfAssertions);
- int maxTimeoutSec = numberOfAssertions > 0 ? 20 : 5;
- for (int waitTime = 0; waitTime <= maxTimeoutSec * 1000; ) {
+ int maxTimeoutMs =
+ numberOfAssertions > 0 ? ASSERTION_WAITING_TIME_MS : RESULT_WAITING_TIME_MS;
+ for (int waitTime = 0; waitTime <= maxTimeoutMs; ) {
Optional<Boolean> success = numberOfAssertions > 0
? checkForPAssertSuccess(numberOfAssertions) : Optional.<Boolean>absent();
Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord();
@@ -86,8 +95,8 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> {
LOG.info("Exception was found.", taskExceptionRec);
throw new RuntimeException(taskExceptionRec.getCause());
} else {
- JStormUtils.sleepMs(500);
- waitTime += 500;
+ JStormUtils.sleepMs(RESULT_CHECK_INTERVAL_MS);
+ waitTime += RESULT_CHECK_INTERVAL_MS;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java
index 5df686c..fa46fdb 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/JavaUtilsSerializer.java
@@ -45,7 +45,7 @@ import java.util.TreeSet;
public class JavaUtilsSerializer {
/**
- * Specific {@link Kryo} serializer for {@link java.util.Collections.SingletonList}.
+ * Specific {@link Kryo} serializer for {@code java.util.Collections.SingletonList}.
*/
public static class CollectionsSingletonListSerializer extends Serializer<List<?>> {
public CollectionsSingletonListSerializer() {
@@ -222,7 +222,6 @@ public class JavaUtilsSerializer {
* @see Collections#unmodifiableSortedMap(SortedMap)
*/
private static void registerUnmodifableCollectionSerializers(Config config) {
- UnmodifiableCollection.values();
for (final UnmodifiableCollection item : UnmodifiableCollection.values()) {
config.registerSerialization(item.type, UnmodifiableCollectionsSerializer.class);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
index 53555c9..77d0823 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/BoundedSourceTranslator.java
@@ -24,9 +24,7 @@ import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
/**
- * Translates a {@link Read.Bounded} into a Storm spout.
- *
- * @param <T>
+ * Translates a {@link Read.Bounded} into a JStorm spout.
*/
class BoundedSourceTranslator<T> extends TransformTranslator.Default<Read.Bounded<T>> {
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
index 2148f34..72c386a 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java
@@ -71,9 +71,15 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
/**
* Implements {@link OutputManager} in a DoFn executor.
*/
- public class DoFnExecutorOutputManager implements OutputManager, Serializable {
+ protected static class DoFnExecutorOutputManager implements OutputManager, Serializable {
private static final long serialVersionUID = -661113364735206170L;
+ private ExecutorsBolt executorsBolt;
+
+ public DoFnExecutorOutputManager(ExecutorsBolt executorsBolt) {
+ this.executorsBolt = executorsBolt;
+ }
+
@Override
public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
executorsBolt.processExecutorElem(tag, output);
@@ -97,23 +103,23 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
protected DoFn<InputT, OutputT> doFn;
protected final Coder<WindowedValue<InputT>> inputCoder;
- protected DoFnInvoker<InputT, OutputT> doFnInvoker;
- protected OutputManager outputManager;
+ protected transient DoFnInvoker<InputT, OutputT> doFnInvoker;
+ protected transient OutputManager outputManager;
protected WindowingStrategy<?, ?> windowingStrategy;
protected final TupleTag<InputT> mainInputTag;
protected Collection<PCollectionView<?>> sideInputs;
- protected SideInputHandler sideInputHandler;
+ protected transient SideInputHandler sideInputHandler;
protected final Map<TupleTag, PCollectionView<?>> sideInputTagToView;
// Initialize during runtime
- protected ExecutorContext executorContext;
+ protected transient ExecutorContext executorContext;
protected ExecutorsBolt executorsBolt;
- protected TimerInternals timerInternals;
+ protected transient TimerInternals timerInternals;
protected transient StateInternals pushbackStateInternals;
protected transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
protected transient StateTag<WatermarkHoldState> watermarkHoldTag;
protected transient IKvStoreManager kvStoreManager;
- protected DefaultStepContext stepContext;
+ protected transient DefaultStepContext stepContext;
protected transient MetricClient metricClient;
public DoFnExecutor(
@@ -133,7 +139,6 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
this.doFn = doFn;
this.inputCoder = inputCoder;
- this.outputManager = new DoFnExecutorOutputManager();
this.windowingStrategy = windowingStrategy;
this.mainInputTag = mainInputTag;
this.sideInputs = sideInputs;
@@ -174,6 +179,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
this.executorsBolt = context.getExecutorsBolt();
this.pipelineOptions =
this.serializedOptions.getPipelineOptions().as(JStormPipelineOptions.class);
+ this.outputManager = new DoFnExecutorOutputManager(executorsBolt);
initService(context);
@@ -199,8 +205,6 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
@Override
public <T> void process(TupleTag<T> tag, WindowedValue<T> elem) {
- LOG.debug(String.format("process: elemTag=%s, mainInputTag=%s, sideInputs=%s, elem={}",
- tag, mainInputTag, sideInputs, elem.getValue()));
if (mainInputTag.equals(tag)) {
processMainInput(elem);
} else if (sideInputTagToView.containsKey(tag)) {
@@ -213,6 +217,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
}
protected <T> void processMainInput(WindowedValue<T> elem) {
+ LOG.debug(String.format("Main input: tag=%s, elem=%s", mainInputTag, elem));
if (sideInputs.isEmpty()) {
runner.processElement((WindowedValue<InputT>) elem);
} else {
@@ -234,7 +239,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor {
}
protected void processSideInput(TupleTag tag, WindowedValue elem) {
- LOG.debug(String.format("side inputs: %s, %s.", tag, elem));
+ LOG.debug(String.format("Side inputs: tag=%s, elem=%s.", tag, elem));
PCollectionView<?> sideInputView = sideInputTagToView.get(tag);
sideInputHandler.addSideInputValue(sideInputView, elem);
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
index 8812988..fd7af7d 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/Executor.java
@@ -30,7 +30,13 @@ public interface Executor extends Serializable {
*/
void init(ExecutorContext context);
+ /**
+ * Process element from "tag" stream.
+ */
<T> void process(TupleTag<T> tag, WindowedValue<T> elem);
+ /**
+ * Cleanup when task is shutdown.
+ */
void cleanup();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
index f8e09be..449ecb5 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java
@@ -58,9 +58,9 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
private static final Logger LOG = LoggerFactory.getLogger(ExecutorsBolt.class);
- protected ExecutorContext executorContext;
+ protected transient ExecutorContext executorContext;
- protected TimerService timerService;
+ protected transient TimerService timerService;
// map from input tag to executor inside bolt
protected final Map<TupleTag, Executor> inputTagToExecutor = Maps.newHashMap();
@@ -73,7 +73,7 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
protected int internalDoFnExecutorId = 1;
protected final Map<Integer, DoFnExecutor> idToDoFnExecutor = Maps.newHashMap();
- protected OutputCollector collector;
+ protected transient OutputCollector collector;
protected boolean isStatefulBolt = false;
@@ -265,8 +265,8 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt {
}
public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) {
- LOG.debug("ProcessExecutorElem: value={} from tag={}", elem.getValue(), inputTag);
if (elem != null) {
+ LOG.debug("ProcessExecutorElem: value={} from tag={}", elem.getValue(), inputTag);
Executor executor = inputTagToExecutor.get(inputTag);
if (executor != null) {
executor.process(inputTag, elem);
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
index 928fa24..9d4184c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenExecutor.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.values.TupleTag;
/**
* JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.Flatten}.
- * @param <InputT>
*/
class FlattenExecutor<InputT> implements Executor {
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
index ebe8bc3..62621d0 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -46,20 +45,18 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti
public void translateNode(Flatten.PCollections<V> transform, TranslationContext context) {
TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- // Since a new tag is created in PCollectionList, retrieve the real tag here.
+ // Flatten can consume multiple copies of the same PCollection, so we need to record
+ // the copy number here.
Map<TupleTag<?>, PValue> inputs = Maps.newHashMap();
Map<TupleTag<?>, Integer> tagToCopyNum = Maps.newHashMap();
- for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getInputs().entrySet()) {
- PCollection<V> pc = (PCollection<V>) entry.getValue();
- //inputs.putAll(pc.expand());
- for (Map.Entry<TupleTag<?>, PValue> entry1 : pc.expand().entrySet()) {
- if (inputs.containsKey(entry1.getKey())) {
- int copyNum = tagToCopyNum.get(entry1.getKey());
- tagToCopyNum.put(entry1.getKey(), ++copyNum);
- } else {
- inputs.put(entry1.getKey(), entry1.getValue());
- tagToCopyNum.put(entry1.getKey(), 1);
- }
+ for (Map.Entry<TupleTag<?>, PValue> entry : userGraphContext.getTransformInputs().entrySet()) {
+ TupleTag tag = userGraphContext.findTupleTag(entry.getValue());
+ if (inputs.containsKey(tag)) {
+ int copyNum = tagToCopyNum.get(tag);
+ tagToCopyNum.put(tag, ++copyNum);
+ } else {
+ inputs.put(tag, entry.getValue());
+ tagToCopyNum.put(tag, 1);
}
}
String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
index 85c958a..02f42bd 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByKeyTranslator.java
@@ -18,28 +18,21 @@
package org.apache.beam.runners.jstorm.translation;
import com.google.common.collect.Lists;
-import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
/**
* Translates a {@link GroupByKey} to a JStorm {@link GroupByWindowExecutor}.
- * @param <K>
- * @param <V>
*/
class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<K, V>> {
// information of transform
protected PCollection<KV<K, V>> input;
- protected PCollection<KV<K, Iterable<V>>> output;
- protected List<TupleTag<?>> inputTags;
protected TupleTag<KV<K, Iterable<V>>> mainOutputTag;
protected List<TupleTag<?>> sideOutputTags;
- protected List<PCollectionView<?>> sideInputs;
protected WindowingStrategy<?, ?> windowingStrategy;
@Override
@@ -49,13 +42,8 @@ class GroupByKeyTranslator<K, V> extends TransformTranslator.Default<GroupByKey<
describeTransform(transform, userGraphContext.getInputs(), userGraphContext.getOutputs());
input = (PCollection<KV<K, V>>) userGraphContext.getInput();
- output = (PCollection<KV<K, Iterable<V>>>) userGraphContext.getOutput();
-
- inputTags = userGraphContext.getInputTags();
mainOutputTag = (TupleTag<KV<K, Iterable<V>>>) userGraphContext.getOutputTag();
sideOutputTags = Lists.newArrayList();
-
- sideInputs = Collections.<PCollectionView<?>>emptyList();
windowingStrategy = input.getWindowingStrategy();
GroupByWindowExecutor<K, V> groupByWindowExecutor = new GroupByWindowExecutor<>(
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
index 1c858b7..cae1bc3 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/GroupByWindowExecutor.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.jstorm.translation;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.ImmutableList;
-import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.core.DoFnRunner;
@@ -51,8 +50,6 @@ import org.slf4j.LoggerFactory;
/**
* JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.GroupByKey}.
- * @param <K>
- * @param <V>
*/
class GroupByWindowExecutor<K, V>
extends DoFnExecutor<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> {
@@ -60,14 +57,6 @@ class GroupByWindowExecutor<K, V>
private static final Logger LOG = LoggerFactory.getLogger(GroupByWindowExecutor.class);
- private class GroupByWindowOutputManager implements DoFnRunners.OutputManager, Serializable {
-
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- executorsBolt.processExecutorElem(tag, output);
- }
- }
-
private KvCoder<K, V> inputKvCoder;
private SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
@@ -92,7 +81,6 @@ class GroupByWindowExecutor<K, V>
mainTupleTag,
sideOutputTags);
- this.outputManager = new GroupByWindowOutputManager();
UserGraphContext userGraphContext = context.getUserGraphContext();
PCollection<KV<K, V>> input = (PCollection<KV<K, V>>) userGraphContext.getInput();
this.inputKvCoder = (KvCoder<K, V>) input.getCoder();
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
index 292b771..e2139d8 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormStateInternals.java
@@ -180,12 +180,8 @@ class JStormStateInternals<K> implements StateInternals {
kvStoreManager.<ComposedKey, Object>getOrCreate(STATE_INFO + getStoreId(id)));
Combine.CombineFn<Instant, Combine.Holder<Instant>, Instant> outputTimeCombineFn =
- new BinaryCombineFn<Instant>() {
- @Override
- public Instant apply(Instant left, Instant right) {
- return timestampCombiner.combine(left, right);
- }
- };
+ new WatermarkCombineFn(timestampCombiner);
+
return new JStormWatermarkHoldState(
id, spec, namespace,
new JStormCombiningState<>(
@@ -203,6 +199,19 @@ class JStormStateInternals<K> implements StateInternals {
});
}
+ private static class WatermarkCombineFn extends BinaryCombineFn<Instant> {
+ private final TimestampCombiner timestampCombiner;
+
+ public WatermarkCombineFn(TimestampCombiner timestampCombiner) {
+ this.timestampCombiner = timestampCombiner;
+ }
+
+ @Override
+ public Instant apply(Instant left, Instant right) {
+ return timestampCombiner.combine(left, right);
+ }
+ };
+
/**
* JStorm implementation of {@link ValueState}.
*/
@@ -623,7 +632,7 @@ class JStormStateInternals<K> implements StateInternals {
@Override
public ReadableState<V> get(K var1) {
- ReadableState<V> ret = new MapReadableState<>(null);
+ ReadableState<V> ret = null;
try {
ret = new MapReadableState(kvStore.get(var1));
} catch (IOException e) {
@@ -634,7 +643,7 @@ class JStormStateInternals<K> implements StateInternals {
@Override
public ReadableState<Iterable<K>> keys() {
- ReadableState<Iterable<K>> ret = new MapReadableState<>(null);
+ ReadableState<Iterable<K>> ret = null;
try {
ret = new MapReadableState<>(kvStore.keys());
} catch (IOException e) {
@@ -645,7 +654,7 @@ class JStormStateInternals<K> implements StateInternals {
@Override
public ReadableState<Iterable<V>> values() {
- ReadableState<Iterable<V>> ret = new MapReadableState<>(null);
+ ReadableState<Iterable<V>> ret = null;
try {
ret = new MapReadableState<>(kvStore.values());
} catch (IOException e) {
@@ -656,7 +665,7 @@ class JStormStateInternals<K> implements StateInternals {
@Override
public ReadableState<Iterable<Map.Entry<K, V>>> entries() {
- ReadableState<Iterable<Map.Entry<K, V>>> ret = new MapReadableState<>(null);
+ ReadableState<Iterable<Map.Entry<K, V>>> ret = null;
try {
ret = new MapReadableState<>(kvStore.entries());
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
index 5b60b03..e7f3285 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MetricsReporter.java
@@ -63,10 +63,8 @@ class MetricsReporter {
}
private void updateCounters(Iterable<MetricResult<Long>> counters) {
- System.out.print("updateCounters");
for (MetricResult<Long> metricResult : counters) {
String metricName = getMetricNameString(COUNTER_PREFIX, metricResult);
- System.out.print("metricName: " + metricName);
Long updateValue = metricResult.attempted();
Long oldValue = reportedCounters.get(metricName);
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
index 138a5dc..f318a89 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiOutputDoFnExecutor.java
@@ -32,26 +32,10 @@ import org.slf4j.LoggerFactory;
/**
* JStorm {@link Executor} for {@link DoFn} with multi-output.
- * @param <InputT>
- * @param <OutputT>
*/
class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, OutputT> {
private static final Logger LOG = LoggerFactory.getLogger(MultiOutputDoFnExecutor.class);
- /**
- * For multi-output scenario,a "local" tuple tag is used in producer currently while a generated
- * tag is used in downstream consumer. So before output, we need to map this "local" tag to
- * "external" tag. See PCollectionTuple for details.
- */
- public class MultiOutputDoFnExecutorOutputManager extends DoFnExecutorOutputManager {
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- executorsBolt.processExecutorElem(tag, output);
- }
- }
-
- protected Map<TupleTag<?>, TupleTag<?>> localTupleTagMap;
-
public MultiOutputDoFnExecutor(
String stepName,
String description,
@@ -63,13 +47,9 @@ class MultiOutputDoFnExecutor<InputT, OutputT> extends DoFnExecutor<InputT, Outp
Collection<PCollectionView<?>> sideInputs,
Map<TupleTag, PCollectionView<?>> sideInputTagToView,
TupleTag<OutputT> mainTupleTag,
- List<TupleTag<?>> sideOutputTags,
- Map<TupleTag<?>, TupleTag<?>> localTupleTagMap
+ List<TupleTag<?>> sideOutputTags
) {
super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
- this.localTupleTagMap = localTupleTagMap;
- this.outputManager = new MultiOutputDoFnExecutorOutputManager();
- LOG.info("localTupleTagMap: {}", localTupleTagMap);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
index a3ffc30..44c0765 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/MultiStatefulDoFnExecutor.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.WindowingStrategy;
/**
* JStorm {@link Executor} for stateful {@link DoFn} with multi-output.
- * @param <OutputT>
*/
class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, OutputT> {
@@ -42,9 +41,9 @@ class MultiStatefulDoFnExecutor<OutputT> extends MultiOutputDoFnExecutor<KV, Out
Coder<WindowedValue<KV>> inputCoder, WindowingStrategy<?, ?> windowingStrategy,
TupleTag<KV> mainInputTag, Collection<PCollectionView<?>> sideInputs,
Map<TupleTag, PCollectionView<?>> sideInputTagToView, TupleTag<OutputT> mainTupleTag,
- List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, TupleTag<?>> localTupleTagMap) {
+ List<TupleTag<?>> sideOutputTags) {
super(stepName, description, pipelineOptions, doFn, inputCoder, windowingStrategy, mainInputTag,
- sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags, localTupleTagMap);
+ sideInputs, sideInputTagToView, mainTupleTag, sideOutputTags);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
index 7daa1cb..986af43 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundMultiTranslator.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.jstorm.translation;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
@@ -33,7 +32,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.PValueBase;
import org.apache.beam.sdk.values.TupleTag;
/**
@@ -50,12 +48,6 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
Map<TupleTag<?>, PValue> allOutputs = Maps.newHashMap(userGraphContext.getOutputs());
- Map<TupleTag<?>, TupleTag<?>> localToExternalTupleTagMap = Maps.newHashMap();
- for (Map.Entry<TupleTag<?>, PValue> entry : allOutputs.entrySet()) {
- Iterator<TupleTag<?>> itr = ((PValueBase) entry.getValue()).expand().keySet().iterator();
- localToExternalTupleTagMap.put(entry.getKey(), itr.next());
- }
-
TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
List<TupleTag<?>> sideOutputTags = userGraphContext.getOutputTags();
sideOutputTags.remove(mainOutputTag);
@@ -90,8 +82,7 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
transform.getSideInputs(),
sideInputTagToView.build(),
mainOutputTag,
- sideOutputTags,
- localToExternalTupleTagMap);
+ sideOutputTags);
} else {
executor = new MultiOutputDoFnExecutor<>(
userGraphContext.getStepName(),
@@ -105,8 +96,7 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
transform.getSideInputs(),
sideInputTagToView.build(),
mainOutputTag,
- sideOutputTags,
- localToExternalTupleTagMap);
+ sideOutputTags);
}
context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
deleted file mode 100644
index e6d09c4..0000000
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ParDoBoundTranslator.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.jstorm.translation;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Translates a ParDo.Bound to a JStorm {@link DoFnExecutor}.
- */
-class ParDoBoundTranslator<InputT, OutputT>
- extends TransformTranslator.Default<ParDo.SingleOutput<InputT, OutputT>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslator.class);
-
- @Override
- public void translateNode(
- ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
- final TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
- final TupleTag<?> inputTag = userGraphContext.getInputTag();
- PCollection<InputT> input = (PCollection<InputT>) userGraphContext.getInput();
-
- TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) userGraphContext.getOutputTag();
- List<TupleTag<?>> sideOutputTags = Lists.newArrayList();
-
- Map<TupleTag<?>, PValue> allInputs = Maps.newHashMap(userGraphContext.getInputs());
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- allInputs.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
- String description = describeTransform(
- transform,
- allInputs,
- userGraphContext.getOutputs());
-
- ImmutableMap.Builder<TupleTag, PCollectionView<?>> sideInputTagToView = ImmutableMap.builder();
- for (PCollectionView pCollectionView : transform.getSideInputs()) {
- sideInputTagToView.put(userGraphContext.findTupleTag(pCollectionView), pCollectionView);
- }
-
- DoFnExecutor executor;
- DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
- if (signature.stateDeclarations().size() > 0
- || signature.timerDeclarations().size() > 0) {
- executor = new StatefulDoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- (DoFn<KV, OutputT>) transform.getFn(),
- (Coder) WindowedValue.getFullCoder(
- input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- (TupleTag<KV>) inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags);
- } else {
- executor = new DoFnExecutor<>(
- userGraphContext.getStepName(),
- description,
- userGraphContext.getOptions(),
- transform.getFn(),
- WindowedValue.getFullCoder(
- input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()),
- input.getWindowingStrategy(),
- (TupleTag<InputT>) inputTag,
- transform.getSideInputs(),
- sideInputTagToView.build(),
- mainOutputTag,
- sideOutputTags);
- }
-
- context.addTransformExecutor(executor, ImmutableList.<PValue>copyOf(transform.getSideInputs()));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
index 911f259..70e2570 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/StatefulDoFnExecutor.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.values.WindowingStrategy;
/**
* JStorm {@link Executor} for stateful {@link DoFn}.
- * @param <OutputT>
*/
class StatefulDoFnExecutor<OutputT> extends DoFnExecutor<KV, OutputT> {
public StatefulDoFnExecutor(
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
index 24a9050..159fe70 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
@@ -30,7 +30,7 @@ interface TimerService extends Serializable {
void init(List<Integer> upStreamTasks);
/**
- *
+ * Update watermark when receiving watermark from an upstream task.
* @param task
* @param inputWatermark
* @return new watermark if any timer is triggered during the update of watermark, otherwise 0
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
index 6b463db..027fc14 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
@@ -39,15 +39,15 @@ import org.joda.time.Instant;
* Default implementation of {@link TimerService}.
*/
class TimerServiceImpl implements TimerService {
- private transient ExecutorContext executorContext;
- private transient Map<Integer, DoFnExecutor> idToDoFnExecutor;
+ private ExecutorContext executorContext;
+ private Map<Integer, DoFnExecutor> idToDoFnExecutor;
private final ConcurrentMap<Integer, Long> upStreamTaskToInputWatermark =
new ConcurrentHashMap<>();
private final PriorityQueue<Long> inputWatermarks = new PriorityQueue<>();
private final PriorityQueue<Instant> watermarkHolds = new PriorityQueue<>();
private final Map<String, Instant> namespaceToWatermarkHold = new HashMap<>();
- private final transient PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
+ private final PriorityQueue<TimerInternals.TimerData> eventTimeTimersQueue =
new PriorityQueue<>();
private final Map<TimerInternals.TimerData, Set<Pair<Integer, Object>>>
timerDataToKeyedExecutors = Maps.newHashMap();
@@ -132,7 +132,7 @@ class TimerServiceImpl implements TimerService {
if (currentHold == null) {
namespaceToWatermarkHold.put(namespace, watermarkHold);
watermarkHolds.add(watermarkHold);
- } else if (currentHold != null && watermarkHold.isBefore(currentHold)) {
+ } else if (watermarkHold.isBefore(currentHold)) {
namespaceToWatermarkHold.put(namespace, watermarkHold);
watermarkHolds.add(watermarkHold);
watermarkHolds.remove(currentHold);
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
index 4d431d3..f0b8f74 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TransformTranslator.java
@@ -38,8 +38,8 @@ interface TransformTranslator<T extends PTransform<?, ?>> {
boolean canTranslate(T transform, TranslationContext context);
/**
- * Default translator.
- * @param <T1>
+ * Default translator does NOT translate anything, but just generates
+ * the description of the PTransform.
*/
class Default<T1 extends PTransform<?, ?>> implements TransformTranslator<T1> {
@Override
@@ -61,7 +61,11 @@ interface TransformTranslator<T extends PTransform<?, ?>> {
.transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
@Override
public String apply(Map.Entry<TupleTag<?>, PValue> taggedPValue) {
- return taggedPValue.getKey().getId();
+ if (taggedPValue != null) {
+ return taggedPValue.getKey().getId();
+ } else {
+ return null;
+ }
}
})),
transform.getName(),
@@ -69,7 +73,11 @@ interface TransformTranslator<T extends PTransform<?, ?>> {
.transform(new Function<Map.Entry<TupleTag<?>, PValue>, String>() {
@Override
public String apply(Map.Entry<TupleTag<?>, PValue> taggedPvalue) {
- return taggedPvalue.getKey().getId();
+ if (taggedPvalue != null) {
+ return taggedPvalue.getKey().getId();
+ } else {
+ return null;
+ }
}
})));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
index 0991448..4407f15 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslationContext.java
@@ -198,8 +198,6 @@ public class TranslationContext {
}
bolt.addExecutor(tag, executor, userGraphContext.getStepName());
- // filter all connections inside bolt
- //if (!bolt.getOutputTags().contains(tag)) {
Stream.Grouping grouping;
if (isGBK) {
grouping = Stream.Grouping.byFields(Arrays.asList(CommonInstance.KEY));
@@ -207,7 +205,6 @@ public class TranslationContext {
grouping = Stream.Grouping.of(Stream.Grouping.Type.LOCAL_OR_SHUFFLE);
}
addStormStreamDef(TaggedPValue.of(tag, value), name, grouping);
- //}
}
for (PValue sideInput : sideInputs) {
@@ -223,7 +220,7 @@ public class TranslationContext {
// set parallelismNumber
String pTransformfullName = userGraphContext.currentTransform.getFullName();
String compositeName = pTransformfullName.split("/")[0];
- Map parallelismNumMap = userGraphContext.getOptions().getParallelismNumMap();
+ Map parallelismNumMap = userGraphContext.getOptions().getParallelismMap();
if (parallelismNumMap.containsKey(compositeName)) {
int configNum = (Integer) parallelismNumMap.get(compositeName);
int currNum = bolt.getParallelismNum();
@@ -262,10 +259,21 @@ public class TranslationContext {
return (T) currentTransform.getInputs().values().iterator().next();
}
- public Map<TupleTag<?>, PValue> getInputs() {
+ public Map<TupleTag<?>, PValue> getTransformInputs() {
return currentTransform.getInputs();
}
+ /**
+ * Get input PValues with the output tags of the upstream node.
+ */
+ public Map<TupleTag<?>, PValue> getInputs() {
+ Map<TupleTag<?>, PValue> ret = Maps.newHashMap();
+ for (PValue pValue : currentTransform.getInputs().values()) {
+ ret.put(findTupleTag(pValue), pValue);
+ }
+ return ret;
+ }
+
public TupleTag<?> getInputTag() {
return pValueToTupleTag.get(this.getInput());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
index 9eaa13a..c8ea545 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
@@ -40,7 +40,6 @@ class TranslatorRegistry {
static {
TRANSLATORS.put(Read.Bounded.class, new BoundedSourceTranslator());
TRANSLATORS.put(Read.Unbounded.class, new UnboundedSourceTranslator());
- TRANSLATORS.put(ParDo.SingleOutput.class, new ParDoBoundTranslator());
TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoBoundMultiTranslator());
TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator<>());
TRANSLATORS.put(Flatten.PCollections.class, new FlattenTranslator());
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
index 4ae28e6..627a834 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/UnboundedSourceSpout.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.jstorm.JStormPipelineOptions;
import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
@@ -61,7 +62,7 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
private KryoSerializer<WindowedValue> serializer;
- private long lastWaterMark = 0L;
+ private long lastWaterMark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
public UnboundedSourceSpout(
String name,
@@ -113,7 +114,7 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
}
@Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ public synchronized void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
try {
this.collector = collector;
this.pipelineOptions =
@@ -127,7 +128,8 @@ public class UnboundedSourceSpout extends AbstractComponent implements IRichSpou
}
}
- public void createSourceReader(UnboundedSource.CheckpointMark checkpointMark) throws IOException {
+ public synchronized void createSourceReader(UnboundedSource.CheckpointMark checkpointMark)
+ throws IOException {
if (reader != null) {
reader.close();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
index 9ab5784..de3f568 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ViewTranslator.java
@@ -256,9 +256,7 @@ class ViewTranslator
/**
* Specialized expansion for
* {@link org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView}.
- * @param <InputT>
- * @param <OutputT>
- */
+ */
public static class CombineGloballyAsSingletonView<InputT, OutputT>
extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
Combine.GloballyAsSingletonView<InputT, OutputT> transform;
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
index 8d60392..832c95c 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java
@@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory;
/**
* JStorm {@link Executor} for {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
- * @param <T>
- * @param <W>
*/
class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor {
private static final Logger LOG = LoggerFactory.getLogger(WindowAssignExecutor.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/90ed2ef3/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
index b2ca267..3acf662 100644
--- a/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
+++ b/runners/jstorm/src/test/java/org/apache/beam/runners/jstorm/translation/JStormStateInternalsTest.java
@@ -64,7 +64,7 @@ public class JStormStateInternalsTest {
IKvStoreManager kvStoreManager = RocksDbKvStoreManagerFactory.getManager(
Maps.newHashMap(),
"test",
- tmp.toString(),
+ tmp.getRoot().toString(),
new KryoSerializer(Maps.newHashMap()));
jstormStateInternals = new JStormStateInternals(
"key-1", kvStoreManager, new TimerServiceImpl(), 0);