You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this conversion.
Posted to commits@beam.apache.org by ar...@apache.org on 2019/01/18 18:14:50 UTC

[beam] branch spark-runner_structured-streaming updated: Address minor review notes

This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 16ac4a0  Address minor review notes
16ac4a0 is described below

commit 16ac4a026dc17dcdf6bb5fe2250af6bf06e1cfbc
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Fri Jan 18 19:13:34 2019 +0100

    Address minor review notes
---
 .../translation/TranslationContext.java                  |  4 ++++
 .../translation/batch/DoFnFunction.java                  | 16 ++++++++++++----
 .../translation/batch/ParDoTranslatorBatch.java          | 12 ++++++------
 .../{SparkProcessContext.java => ProcessContext.java}    | 12 ++----------
 .../{SparkNoOpStepContext.java => NoOpStepContext.java}  |  2 +-
 5 files changed, 25 insertions(+), 21 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 75b470e..ca69261 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -109,6 +109,10 @@ public class TranslationContext {
     return (Dataset<WindowedValue<T>>) dataset;
   }
 
+  /**
+   * TODO: All these 3 methods (putDataset*) are temporary and they are used only for generics type
+   * checking. We should unify them in the future.
+   */
   public void putDatasetWildcard(PValue value, Dataset<WindowedValue<?>> dataset) {
     if (!datasets.containsKey(value)) {
       datasets.put(value, dataset);
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
index 35204bc..62629ee 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
@@ -26,7 +26,7 @@ import com.google.common.collect.Multimap;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkNoOpStepContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.NoOpStepContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SparkSideInputReader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -45,6 +45,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Encapsulates a {@link DoFn} inside a Spark {@link
+ * org.apache.spark.api.java.function.MapPartitionsFunction}.
+ *
+ * <p>We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index and must tag
+ * all outputs with the output number. Afterwards a filter will filter out those elements that are
+ * not to be in a specific output.
+ */
 public class DoFnFunction<InputT, OutputT>
     implements MapPartitionsFunction<WindowedValue<InputT>, Tuple2<TupleTag<?>, WindowedValue<?>>> {
 
@@ -98,18 +106,18 @@ public class DoFnFunction<InputT, OutputT>
             outputManager,
             mainOutputTag,
             additionalOutputTags,
-            new SparkNoOpStepContext(),
+            new NoOpStepContext(),
             inputCoder,
             outputCoderMap,
             windowingStrategy);
 
-    return new SparkProcessContext<>(doFn, doFnRunner, outputManager, Collections.emptyIterator())
+    return new ProcessContext<>(doFn, doFnRunner, outputManager, Collections.emptyIterator())
         .processPartition(iter)
         .iterator();
   }
 
   private class DoFnOutputManager
-      implements SparkProcessContext.SparkOutputManager<Tuple2<TupleTag<?>, WindowedValue<?>>> {
+      implements ProcessContext.SparkOutputManager<Tuple2<TupleTag<?>, WindowedValue<?>>> {
 
     private final Multimap<TupleTag<?>, WindowedValue<?>> outputs = LinkedListMultimap.create();
 
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
index 9cbde5a..f80db9a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java
@@ -69,19 +69,19 @@ class ParDoTranslatorBatch<InputT, OutputT>
     Map<TupleTag<?>, PValue> outputs = context.getOutputs();
     TupleTag<?> mainOutputTag = getTupleTag(context);
 
-    Map<TupleTag<?>, Integer> outputMap = Maps.newHashMap();
+    Map<TupleTag<?>, Integer> outputTags = Maps.newHashMap();
 
-    outputMap.put(mainOutputTag, 0);
+    outputTags.put(mainOutputTag, 0);
     int count = 1;
     for (TupleTag<?> tag : outputs.keySet()) {
-      if (!outputMap.containsKey(tag)) {
-        outputMap.put(tag, count++);
+      if (!outputTags.containsKey(tag)) {
+        outputTags.put(tag, count++);
       }
     }
 
     // Union coder elements must match the order of the output tags.
     Map<Integer, TupleTag<?>> indexMap = Maps.newTreeMap();
-    for (Map.Entry<TupleTag<?>, Integer> entry : outputMap.entrySet()) {
+    for (Map.Entry<TupleTag<?>, Integer> entry : outputTags.entrySet()) {
       indexMap.put(entry.getValue(), entry.getKey());
     }
 
@@ -126,7 +126,7 @@ class ParDoTranslatorBatch<InputT, OutputT>
             windowingStrategy,
             sideInputStrategies,
             context.getOptions(),
-            outputMap,
+            outputTags,
             mainOutputTag,
             context.getInput(transform).getCoder(),
             outputCoderMap);
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java
similarity index 91%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java
index 720b7ab..f51bd80 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SparkProcessContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java
@@ -31,14 +31,14 @@ import java.util.Iterator;
 import static com.google.common.base.Preconditions.checkArgument;
 
 /** Spark runner process context processes Spark partitions using Beam's {@link DoFnRunner}. */
-class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
+class ProcessContext<FnInputT, FnOutputT, OutputT> {
 
   private final DoFn<FnInputT, FnOutputT> doFn;
   private final DoFnRunner<FnInputT, FnOutputT> doFnRunner;
   private final SparkOutputManager<OutputT> outputManager;
   private Iterator<TimerInternals.TimerData> timerDataIterator;
 
-  SparkProcessContext(
+  ProcessContext(
       DoFn<FnInputT, FnOutputT> doFn,
       DoFnRunner<FnInputT, FnOutputT> doFnRunner,
       SparkOutputManager<OutputT> outputManager,
@@ -119,7 +119,6 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
             doFnRunner.processElement(inputIterator.next());
             outputIterator = getOutputIterator();
           } else if (timerDataIterator.hasNext()) {
-            fireTimer(timerDataIterator.next());
             outputIterator = getOutputIterator();
           } else {
             // no more input to consume, but finishBundle can produce more output
@@ -138,12 +137,5 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
         throw re;
       }
     }
-
-    private void fireTimer(TimerInternals.TimerData timer) {
-      StateNamespace namespace = timer.getNamespace();
-      checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
-      BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
-      doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
-    }
   }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/NoOpStepContext.java
similarity index 95%
rename from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java
rename to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/NoOpStepContext.java
index 889cdf5..25e6f11 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/SparkNoOpStepContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/NoOpStepContext.java
@@ -22,7 +22,7 @@ import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
 
 /** A {@link StepContext} for Spark Batch Runner execution. */
-public class SparkNoOpStepContext implements StepContext {
+public class NoOpStepContext implements StepContext {
 
   @Override
   public StateInternals stateInternals() {