You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/05/03 07:43:22 UTC
[beam] branch master updated: [BEAM-6859] align teardown with setup
calls also for empty streaming batches
This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5e3df08 [BEAM-6859] align teardown with setup calls also for empty streaming batches
new 0606ba8 Merge pull request #8443 from adude3141/BEAM-6859
5e3df08 is described below
commit 5e3df08c0819c23363d6e83bf9e12de2c6cd7237
Author: Michael Luckey <25...@users.noreply.github.com>
AuthorDate: Wed May 1 00:06:56 2019 +0200
[BEAM-6859] align teardown with setup calls also for empty streaming batches
---
.../spark/translation/MultiDoFnFunction.java | 2 +-
.../translation/streaming/CreateStreamTest.java | 52 ++++++++++++++++++++++
2 files changed, 53 insertions(+), 1 deletion(-)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
index badca38..470d283 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java
@@ -118,7 +118,7 @@ public class MultiDoFnFunction<InputT, OutputT>
@Override
public Iterator<Tuple2<TupleTag<?>, WindowedValue<?>>> call(Iterator<WindowedValue<InputT>> iter)
throws Exception {
- if (!wasSetupCalled) {
+ if (!wasSetupCalled && iter.hasNext()) {
DoFnInvokers.invokerFor(doFn).invokeSetup();
wasSetupCalled = true;
}
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index 1842085..1ea8ce8 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -18,12 +18,15 @@
package org.apache.beam.runners.spark.translation.streaming;
import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.spark.ReuseSparkContextRule;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.StreamingTest;
@@ -393,6 +396,33 @@ public class CreateStreamTest implements Serializable {
p.run();
}
+ /**
+ * Test that {@link ParDo} aligns both setup and teardown calls in streaming pipelines. See
+ * https://issues.apache.org/jira/browse/BEAM-6859.
+ */
+ @Test
+ public void testParDoCallsSetupAndTeardown() {
+ Instant instant = new Instant(0);
+
+ p.apply(
+ CreateStream.of(VarIntCoder.of(), batchDuration())
+ .emptyBatch()
+ .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(5)))
+ .nextBatch(
+ TimestampedValue.of(1, instant),
+ TimestampedValue.of(2, instant),
+ TimestampedValue.of(3, instant))
+ .advanceNextBatchWatermarkToInfinity())
+ .apply(ParDo.of(new LifecycleDoFn()));
+
+ p.run();
+
+ assertThat(
+ "Function should have been torn down",
+ LifecycleDoFn.teardownCalls.intValue(),
+ is(equalTo(LifecycleDoFn.setupCalls.intValue())));
+ }
+
@Test
public void testElementAtPositiveInfinityThrows() {
CreateStream<Integer> source =
@@ -425,4 +455,26 @@ public class CreateStreamTest implements Serializable {
return Duration.millis(
(p.getOptions().as(SparkPipelineOptions.class)).getBatchIntervalMillis());
}
+
+ private static class LifecycleDoFn extends DoFn<Integer, Integer> {
+ static AtomicInteger setupCalls = new AtomicInteger(0);
+ static AtomicInteger teardownCalls = new AtomicInteger(0);
+
+ @Setup
+ public void setup() {
+ setupCalls.incrementAndGet();
+ }
+
+ @Teardown
+ public void teardown() {
+ teardownCalls.incrementAndGet();
+ }
+
+ @SuppressWarnings("unused")
+ @ProcessElement
+ public void process(ProcessContext context) {
+ Integer element = context.element();
+ context.output(element);
+ }
+ }
}