Posted to commits@flink.apache.org by lz...@apache.org on 2022/08/29 05:21:42 UTC

[flink-table-store] branch master updated: [FLINK-29098] StoreWriteOperator#prepareCommit should let logSinkFunction flush pending records before fetching offset

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 50b1b295 [FLINK-29098] StoreWriteOperator#prepareCommit should let logSinkFunction flush pending records before fetching offset
50b1b295 is described below

commit 50b1b2951fcdfbfc139e21950086dddb3e01c690
Author: Jane Chan <55...@users.noreply.github.com>
AuthorDate: Mon Aug 29 13:21:38 2022 +0800

    [FLINK-29098] StoreWriteOperator#prepareCommit should let logSinkFunction flush pending records before fetching offset
    
    This closes #279
---
 .../flink/table/store/connector/sink/StoreWriteOperator.java       | 5 +++++
 .../org/apache/flink/table/store/table/sink/LogSinkFunction.java   | 3 +++
 .../java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java | 7 ++++++-
 3 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
index 1c14ec0b..dd332979 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreWriteOperator.java
@@ -192,6 +192,11 @@ public class StoreWriteOperator extends PrepareCommitOperator {
         }
 
         if (logCallback != null) {
+            try {
+                logSinkFunction.flush();
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
             logCallback
                     .offsets()
                     .forEach(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java
index 82d28146..b484b4d5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/LogSinkFunction.java
@@ -25,6 +25,9 @@ public interface LogSinkFunction extends SinkFunction<SinkRecord> {
 
     void setWriteCallback(WriteCallback writeCallback);
 
+    /** Flush pending records. */
+    void flush() throws Exception;
+
     /**
      * A callback interface that the user can implement to know the offset of the bucket when the
      * request is complete.
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java
index 587a1813..afea2367 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaSinkFunction.java
@@ -19,10 +19,10 @@
 package org.apache.flink.table.store.kafka;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.flink.table.store.table.sink.LogSinkFunction;
-import org.apache.flink.table.store.table.sink.LogSinkFunction.WriteCallback;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 
 import org.apache.kafka.clients.producer.Callback;
@@ -77,4 +77,9 @@ public class KafkaSinkFunction extends FlinkKafkaProducer<SinkRecord> implements
                     baseCallback.onCompletion(metadata, exception);
                 };
     }
+
+    @Override
+    public void flush() throws FlinkKafkaException {
+        super.preCommit(super.currentTransaction());
+    }
 }
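
Why the flush has to come first: the WriteCallback only records offsets for log entries the producer has already acknowledged, so if prepareCommit reads logCallback.offsets() while records are still buffered, the committed offsets can lag behind the data actually written. The following is a minimal, self-contained sketch of that ordering; the BufferingLogProducer class is hypothetical and stands in for the Kafka producer, it is not part of the table store code in the diff above.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlushBeforeOffsetExample {

    /** Hypothetical buffering producer standing in for the real log system. */
    static class BufferingLogProducer {
        private final List<String> buffer = new ArrayList<>();
        private final Map<Integer, Long> ackedOffsets = new HashMap<>();
        private long nextOffset = 0;

        void send(int bucket, String record) {
            // The record is only buffered here; no offset is reported yet,
            // mirroring a producer that has not acknowledged the write.
            buffer.add(record);
        }

        /** Acknowledges all pending records and reports their offsets. */
        void flush() {
            for (String ignored : buffer) {
                ackedOffsets.put(0, nextOffset++);
            }
            buffer.clear();
        }

        Map<Integer, Long> offsets() {
            return new HashMap<>(ackedOffsets);
        }
    }

    public static void main(String[] args) {
        BufferingLogProducer producer = new BufferingLogProducer();
        producer.send(0, "record-1");
        producer.send(0, "record-2");

        // Without this flush, offsets() would still be empty and a commit
        // taken at this point would miss the two buffered records. Flushing
        // first, as the patch does via LogSinkFunction#flush, makes the
        // subsequent offset fetch cover everything written so far.
        producer.flush();
        System.out.println("offsets after flush: " + producer.offsets());
    }
}

In the patched KafkaSinkFunction the same effect is obtained by delegating flush() to FlinkKafkaProducer#preCommit on the current transaction, which forces pending records out before StoreWriteOperator reads the callback offsets.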