You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/05/09 15:10:16 UTC

[beam] branch spark-runner_structured-streaming updated: Fixes ParDo not calling setup and not tearing down if exception on startBundle

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 017497c  Fixes ParDo not calling setup and not tearing down if exception on startBundle
017497c is described below

commit 017497c2662bb8bd414286f946e51b946a4c75e3
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Thu May 9 16:20:08 2019 +0200

    Fixes ParDo not calling setup and not tearing down if exception on startBundle
---
 runners/spark/build.gradle                         |  1 -
 .../translation/batch/DoFnFunction.java            |  6 ++++++
 .../translation/batch/ProcessContext.java          | 22 +++++++++++-----------
 3 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index 72e6013..ad43212 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -186,7 +186,6 @@ task validatesStructuredStreamingRunnerBatch(type: Test) {
     includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
     excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
     excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
-    excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
     // Unbounded
     excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
     excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
index 4f158bb..daa802d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
@@ -31,6 +31,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.utils.Cache
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -54,6 +55,7 @@ public class DoFnFunction<InputT, OutputT>
     implements MapPartitionsFunction<WindowedValue<InputT>, Tuple2<TupleTag<?>, WindowedValue<?>>> {
 
   private final DoFn<InputT, OutputT> doFn;
+  private transient boolean wasSetupCalled;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
   private final SerializablePipelineOptions serializableOptions;
@@ -90,6 +92,10 @@ public class DoFnFunction<InputT, OutputT>
   @Override
   public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
       throws Exception {
+    if (!wasSetupCalled && iter.hasNext()) {
+      DoFnInvokers.tryInvokeSetupFor(doFn);
+      wasSetupCalled = true;
+    }
 
     DoFnOutputManager outputManager = new DoFnOutputManager();
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java
index 33ca3d0..d3453a7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java
@@ -93,18 +93,18 @@ class ProcessContext<FnInputT, FnOutputT, OutputT> {
 
     @Override
     protected OutputT computeNext() {
-      // Process each element from the (input) iterator, which produces, zero, one or more
-      // output elements (of type V) in the output iterator. Note that the output
-      // collection (and iterator) is reset between each call to processElement, so the
-      // collection only holds the output values for each call to processElement, rather
-      // than for the whole partition (which would use too much memory).
-      if (!isBundleStarted) {
-        isBundleStarted = true;
-        // call startBundle() before beginning to process the partition.
-        doFnRunner.startBundle();
-      }
-
       try {
+        // Process each element from the (input) iterator, which produces, zero, one or more
+        // output elements (of type V) in the output iterator. Note that the output
+        // collection (and iterator) is reset between each call to processElement, so the
+        // collection only holds the output values for each call to processElement, rather
+        // than for the whole partition (which would use too much memory).
+        if (!isBundleStarted) {
+          isBundleStarted = true;
+          // call startBundle() before beginning to process the partition.
+          doFnRunner.startBundle();
+        }
+
         while (true) {
           if (outputIterator.hasNext()) {
             return outputIterator.next();