You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:19 UTC

[rocketmq-flink] 06/33: Add batch size param for flink sink (#198)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 452b57fcd480dd707ecd8b73d898e5e87c8f6679
Author: shangan <ch...@163.com>
AuthorDate: Fri Feb 1 14:37:05 2019 +0800

    Add batch size param for flink sink (#198)
    
    * Add batch size param for flink sink
    
    * Improvement. code style correction
---
 src/main/java/org/apache/rocketmq/flink/RocketMQSink.java | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index e79d1b4..65274af 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -59,6 +59,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
     private KeyValueSerializationSchema<IN> serializationSchema;
 
     private boolean batchFlushOnCheckpoint; // false by default
+    private int batchSize = 1000;
     private List<Message> batchList;
 
     public RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props) {
@@ -97,6 +98,9 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
 
         if (batchFlushOnCheckpoint) {
             batchList.add(msg);
+            if (batchList.size() >= batchSize) {
+                flushSync();
+            }
             return;
         }
 
@@ -156,6 +160,11 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
         return this;
     }
 
+    public RocketMQSink<IN> withBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
     @Override
     public void close() throws Exception {
         if (producer != null) {