You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/05/03 07:43:22 UTC

[beam] branch master updated: [BEAM-6859] align teardown with setup calls also for empty streaming batches

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e3df08  [BEAM-6859] align teardown with setup calls also for empty streaming batches
     new 0606ba8  Merge pull request #8443 from adude3141/BEAM-6859
5e3df08 is described below

commit 5e3df08c0819c23363d6e83bf9e12de2c6cd7237
Author: Michael Luckey <25...@users.noreply.github.com>
AuthorDate: Wed May 1 00:06:56 2019 +0200

    [BEAM-6859] align teardown with setup calls also for empty streaming batches
---
 .../spark/translation/MultiDoFnFunction.java       |  2 +-
 .../translation/streaming/CreateStreamTest.java    | 52 ++++++++++++++++++++++
 2 files changed, 53 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index badca38..470d283 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -118,7 +118,7 @@ public class MultiDoFnFunction<InputT, OutputT>
   @Override
   public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
       throws Exception {
-    if (!wasSetupCalled) {
+    if (!wasSetupCalled && iter.hasNext()) {
       DoFnInvokers.invokerFor(doFn).invokeSetup();
       wasSetupCalled = true;
     }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index 1842085..1ea8ce8 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -18,12 +18,15 @@
 package org.apache.beam.runners.spark.translation.streaming;
 
 import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.spark.ReuseSparkContextRule;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.StreamingTest;
@@ -393,6 +396,33 @@ public class CreateStreamTest implements Serializable {
     p.run();
   }
 
+  /**
+   * Test that {@link ParDo} aligns both setup and teardown calls in streaming pipelines. See
+   * https://issues.apache.org/jira/browse/BEAM-6859.
+   */
+  @Test
+  public void testParDoCallsSetupAndTeardown() {
+    Instant instant = new Instant(0);
+
+    p.apply(
+            CreateStream.of(VarIntCoder.of(), batchDuration())
+                .emptyBatch()
+                .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)))
+                .nextBatch(
+                    TimestampedValue.of(1, instant),
+                    TimestampedValue.of(2, instant),
+                    TimestampedValue.of(3, instant))
+                .advanceNextBatchWatermarkToInfinity())
+        .apply(ParDo.of(new LifecycleDoFn()));
+
+    p.run();
+
+    assertThat(
+        "Function should have been torn down",
+        LifecycleDoFn.teardownCalls.intValue(),
+        is(equalTo(LifecycleDoFn.setupCalls.intValue())));
+  }
+
   @Test
   public void testElementAtPositiveInfinityThrows() {
     CreateStream<Integer> source =
@@ -425,4 +455,26 @@ public class CreateStreamTest implements Serializable {
     return Duration.millis(
         (p.getOptions().as(SparkPipelineOptions.class)).getBatchIntervalMillis());
   }
+
+  private static class LifecycleDoFn extends DoFn<Integer, Integer> {
+    static AtomicInteger setupCalls = new AtomicInteger(0);
+    static AtomicInteger teardownCalls = new AtomicInteger(0);
+
+    @Setup
+    public void setup() {
+      setupCalls.incrementAndGet();
+    }
+
+    @Teardown
+    public void teardown() {
+      teardownCalls.incrementAndGet();
+    }
+
+    @SuppressWarnings("unused")
+    @ProcessElement
+    public void process(ProcessContext context) {
+      Integer element = context.element();
+      context.output(element);
+    }
+  }
 }