You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2023/01/16 01:34:51 UTC
[flink] branch release-1.15 updated: [FLINK-29231][python] Fix the issue that Python UDAF may produce multiple results for the same window
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new d9115d8c2aa [FLINK-29231][python] Fix the issue that Python UDAF may produce multiple results for the same window
d9115d8c2aa is described below
commit d9115d8c2aae75c0b17471c1ab8a0769e606def1
Author: Dian Fu <di...@apache.org>
AuthorDate: Thu Jan 12 16:23:53 2023 +0800
[FLINK-29231][python] Fix the issue that Python UDAF may produce multiple results for the same window
This closes #21655.
---
.../python/AbstractPythonFunctionOperator.java | 11 +--
...onGroupWindowAggregateFunctionOperatorTest.java | 84 ----------------------
...ArrowPythonRowTimeBoundedRangeOperatorTest.java | 43 +----------
...mArrowPythonRowTimeBoundedRowsOperatorTest.java | 43 +----------
4 files changed, 8 insertions(+), 173 deletions(-)
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index a967f85a3c1..271cbca7753 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -186,17 +186,13 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
// gives better throughput due to the bundle not getting cut on
// every watermark. So we have implemented 2) below.
- // advance the watermark and do not emit watermark to downstream operators
- if (getTimeServiceManager().isPresent()) {
- getTimeServiceManager().get().advanceWatermark(mark);
- }
-
if (mark.getTimestamp() == Long.MAX_VALUE) {
invokeFinishBundle();
processElementsOfCurrentKeyIfNeeded(null);
advanceWatermark(mark);
output.emitWatermark(mark);
} else if (isBundleFinished()) {
+ advanceWatermark(mark);
output.emitWatermark(mark);
} else {
// It is not safe to advance the output watermark yet, so add a hold on the current
@@ -204,6 +200,11 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
bundleFinishedCallback =
() -> {
try {
+ // Clear bundleFinishedCallback first: advanceWatermark invokes finishBundle,
+ // which in turn invokes bundleFinishedCallback, so leaving it set would
+ // run this callback repeatedly.
+ bundleFinishedCallback = null;
+
advanceWatermark(mark);
// at this point the bundle is finished, allow the watermark to pass
output.emitWatermark(mark);
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index ddf106357dc..3f45fc07bdc 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -327,90 +327,6 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
}
- @Test
- public void testFinishBundleTriggeredByTime() throws Exception {
- Configuration conf = new Configuration();
- conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
- conf.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
- OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- testHarness.open();
-
- testHarness.processElement(
- new StreamRecord<>(newBinaryRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
- testHarness.processElement(
- new StreamRecord<>(newBinaryRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
- testHarness.processElement(
- new StreamRecord<>(newBinaryRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
- testHarness.processElement(
- new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
- testHarness.processWatermark(new Watermark(20000L));
- assertOutputEquals(
- "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
-
- testHarness.setProcessingTime(1000L);
- expectedOutput.add(
- new StreamRecord<>(
- newRow(
- true,
- "c1",
- 0L,
- TimestampData.fromEpochMillis(-5000L),
- TimestampData.fromEpochMillis(5000L))));
-
- expectedOutput.add(
- new StreamRecord<>(
- newRow(
- true,
- "c2",
- 3L,
- TimestampData.fromEpochMillis(-5000L),
- TimestampData.fromEpochMillis(5000L))));
- expectedOutput.add(
- new StreamRecord<>(
- newRow(
- true,
- "c2",
- 3L,
- TimestampData.fromEpochMillis(0L),
- TimestampData.fromEpochMillis(10000L))));
-
- expectedOutput.add(
- new StreamRecord<>(
- newRow(
- true,
- "c1",
- 0L,
- TimestampData.fromEpochMillis(0L),
- TimestampData.fromEpochMillis(10000L))));
-
- expectedOutput.add(
- new StreamRecord<>(
- newRow(
- true,
- "c1",
- 1L,
- TimestampData.fromEpochMillis(5000L),
- TimestampData.fromEpochMillis(15000L))));
- expectedOutput.add(
- new StreamRecord<>(
- newRow(
- true,
- "c1",
- 2L,
- TimestampData.fromEpochMillis(10000L),
- TimestampData.fromEpochMillis(20000L))));
-
- expectedOutput.add(new Watermark(20000L));
-
- assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
@Override
public LogicalType[] getOutputLogicalType() {
return new LogicalType[] {
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
index 737478fd2cd..1918ea7d4be 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
@@ -94,7 +94,7 @@ public class StreamArrowPythonRowTimeBoundedRangeOperatorTest
}
@Test
- public void testFinishBundleTriggeredOnCheckpoint() throws Exception {
+ public void testFinishBundleTriggeredOnWatermark() throws Exception {
Configuration conf = new Configuration();
conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
@@ -114,11 +114,6 @@ public class StreamArrowPythonRowTimeBoundedRangeOperatorTest
new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3));
testHarness.processWatermark(new Watermark(10000L));
- assertOutputEquals(
- "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
- // checkpoint trigger finishBundle
- testHarness.prepareSnapshotPreBarrier(0L);
-
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L)));
@@ -165,42 +160,6 @@ public class StreamArrowPythonRowTimeBoundedRangeOperatorTest
testHarness.close();
}
- @Test
- public void testFinishBundleTriggeredByTime() throws Exception {
- Configuration conf = new Configuration();
- conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
- conf.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
- OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- testHarness.open();
-
- testHarness.processElement(
- new StreamRecord<>(newBinaryRow(true, "c1", "c2", 0L, 1L), initialTime + 1));
- testHarness.processElement(
- new StreamRecord<>(newBinaryRow(true, "c1", "c4", 1L, 1L), initialTime + 2));
- testHarness.processElement(
- new StreamRecord<>(newBinaryRow(true, "c1", "c6", 2L, 10L), initialTime + 3));
- testHarness.processElement(
- new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3));
- testHarness.processWatermark(new Watermark(10000L));
- assertOutputEquals(
- "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
-
- testHarness.setProcessingTime(1000L);
- expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 0L)));
- expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L)));
- expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L)));
- expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 2L)));
- expectedOutput.add(new Watermark(10000L));
-
- assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
@Test
public void testStateCleanup() throws Exception {
Configuration conf = new Configuration();
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
index 4123d492975..abde046397b 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
@@ -90,7 +90,7 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
}
@Test
- public void testFinishBundleTriggeredOnCheckpoint() throws Exception {
+ public void testFinishBundleTriggeredOnWatermark() throws Exception {
Configuration conf = new Configuration();
conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
@@ -110,11 +110,6 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3));
testHarness.processWatermark(new Watermark(10000L));
- assertOutputEquals(
- "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
- // checkpoint trigger finishBundle
- testHarness.prepareSnapshotPreBarrier(0L);
-
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L)));
@@ -161,42 +156,6 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
testHarness.close();
}
- @Test
- public void testFinishBundleTriggeredByTime() throws Exception {
- Configuration conf = new Configuration();
- conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
- conf.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
- OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- testHarness.open();
-
- testHarness.processElement(
- new StreamRecord<>(newBinaryRow(true, "c1", "c2", 0L, 1L), initialTime + 1));
- testHarness.processElement(
- new StreamRecord<>(newBinaryRow(true, "c1", "c4", 1L, 1L), initialTime + 2));
- testHarness.processElement(
- new StreamRecord<>(newBinaryRow(true, "c1", "c6", 2L, 10L), initialTime + 3));
- testHarness.processElement(
- new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3));
- testHarness.processWatermark(new Watermark(10000L));
- assertOutputEquals(
- "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
-
- testHarness.setProcessingTime(1000L);
- expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 0L)));
- expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L)));
- expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L)));
- expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 1L)));
- expectedOutput.add(new Watermark(10000L));
-
- assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
@Override
public LogicalType[] getOutputLogicalType() {
return new LogicalType[] {