You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:37 UTC

[37/53] [abbrv] beam git commit: jstorm-runner: support Flatten with empty inputs.

jstorm-runner: support Flatten with empty inputs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/aca16cc9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/aca16cc9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/aca16cc9

Branch: refs/heads/jstorm-runner
Commit: aca16cc9b2224b9bfce98719c6ef2abbad94f7df
Parents: 4d634ec
Author: Pei He <pe...@apache.org>
Authored: Wed Jul 19 15:34:56 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:59 2017 +0800

----------------------------------------------------------------------
 .../jstorm/translation/FlattenTranslator.java   | 104 ++++++++++++++++++-
 1 file changed, 100 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/aca16cc9/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
index 89708df..8f239bf 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/FlattenTranslator.java
@@ -18,11 +18,24 @@
 package org.apache.beam.runners.jstorm.translation;
 
 import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
 
 /**
  * Translates a {@link Flatten} to a JStorm {@link FlattenExecutor}.
@@ -40,10 +53,93 @@ class FlattenTranslator<V> extends TransformTranslator.Default<Flatten.PCollecti
       PCollection<V> pc = (PCollection<V>) entry.getValue();
       inputs.putAll(pc.expand());
     }
-    System.out.println("Real inputs: " + inputs);
-    System.out.println("FlattenList inputs: " + userGraphContext.getInputs());
     String description = describeTransform(transform, inputs, userGraphContext.getOutputs());
-    FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
-    context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
+
+    if (inputs.isEmpty()) {
+      // Create a empty source
+      TupleTag<?> tag = userGraphContext.getOutputTag();
+      PValue output = userGraphContext.getOutput();
+
+      UnboundedSourceSpout spout = new UnboundedSourceSpout(
+          description,
+          new EmptySource(),
+          userGraphContext.getOptions(),
+          tag);
+      context.getExecutionGraphContext().registerSpout(spout, TaggedPValue.of(tag, output));
+
+    } else {
+      FlattenExecutor executor = new FlattenExecutor(description, userGraphContext.getOutputTag());
+      context.addTransformExecutor(executor, inputs, userGraphContext.getOutputs());
+    }
+  }
+
+  private static class EmptySource extends UnboundedSource<Void, UnboundedSource.CheckpointMark> {
+    @Override
+    public List<? extends UnboundedSource<Void, CheckpointMark>> split(
+        int i, PipelineOptions pipelineOptions) throws Exception {
+      return Collections.singletonList(this);
+    }
+
+    @Override
+    public UnboundedReader<Void> createReader(
+        PipelineOptions pipelineOptions,
+        @Nullable CheckpointMark checkpointMark) throws IOException {
+      return new EmptyReader();
+    }
+
+    @Override
+    public Coder<CheckpointMark> getCheckpointMarkCoder() {
+      return null;
+    }
+
+    @Override
+    public void validate() {
+    }
+
+    @Override
+    public Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
+
+    private class EmptyReader extends UnboundedReader<Void> {
+      @Override
+      public boolean start() throws IOException {
+        return false;
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        return false;
+      }
+
+      @Override
+      public Void getCurrent() throws NoSuchElementException {
+        throw new NoSuchElementException();
+      }
+
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        throw new NoSuchElementException();
+      }
+
+      @Override
+      public void close() throws IOException {
+      }
+
+      @Override
+      public Instant getWatermark() {
+        return BoundedWindow.TIMESTAMP_MAX_VALUE;
+      }
+
+      @Override
+      public CheckpointMark getCheckpointMark() {
+        return null;
+      }
+
+      @Override
+      public UnboundedSource<Void, ?> getCurrentSource() {
+        return EmptySource.this;
+      }
+    }
   }
 }