You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:37 UTC
[37/53] [abbrv] beam git commit: jstorm-runner: support Flatten with
empty inputs.
jstorm-runner: support Flatten with empty inputs.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aca16cc9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aca16cc9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aca16cc9
Branch: refs/heads/jstorm-runner
Commit: aca16cc9b2224b9bfce98719c6ef2abbad94f7df
Parents: 4d634ec
Author: Pei He <pe...@apache.org>
Authored: Wed Jul 19 15:34:56 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:59 2017 +0800
----------------------------------------------------------------------
.../jstorm/translation/FlattenTranslator.java | 104 ++++++++++++++++++-
1 file changed, 100 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/aca16cc9/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
index 89708df..8f239bf 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
@@ -18,11 +18,24 @@
package org.apache.beam.runners.jstorm.translation;
import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
/**
* Translates a {@link Flatten} to a JStorm {@link FlattenExecutor}.
@@ -40,10 +53,93 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti
PCollection<V> pc = (PCollection<V>) entry.getValue();
inputs.putAll(pc.expand());
}
- System.out.println("Real inputs: " + inputs);
- System.out.println("FlattenList inputs: " + userGraphContext.getInputs());
String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
- FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
- context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
+
+ if (inputs.isEmpty()) {
+ // Create an empty source
+ TupleTag<?> tag = userGraphContext.getOutputTag();
+ PValue output = userGraphContext.getOutput();
+
+ UnboundedSourceSpout spout = new UnboundedSourceSpout(
+ description,
+ new EmptySource(),
+ userGraphContext.getOptions(),
+ tag);
+ context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
+
+ } else {
+ FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
+ context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
+ }
+ }
+
+ private static class EmptySource extends UnboundedSource<Void, UnboundedSource.CheckpointMark> { // Source that never yields an element; wrapped in the spout registered when Flatten has no inputs.
+ @Override
+ public List<? extends UnboundedSource<Void, CheckpointMark>> split(
+ int i, PipelineOptions pipelineOptions) throws Exception {
+ return Collections.singletonList(this); // Never subdivides: the requested split count is ignored and this source is the only split.
+ }
+
+ @Override
+ public UnboundedReader<Void> createReader(
+ PipelineOptions pipelineOptions,
+ @Nullable CheckpointMark checkpointMark) throws IOException {
+ return new EmptyReader(); // Both arguments are ignored; a fresh reader is returned on every call.
+ }
+
+ @Override
+ public Coder<CheckpointMark> getCheckpointMarkCoder() {
+ return null; // No checkpoint coder is supplied. NOTE(review): confirm the spout tolerates a null coder.
+ }
+
+ @Override
+ public void validate() { // Intentionally empty: this source has no configuration to check.
+ }
+
+ @Override
+ public Coder<Void> getDefaultOutputCoder() {
+ return VoidCoder.of(); // Matches the Void element type declared by this source.
+ }
+
+ private class EmptyReader extends UnboundedReader<Void> { // Reader that reports no elements; non-static so it can hand back the enclosing source.
+ @Override
+ public boolean start() throws IOException {
+ return false; // No first element is available.
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return false; // No further element ever becomes available.
+ }
+
+ @Override
+ public Void getCurrent() throws NoSuchElementException {
+ throw new NoSuchElementException(); // There is never a current element.
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ throw new NoSuchElementException(); // No current element, hence no timestamp.
+ }
+
+ @Override
+ public void close() throws IOException { // Intentionally empty: the reader holds no fields or resources.
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE; // Always reports the maximum timestamp; presumably signals that no more data will arrive - TODO confirm with the spout.
+ }
+
+ @Override
+ public CheckpointMark getCheckpointMark() {
+ return null; // No checkpoint mark is produced. NOTE(review): confirm callers accept null here.
+ }
+
+ @Override
+ public UnboundedSource<Void, ?> getCurrentSource() {
+ return EmptySource.this; // The enclosing EmptySource instance that created this reader.
+ }
+ }
+ }
}