You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:24 UTC
[rocketmq-flink] 11/33: flink-rocketmq-sink , producer send message set delay level (optional… (#237)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit b5c05e0f5ed71ba1f4aa9eae55136995df724f9f
Author: xlzl <75...@qq.com>
AuthorDate: Mon Jun 3 11:05:31 2019 +0800
flink-rocketmq-sink , producer send message set delay level (optional… (#237)
---
.../org/apache/rocketmq/flink/RocketMQConfig.java | 27 +++++++++++++++++++---
.../org/apache/rocketmq/flink/RocketMQSink.java | 20 ++++++++++++----
.../apache/rocketmq/flink/RocketMQSinkTest.java | 1 +
.../flink/example/RocketMQFlinkExample.java | 6 ++++-
4 files changed, 46 insertions(+), 8 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
index 8ec760b..5b43b31 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
@@ -34,7 +34,7 @@ import static org.apache.rocketmq.flink.RocketMQUtils.getInteger;
* RocketMQConfig for Consumer/Producer.
*/
public class RocketMQConfig {
- // common
+ // Server Config
public static final String NAME_SERVER_ADDR = "nameserver.address"; // Required
public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
@@ -44,7 +44,7 @@ public class RocketMQConfig {
public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds
- // producer
+ // Producer related config
public static final String PRODUCER_GROUP = "producer.group";
public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
@@ -54,7 +54,7 @@ public class RocketMQConfig {
public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
- // consumer
+ // Consumer related config
public static final String CONSUMER_GROUP = "consumer.group"; // Required
public static final String CONSUMER_TOPIC = "consumer.topic"; // Required
@@ -80,6 +80,27 @@ public class RocketMQConfig {
public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = "consumer.delay.when.message.not.found";
public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 10;
+ public static final String MSG_DELAY_LEVEL = "msg.delay.level";
+ public static final int MSG_DELAY_LEVEL00 = 0; // no delay
+ public static final int MSG_DELAY_LEVEL01 = 1; // 1s
+ public static final int MSG_DELAY_LEVEL02 = 2; // 5s
+ public static final int MSG_DELAY_LEVEL03 = 3; // 10s
+ public static final int MSG_DELAY_LEVEL04 = 4; // 30s
+ public static final int MSG_DELAY_LEVEL05 = 5; // 1min
+ public static final int MSG_DELAY_LEVEL06 = 6; // 2min
+ public static final int MSG_DELAY_LEVEL07 = 7; // 3min
+ public static final int MSG_DELAY_LEVEL08 = 8; // 4min
+ public static final int MSG_DELAY_LEVEL09 = 9; // 5min
+ public static final int MSG_DELAY_LEVEL10 = 10; // 6min
+ public static final int MSG_DELAY_LEVEL11 = 11; // 7min
+ public static final int MSG_DELAY_LEVEL12 = 12; // 8min
+ public static final int MSG_DELAY_LEVEL13 = 13; // 9min
+ public static final int MSG_DELAY_LEVEL14 = 14; // 10min
+ public static final int MSG_DELAY_LEVEL15 = 15; // 20min
+ public static final int MSG_DELAY_LEVEL16 = 16; // 30min
+ public static final int MSG_DELAY_LEVEL17 = 17; // 1h
+ public static final int MSG_DELAY_LEVEL18 = 18; // 2h
+
/**
* Build Producer Configs.
* @param props Properties
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index 65274af..41bbcbe 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -62,10 +62,22 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
private int batchSize = 1000;
private List<Message> batchList;
+ private int messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
+
public RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props) {
this.serializationSchema = schema;
this.topicSelector = topicSelector;
this.props = props;
+
+ if (this.props != null) {
+ this.messageDeliveryDelayLevel = RocketMQUtils.getInteger(this.props, RocketMQConfig.MSG_DELAY_LEVEL,
+ RocketMQConfig.MSG_DELAY_LEVEL00);
+ if (this.messageDeliveryDelayLevel < RocketMQConfig.MSG_DELAY_LEVEL00) {
+ this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
+ } else if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL18) {
+ this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL18;
+ }
+ }
}
@Override
@@ -105,7 +117,6 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
}
if (async) {
- // async sending
try {
producer.send(msg, new SendCallback() {
@Override
@@ -124,7 +135,6 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
LOG.error("Async send message failure!", e);
}
} else {
- // sync sending, will return a SendResult
try {
SendResult result = producer.send(msg);
LOG.debug("Sync send message result: {}", result);
@@ -134,7 +144,6 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
}
}
- // Mapping: from storm tuple -> rocketmq Message
private Message prepareMessage(IN input) {
String topic = topicSelector.getTopic(input);
String tag = topicSelector.getTag(input) != null ? topicSelector.getTag(input) : "";
@@ -147,6 +156,9 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
Validate.notNull(value, "the message body is null");
Message msg = new Message(topic, tag, key, value);
+ if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL00) {
+ msg.setDelayTimeLevel(this.messageDeliveryDelayLevel);
+ }
return msg;
}
@@ -191,6 +203,6 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- // nothing to do
+ // Nothing to do
}
}
diff --git a/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java b/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java
index ec844f2..74a10b0 100644
--- a/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java
@@ -46,6 +46,7 @@ public class RocketMQSinkTest {
KeyValueSerializationSchema serializationSchema = new SimpleKeyValueSerializationSchema("id", "name");
TopicSelector topicSelector = new DefaultTopicSelector("tpc");
Properties props = new Properties();
+ props.setProperty(RocketMQConfig.MSG_DELAY_LEVEL,String.valueOf(RocketMQConfig.MSG_DELAY_LEVEL04));
rocketMQSink = new RocketMQSink(serializationSchema, topicSelector, props);
producer = mock(DefaultMQProducer.class);
diff --git a/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java b/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
index b2a4034..f4f654e 100644
--- a/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
+++ b/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
@@ -46,6 +46,10 @@ public class RocketMQFlinkExample {
Properties producerProps = new Properties();
producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876");
+ int msgDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL05;
+ producerProps.setProperty(RocketMQConfig.MSG_DELAY_LEVEL,String.valueOf(msgDelayLevel));
+ // Delayed messages (delay level > 0) cannot be sent in a batch, so enable batch flush only when no delay is set
+ boolean batchFlag = msgDelayLevel <= 0;
env.addSource(new RocketMQSource(new SimpleKeyValueDeserializationSchema("id", "address"), consumerProps))
.name("rocketmq-source")
@@ -63,7 +67,7 @@ public class RocketMQFlinkExample {
.name("upper-processor")
.setParallelism(2)
.addSink(new RocketMQSink(new SimpleKeyValueSerializationSchema("id", "province"),
- new DefaultTopicSelector("flink-sink2"), producerProps).withBatchFlushOnCheckpoint(true))
+ new DefaultTopicSelector("flink-sink2"), producerProps).withBatchFlushOnCheckpoint(batchFlag))
.name("rocketmq-sink")
.setParallelism(2);