You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ja...@apache.org on 2019/10/21 18:04:33 UTC
[beam] branch master updated: [BEAM-8439] Delay bundle start until pushed back data is emitted
This is an automated email from the ASF dual-hosted git repository.
janl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 49def1e [BEAM-8439] Delay bundle start until pushed back data is emitted
new 7f8e4e4 Merge pull request #9841 from je-ik/BEAM-8439: [BEAM-8439] avoid creation of empty bundles
49def1e is described below
commit 49def1ef43894c1ed948465e89e1f9112d889d97
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Mon Oct 21 17:05:53 2019 +0200
[BEAM-8439] Delay bundle start until pushed back data is emitted
---
.../runners/flink/translation/wrappers/streaming/DoFnOperator.java | 6 ++----
.../flink/translation/wrappers/streaming/DoFnOperatorTest.java | 5 ++---
.../wrappers/streaming/ExecutableStageDoFnOperatorTest.java | 4 ----
3 files changed, 4 insertions(+), 11 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 4f48287..b4ce64f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -630,7 +630,6 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
@Override
public void processWatermark1(Watermark mark) throws Exception {
- checkInvokeStartBundle();
// We do the check here because we are guaranteed to at least get the +Inf watermark on the
// main input when the job finishes.
if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
@@ -677,7 +676,6 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
@Override
public void processWatermark2(Watermark mark) throws Exception {
- checkInvokeStartBundle();
setCurrentSideInputWatermark(mark.getTimestamp());
if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
@@ -698,6 +696,7 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
Iterator<WindowedValue<InputT>> it = pushedBackElementsHandler.getElements().iterator();
while (it.hasNext()) {
+ checkInvokeStartBundle();
WindowedValue<InputT> element = it.next();
// we need to set the correct key in case the operator is
// a (keyed) window operator
@@ -790,8 +789,7 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
@Override
public void onEventTime(InternalTimer<ByteBuffer, TimerData> timer) throws Exception {
- // We don't have to cal checkInvokeStartBundle() because it's already called in
- // processWatermark*().
+ checkInvokeStartBundle();
fireTimer(timer);
}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 57f7694..2a83944 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -1274,7 +1274,8 @@ public class DoFnOperatorTest {
WindowedValue.valueInGlobalWindow("d"),
WindowedValue.valueInGlobalWindow("finishBundle")));
- // A final bundle will be created when sending the MAX watermark
+ // No bundle will be created when sending the MAX watermark
+ // (unless pushed back items are emitted)
newHarness.close();
assertThat(
@@ -1282,7 +1283,6 @@ public class DoFnOperatorTest {
contains(
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("d"),
- WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("finishBundle")));
// close() will also call dispose(), but call again to verify no new bundle
@@ -1294,7 +1294,6 @@ public class DoFnOperatorTest {
contains(
WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("d"),
- WindowedValue.valueInGlobalWindow("finishBundle"),
WindowedValue.valueInGlobalWindow("finishBundle")));
}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 8134b24..0d7c99f 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -403,10 +403,6 @@ public class ExecutableStageDoFnOperatorTest {
verify(stageBundleFactory).getProcessBundleDescriptor();
verify(stageBundleFactory).close();
verify(stageContext).close();
- // DoFnOperator generates a final watermark, which triggers a new bundle..
- verify(stageBundleFactory).getBundle(any(), any(), any());
- verify(bundle).getInputReceivers();
- verify(bundle).close();
verifyNoMoreInteractions(stageBundleFactory);
// close() will also call dispose(), but call again to verify no new bundle