Posted to commits@flink.apache.org by da...@apache.org on 2022/07/25 08:58:01 UTC
[flink] 01/05: [FLINK-23528][datastream] Let CollectSinkOperator publish results in #close.
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ac6007116a199bcade7cb258552fca7d9efa1d01
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue Sep 7 20:02:42 2021 +0200
[FLINK-23528][datastream] Let CollectSinkOperator publish results in #close.
DataStream#executeAndCollect expects the CollectSinkOperator to register the accumulator at the end of the application; otherwise it fails with an exception.
However, a stop-with-savepoint without drain does not trigger CollectSinkOperator#finish and thus skips the registration. Publishing the results in #close instead ensures the accumulator is registered in that case as well.
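The difference between the two hooks can be illustrated with a small, self-contained sketch. It is a toy model and not Flink code: ToyOperator, PublishInFinish, PublishInClose and shutDown are hypothetical names, and the only assumption carried over from the description above is that finish is skipped when the job stops without drain, while close still runs.

```java
public class LifecycleSketch {

    /** Hypothetical stand-in for an operator with finish/close hooks (not a Flink interface). */
    interface ToyOperator {
        void finish();

        void close();
    }

    /** Publishes its results in finish(), like the operator before this commit. */
    static class PublishInFinish implements ToyOperator {
        boolean published = false;

        @Override
        public void finish() {
            published = true;
        }

        @Override
        public void close() {}
    }

    /** Publishes its results in close(), like the operator after this commit. */
    static class PublishInClose implements ToyOperator {
        boolean published = false;

        @Override
        public void finish() {}

        @Override
        public void close() {
            published = true;
        }
    }

    /** Toy shutdown model (assumption): finish() runs only when draining, close() always runs. */
    static void shutDown(ToyOperator op, boolean drain) {
        if (drain) {
            op.finish();
        }
        op.close();
    }

    public static void main(String[] args) {
        PublishInFinish before = new PublishInFinish();
        PublishInClose after = new PublishInClose();

        // Simulate a stop-with-savepoint without drain for both variants.
        shutDown(before, false);
        shutDown(after, false);

        System.out.println("publish in finish: " + before.published); // prints "publish in finish: false"
        System.out.println("publish in close: " + after.published); // prints "publish in close: true"
    }
}
```

In this model, only the variant that publishes in close still produces its results when the job stops without drain.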
---
.../flink/streaming/api/operators/collect/CollectSinkOperator.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
index 8b78bc30fc9..5c84c9b1378 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java
@@ -51,9 +51,9 @@ public class CollectSinkOperator<IN> extends StreamSink<IN> implements OperatorE
     }
 
     @Override
-    public void finish() throws Exception {
+    public void close() throws Exception {
         sinkFunction.accumulateFinalResults();
-        super.finish();
+        super.close();
     }
 
     public CompletableFuture<OperatorID> getOperatorIdFuture() {