You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:19 UTC
[rocketmq-flink] 06/33: Add batch size param for flink sink (#198)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit 452b57fcd480dd707ecd8b73d898e5e87c8f6679
Author: shangan <ch...@163.com>
AuthorDate: Fri Feb 1 14:37:05 2019 +0800
Add batch size param for flink sink (#198)
* Add batch size param for flink sink
* Improvement. code style correction
---
src/main/java/org/apache/rocketmq/flink/RocketMQSink.java | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index e79d1b4..65274af 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -59,6 +59,7 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
private KeyValueSerializationSchema<IN> serializationSchema;
private boolean batchFlushOnCheckpoint; // false by default
+ private int batchSize = 1000;
private List<Message> batchList;
public RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props) {
@@ -97,6 +98,9 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
if (batchFlushOnCheckpoint) {
batchList.add(msg);
+ if (batchList.size() >= batchSize) {
+ flushSync();
+ }
return;
}
@@ -156,6 +160,11 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
return this;
}
+ public RocketMQSink<IN> withBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ return this;
+ }
+
@Override
public void close() throws Exception {
if (producer != null) {