You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by ja...@apache.org on 2019/10/21 18:04:33 UTC

[beam] branch master updated: [BEAM-8439] Delay bundle start until pushed back data is emitted

This is an automated email from the ASF dual-hosted git repository.

janl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 49def1e  [BEAM-8439] Delay bundle start until pushed back data is emitted
     new 7f8e4e4  Merge pull request #9841 from je-ik/BEAM-8439: [BEAM-8439] avoid creation of empty bundles
49def1e is described below

commit 49def1ef43894c1ed948465e89e1f9112d889d97
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Mon Oct 21 17:05:53 2019 +0200

    [BEAM-8439] Delay bundle start until pushed back data is emitted
---
 .../runners/flink/translation/wrappers/streaming/DoFnOperator.java  | 6 ++----
 .../flink/translation/wrappers/streaming/DoFnOperatorTest.java      | 5 ++---
 .../wrappers/streaming/ExecutableStageDoFnOperatorTest.java         | 4 ----
 3 files changed, 4 insertions(+), 11 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 4f48287..b4ce64f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -630,7 +630,6 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
 
   @Override
   public void processWatermark1(Watermark mark) throws Exception {
-    checkInvokeStartBundle();
     // We do the check here because we are guaranteed to at least get the +Inf watermark on the
     // main input when the job finishes.
     if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
@@ -677,7 +676,6 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
 
   @Override
   public void processWatermark2(Watermark mark) throws Exception {
-    checkInvokeStartBundle();
 
     setCurrentSideInputWatermark(mark.getTimestamp());
     if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
@@ -698,6 +696,7 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
     Iterator<WindowedValue<InputT>> it = pushedBackElementsHandler.getElements().iterator();
 
     while (it.hasNext()) {
+      checkInvokeStartBundle();
       WindowedValue<InputT> element = it.next();
       // we need to set the correct key in case the operator is
       // a (keyed) window operator
@@ -790,8 +789,7 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
 
   @Override
   public void onEventTime(InternalTimer<ByteBuffer, TimerData> timer) throws Exception {
-    // We don't have to cal checkInvokeStartBundle() because it's already called in
-    // processWatermark*().
+    checkInvokeStartBundle();
     fireTimer(timer);
   }
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 57f7694..2a83944 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -1274,7 +1274,8 @@ public class DoFnOperatorTest {
             WindowedValue.valueInGlobalWindow("d"),
             WindowedValue.valueInGlobalWindow("finishBundle")));
 
-    // A final bundle will be created when sending the MAX watermark
+    // No bundle will be created when sending the MAX watermark
+    // (unless pushed back items are emitted)
     newHarness.close();
 
     assertThat(
@@ -1282,7 +1283,6 @@ public class DoFnOperatorTest {
         contains(
             WindowedValue.valueInGlobalWindow("finishBundle"),
             WindowedValue.valueInGlobalWindow("d"),
-            WindowedValue.valueInGlobalWindow("finishBundle"),
             WindowedValue.valueInGlobalWindow("finishBundle")));
 
     // close() will also call dispose(), but call again to verify no new bundle
@@ -1294,7 +1294,6 @@ public class DoFnOperatorTest {
         contains(
             WindowedValue.valueInGlobalWindow("finishBundle"),
             WindowedValue.valueInGlobalWindow("d"),
-            WindowedValue.valueInGlobalWindow("finishBundle"),
             WindowedValue.valueInGlobalWindow("finishBundle")));
   }
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 8134b24..0d7c99f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -403,10 +403,6 @@ public class ExecutableStageDoFnOperatorTest {
     verify(stageBundleFactory).getProcessBundleDescriptor();
     verify(stageBundleFactory).close();
     verify(stageContext).close();
-    // DoFnOperator generates a final watermark, which triggers a new bundle..
-    verify(stageBundleFactory).getBundle(any(), any(), any());
-    verify(bundle).getInputReceivers();
-    verify(bundle).close();
     verifyNoMoreInteractions(stageBundleFactory);
 
     // close() will also call dispose(), but call again to verify no new bundle