You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/05/09 15:10:16 UTC
[beam] branch spark-runner_structured-streaming updated: Fixes
ParDo not calling setup, and not tearing down if an exception is thrown in startBundle
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
new 017497c Fixes ParDo not calling setup, and not tearing down if an exception is thrown in startBundle
017497c is described below
commit 017497c2662bb8bd414286f946e51b946a4c75e3
Author: Ismaël Mejía <ie...@gmail.com>
AuthorDate: Thu May 9 16:20:08 2019 +0200
Fixes ParDo not calling setup, and not tearing down if an exception is thrown in startBundle
---
runners/spark/build.gradle | 1 -
.../translation/batch/DoFnFunction.java | 6 ++++++
.../translation/batch/ProcessContext.java | 22 +++++++++++-----------
3 files changed, 17 insertions(+), 12 deletions(-)
diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle
index 72e6013..ad43212 100644
--- a/runners/spark/build.gradle
+++ b/runners/spark/build.gradle
@@ -186,7 +186,6 @@ task validatesStructuredStreamingRunnerBatch(type: Test) {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
- excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
// Unbounded
excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
index 4f158bb..daa802d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnFunction.java
@@ -31,6 +31,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.utils.Cache
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -54,6 +55,7 @@ public class DoFnFunction<InputT, OutputT>
implements MapPartitionsFunction<WindowedValue<InputT>, Tuple2<TupleTag<?>, WindowedValue<?>>> {
private final DoFn<InputT, OutputT> doFn;
+ private transient boolean wasSetupCalled;
private final WindowingStrategy<?, ?> windowingStrategy;
private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
private final SerializablePipelineOptions serializableOptions;
@@ -90,6 +92,10 @@ public class DoFnFunction<InputT, OutputT>
@Override
public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
throws Exception {
+ if (!wasSetupCalled && iter.hasNext()) {
+ DoFnInvokers.tryInvokeSetupFor(doFn);
+ wasSetupCalled = true;
+ }
DoFnOutputManager outputManager = new DoFnOutputManager();
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java
index 33ca3d0..d3453a7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ProcessContext.java
@@ -93,18 +93,18 @@ class ProcessContext<FnInputT, FnOutputT, OutputT> {
@Override
protected OutputT computeNext() {
- // Process each element from the (input) iterator, which produces, zero, one or more
- // output elements (of type V) in the output iterator. Note that the output
- // collection (and iterator) is reset between each call to processElement, so the
- // collection only holds the output values for each call to processElement, rather
- // than for the whole partition (which would use too much memory).
- if (!isBundleStarted) {
- isBundleStarted = true;
- // call startBundle() before beginning to process the partition.
- doFnRunner.startBundle();
- }
-
try {
+ // Process each element from the (input) iterator, which produces, zero, one or more
+ // output elements (of type V) in the output iterator. Note that the output
+ // collection (and iterator) is reset between each call to processElement, so the
+ // collection only holds the output values for each call to processElement, rather
+ // than for the whole partition (which would use too much memory).
+ if (!isBundleStarted) {
+ isBundleStarted = true;
+ // call startBundle() before beginning to process the partition.
+ doFnRunner.startBundle();
+ }
+
while (true) {
if (outputIterator.hasNext()) {
return outputIterator.next();