You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/08/29 05:21:42 UTC
[flink-table-store] branch master updated: [FLINK-29098] StoreWriteOperator#prepareCommit should let logSinkFunction flush pending records before fetching offset
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 50b1b295 [FLINK-29098] StoreWriteOperator#prepareCommit should let logSinkFunction flush pending records before fetching offset
50b1b295 is described below
commit 50b1b2951fcdfbfc139e21950086dddb3e01c690
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Mon Aug 29 13:21:38 2022 +0800
[FLINK-29098] StoreWriteOperator#prepareCommit should let logSinkFunction flush pending records before fetching offset
This closes #279
---
.../flink/table/store/connector/sink/StoreWriteOperator.java | 5 +++++
.../org/apache/flink/table/store/table/sink/LogSinkFunction.java | 3 +++
.../java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java | 7 ++++++-
3 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
index 1c14ec0b..dd332979 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
@@ -192,6 +192,11 @@ public class StoreWriteOperator extends PrepareCommitOperator {
}
if (logCallback != null) {
+ try {
+ logSinkFunction.flush();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
logCallback
.offsets()
.forEach(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java
index 82d28146..b484b4d5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java
@@ -25,6 +25,9 @@ public interface LogSinkFunction extends SinkFunction<SinkRecord> {
void setWriteCallback(WriteCallback writeCallback);
+ /** Flush pending records. */
+ void flush() throws Exception;
+
/**
* A callback interface that the user can implement to know the offset of the bucket when the
* request is complete.
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java
index 587a1813..afea2367 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java
@@ -19,10 +19,10 @@
package org.apache.flink.table.store.kafka;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.store.table.sink.LogSinkFunction;
-import org.apache.flink.table.store.table.sink.LogSinkFunction.WriteCallback;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.kafka.clients.producer.Callback;
@@ -77,4 +77,9 @@ public class KafkaSinkFunction extends FlinkKafkaProducer<SinkRecord> implements
baseCallback.onCompletion(metadata, exception);
};
}
+
+ @Override
+ public void flush() throws FlinkKafkaException {
+ super.preCommit(super.currentTransaction());
+ }
}