Posted to commits@flink.apache.org by di...@apache.org on 2023/01/16 01:34:51 UTC

[flink] branch release-1.15 updated: [FLINK-29231][python] Fix the issue that Python UDAF may produce multiple results for the same window

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new d9115d8c2aa [FLINK-29231][python] Fix the issue that Python UDAF may produce multiple results for the same window
d9115d8c2aa is described below

commit d9115d8c2aae75c0b17471c1ab8a0769e606def1
Author: Dian Fu <di...@apache.org>
AuthorDate: Thu Jan 12 16:23:53 2023 +0800

    [FLINK-29231][python] Fix the issue that Python UDAF may produce multiple results for the same window
    
    This closes #21655.
---
 .../python/AbstractPythonFunctionOperator.java     | 11 +--
 ...onGroupWindowAggregateFunctionOperatorTest.java | 84 ----------------------
 ...ArrowPythonRowTimeBoundedRangeOperatorTest.java | 43 +----------
 ...mArrowPythonRowTimeBoundedRowsOperatorTest.java | 43 +----------
 4 files changed, 8 insertions(+), 173 deletions(-)
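
The crux of the change is in AbstractPythonFunctionOperator#processWatermark (see the
first hunk below): the watermark is now only advanced, and event-time timers fired,
once it is safe to do so, either immediately when the current bundle is already
finished, or from the bundle-finished callback otherwise. The callback clears itself
before running so that finishing a bundle inside advanceWatermark cannot re-invoke it
and emit the same window results twice. A minimal, self-contained sketch of that
re-entrancy guard (hypothetical class and method names, not the Flink implementation):

    // Sketch only: illustrates clearing the callback before advancing the watermark,
    // so re-entrant bundle completion cannot run the callback (and emit results) twice.
    public class WatermarkHoldSketch {
        private Runnable bundleFinishedCallback;
        private boolean bundleFinished;

        void processWatermark(long mark) {
            if (bundleFinished) {
                advanceWatermark(mark);
                emitWatermark(mark);
            } else {
                // Hold the watermark until the in-flight bundle finishes.
                bundleFinishedCallback = () -> {
                    // Clear first: advanceWatermark may finish another bundle,
                    // which would otherwise invoke this callback again.
                    bundleFinishedCallback = null;
                    advanceWatermark(mark);
                    emitWatermark(mark);
                };
            }
        }

        void onBundleFinished() {
            bundleFinished = true;
            if (bundleFinishedCallback != null) {
                bundleFinishedCallback.run();
            }
        }

        void advanceWatermark(long mark) { /* fire event-time timers up to mark */ }

        void emitWatermark(long mark) { System.out.println("emit watermark " + mark); }
    }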

diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index a967f85a3c1..271cbca7753 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -186,17 +186,13 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
         // gives better throughput due to the bundle not getting cut on
         // every watermark. So we have implemented 2) below.
 
-        // advance the watermark and do not emit watermark to downstream operators
-        if (getTimeServiceManager().isPresent()) {
-            getTimeServiceManager().get().advanceWatermark(mark);
-        }
-
         if (mark.getTimestamp() == Long.MAX_VALUE) {
             invokeFinishBundle();
             processElementsOfCurrentKeyIfNeeded(null);
             advanceWatermark(mark);
             output.emitWatermark(mark);
         } else if (isBundleFinished()) {
+            advanceWatermark(mark);
             output.emitWatermark(mark);
         } else {
             // It is not safe to advance the output watermark yet, so add a hold on the current
@@ -204,6 +200,11 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream
             bundleFinishedCallback =
                     () -> {
                         try {
+                            // avoid invoking bundleFinishedCallback repeatedly in advanceWatermark
+                            // which will invoke finishBundle(which will finally invoke
+                            // bundleFinishedCallback)
+                            bundleFinishedCallback = null;
+
                             advanceWatermark(mark);
                             // at this point the bundle is finished, allow the watermark to pass
                             output.emitWatermark(mark);
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index ddf106357dc..3f45fc07bdc 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -327,90 +327,6 @@ public class StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
         assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
     }
 
-    @Test
-    public void testFinishBundleTriggeredByTime() throws Exception {
-        Configuration conf = new Configuration();
-        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
-        conf.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
-        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
-
-        long initialTime = 0L;
-        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-        testHarness.open();
-
-        testHarness.processElement(
-                new StreamRecord<>(newBinaryRow(true, "c1", "c2", 0L, 0L), initialTime + 1));
-        testHarness.processElement(
-                new StreamRecord<>(newBinaryRow(true, "c1", "c4", 1L, 6000L), initialTime + 2));
-        testHarness.processElement(
-                new StreamRecord<>(newBinaryRow(true, "c1", "c6", 2L, 10000L), initialTime + 3));
-        testHarness.processElement(
-                new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 0L), initialTime + 4));
-        testHarness.processWatermark(new Watermark(20000L));
-        assertOutputEquals(
-                "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
-
-        testHarness.setProcessingTime(1000L);
-        expectedOutput.add(
-                new StreamRecord<>(
-                        newRow(
-                                true,
-                                "c1",
-                                0L,
-                                TimestampData.fromEpochMillis(-5000L),
-                                TimestampData.fromEpochMillis(5000L))));
-
-        expectedOutput.add(
-                new StreamRecord<>(
-                        newRow(
-                                true,
-                                "c2",
-                                3L,
-                                TimestampData.fromEpochMillis(-5000L),
-                                TimestampData.fromEpochMillis(5000L))));
-        expectedOutput.add(
-                new StreamRecord<>(
-                        newRow(
-                                true,
-                                "c2",
-                                3L,
-                                TimestampData.fromEpochMillis(0L),
-                                TimestampData.fromEpochMillis(10000L))));
-
-        expectedOutput.add(
-                new StreamRecord<>(
-                        newRow(
-                                true,
-                                "c1",
-                                0L,
-                                TimestampData.fromEpochMillis(0L),
-                                TimestampData.fromEpochMillis(10000L))));
-
-        expectedOutput.add(
-                new StreamRecord<>(
-                        newRow(
-                                true,
-                                "c1",
-                                1L,
-                                TimestampData.fromEpochMillis(5000L),
-                                TimestampData.fromEpochMillis(15000L))));
-        expectedOutput.add(
-                new StreamRecord<>(
-                        newRow(
-                                true,
-                                "c1",
-                                2L,
-                                TimestampData.fromEpochMillis(10000L),
-                                TimestampData.fromEpochMillis(20000L))));
-
-        expectedOutput.add(new Watermark(20000L));
-
-        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-        testHarness.close();
-    }
-
     @Override
     public LogicalType[] getOutputLogicalType() {
         return new LogicalType[] {
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
index 737478fd2cd..1918ea7d4be 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
@@ -94,7 +94,7 @@ public class StreamArrowPythonRowTimeBoundedRangeOperatorTest
     }
 
     @Test
-    public void testFinishBundleTriggeredOnCheckpoint() throws Exception {
+    public void testFinishBundleTriggeredOnWatermark() throws Exception {
         Configuration conf = new Configuration();
         conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
@@ -114,11 +114,6 @@ public class StreamArrowPythonRowTimeBoundedRangeOperatorTest
                 new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3));
         testHarness.processWatermark(new Watermark(10000L));
 
-        assertOutputEquals(
-                "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
-        // checkpoint trigger finishBundle
-        testHarness.prepareSnapshotPreBarrier(0L);
-
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L)));
@@ -165,42 +160,6 @@ public class StreamArrowPythonRowTimeBoundedRangeOperatorTest
         testHarness.close();
     }
 
-    @Test
-    public void testFinishBundleTriggeredByTime() throws Exception {
-        Configuration conf = new Configuration();
-        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
-        conf.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
-        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
-
-        long initialTime = 0L;
-        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-        testHarness.open();
-
-        testHarness.processElement(
-                new StreamRecord<>(newBinaryRow(true, "c1", "c2", 0L, 1L), initialTime + 1));
-        testHarness.processElement(
-                new StreamRecord<>(newBinaryRow(true, "c1", "c4", 1L, 1L), initialTime + 2));
-        testHarness.processElement(
-                new StreamRecord<>(newBinaryRow(true, "c1", "c6", 2L, 10L), initialTime + 3));
-        testHarness.processElement(
-                new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3));
-        testHarness.processWatermark(new Watermark(10000L));
-        assertOutputEquals(
-                "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
-
-        testHarness.setProcessingTime(1000L);
-        expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 0L)));
-        expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L)));
-        expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L)));
-        expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 2L)));
-        expectedOutput.add(new Watermark(10000L));
-
-        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-        testHarness.close();
-    }
-
     @Test
     public void testStateCleanup() throws Exception {
         Configuration conf = new Configuration();
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
index 4123d492975..abde046397b 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
@@ -90,7 +90,7 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
     }
 
     @Test
-    public void testFinishBundleTriggeredOnCheckpoint() throws Exception {
+    public void testFinishBundleTriggeredOnWatermark() throws Exception {
         Configuration conf = new Configuration();
         conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
         OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
@@ -110,11 +110,6 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
                 new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3));
         testHarness.processWatermark(new Watermark(10000L));
 
-        assertOutputEquals(
-                "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
-        // checkpoint trigger finishBundle
-        testHarness.prepareSnapshotPreBarrier(0L);
-
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L)));
@@ -161,42 +156,6 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
         testHarness.close();
     }
 
-    @Test
-    public void testFinishBundleTriggeredByTime() throws Exception {
-        Configuration conf = new Configuration();
-        conf.setInteger(PythonOptions.MAX_BUNDLE_SIZE, 10);
-        conf.setLong(PythonOptions.MAX_BUNDLE_TIME_MILLS, 1000L);
-        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = getTestHarness(conf);
-
-        long initialTime = 0L;
-        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-        testHarness.open();
-
-        testHarness.processElement(
-                new StreamRecord<>(newBinaryRow(true, "c1", "c2", 0L, 1L), initialTime + 1));
-        testHarness.processElement(
-                new StreamRecord<>(newBinaryRow(true, "c1", "c4", 1L, 1L), initialTime + 2));
-        testHarness.processElement(
-                new StreamRecord<>(newBinaryRow(true, "c1", "c6", 2L, 10L), initialTime + 3));
-        testHarness.processElement(
-                new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), initialTime + 3));
-        testHarness.processWatermark(new Watermark(10000L));
-        assertOutputEquals(
-                "FinishBundle should not be triggered.", expectedOutput, testHarness.getOutput());
-
-        testHarness.setProcessingTime(1000L);
-        expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 0L)));
-        expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 0L)));
-        expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 3L)));
-        expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 10L, 1L)));
-        expectedOutput.add(new Watermark(10000L));
-
-        assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-        testHarness.close();
-    }
-
     @Override
     public LogicalType[] getOutputLogicalType() {
         return new LogicalType[] {