You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/08/24 11:18:24 UTC

[rocketmq-flink] 11/33: flink-rocketmq-sink: producer can set a delay level when sending messages (optional… (#237)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit b5c05e0f5ed71ba1f4aa9eae55136995df724f9f
Author: xlzl <75...@qq.com>
AuthorDate: Mon Jun 3 11:05:31 2019 +0800

    flink-rocketmq-sink: producer can set a delay level when sending messages (optional… (#237)
---
 .../org/apache/rocketmq/flink/RocketMQConfig.java  | 27 +++++++++++++++++++---
 .../org/apache/rocketmq/flink/RocketMQSink.java    | 20 ++++++++++++----
 .../apache/rocketmq/flink/RocketMQSinkTest.java    |  1 +
 .../flink/example/RocketMQFlinkExample.java        |  6 ++++-
 4 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
index 8ec760b..5b43b31 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
@@ -34,7 +34,7 @@ import static org.apache.rocketmq.flink.RocketMQUtils.getInteger;
  * RocketMQConfig for Consumer/Producer.
  */
 public class RocketMQConfig {
-    // common
+    // Server Config
     public static final String NAME_SERVER_ADDR = "nameserver.address"; // Required
 
     public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
@@ -44,7 +44,7 @@ public class RocketMQConfig {
     public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds
 
 
-    // producer
+    // Producer related config
     public static final String PRODUCER_GROUP = "producer.group";
 
     public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
@@ -54,7 +54,7 @@ public class RocketMQConfig {
     public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
 
 
-    // consumer
+    // Consumer related config
     public static final String CONSUMER_GROUP = "consumer.group"; // Required
 
     public static final String CONSUMER_TOPIC = "consumer.topic"; // Required
@@ -80,6 +80,27 @@ public class RocketMQConfig {
     public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = "consumer.delay.when.message.not.found";
     public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 10;
 
+    public static final String MSG_DELAY_LEVEL = "msg.delay.level";
+    public static final int MSG_DELAY_LEVEL00 = 0; // no delay
+    public static final int MSG_DELAY_LEVEL01 = 1; // 1s
+    public static final int MSG_DELAY_LEVEL02 = 2; // 5s
+    public static final int MSG_DELAY_LEVEL03 = 3; // 10s
+    public static final int MSG_DELAY_LEVEL04 = 4; // 30s
+    public static final int MSG_DELAY_LEVEL05 = 5; // 1min
+    public static final int MSG_DELAY_LEVEL06 = 6; // 2min
+    public static final int MSG_DELAY_LEVEL07 = 7; // 3min
+    public static final int MSG_DELAY_LEVEL08 = 8; // 4min
+    public static final int MSG_DELAY_LEVEL09 = 9; // 5min
+    public static final int MSG_DELAY_LEVEL10 = 10; // 6min
+    public static final int MSG_DELAY_LEVEL11 = 11; // 7min
+    public static final int MSG_DELAY_LEVEL12 = 12; // 8min
+    public static final int MSG_DELAY_LEVEL13 = 13; // 9min
+    public static final int MSG_DELAY_LEVEL14 = 14; // 10min
+    public static final int MSG_DELAY_LEVEL15 = 15; // 20min
+    public static final int MSG_DELAY_LEVEL16 = 16; // 30min
+    public static final int MSG_DELAY_LEVEL17 = 17; // 1h
+    public static final int MSG_DELAY_LEVEL18 = 18; // 2h
+
     /**
      * Build Producer Configs.
      * @param props Properties
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
index 65274af..41bbcbe 100644
--- a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -62,10 +62,22 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
     private int batchSize = 1000;
     private List<Message> batchList;
 
+    private int messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
+
     public RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props) {
         this.serializationSchema = schema;
         this.topicSelector = topicSelector;
         this.props = props;
+
+        if (this.props != null) {
+            this.messageDeliveryDelayLevel  = RocketMQUtils.getInteger(this.props, RocketMQConfig.MSG_DELAY_LEVEL,
+                    RocketMQConfig.MSG_DELAY_LEVEL00);
+            if (this.messageDeliveryDelayLevel  < RocketMQConfig.MSG_DELAY_LEVEL00) {
+                this.messageDeliveryDelayLevel  = RocketMQConfig.MSG_DELAY_LEVEL00;
+            } else if (this.messageDeliveryDelayLevel  > RocketMQConfig.MSG_DELAY_LEVEL18) {
+                this.messageDeliveryDelayLevel  = RocketMQConfig.MSG_DELAY_LEVEL18;
+            }
+        }
     }
 
     @Override
@@ -105,7 +117,6 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
         }
 
         if (async) {
-            // async sending
             try {
                 producer.send(msg, new SendCallback() {
                     @Override
@@ -124,7 +135,6 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
                 LOG.error("Async send message failure!", e);
             }
         } else {
-            // sync sending, will return a SendResult
             try {
                 SendResult result = producer.send(msg);
                 LOG.debug("Sync send message result: {}", result);
@@ -134,7 +144,6 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
         }
     }
 
-    // Mapping: from storm tuple -> rocketmq Message
     private Message prepareMessage(IN input) {
         String topic = topicSelector.getTopic(input);
         String tag = topicSelector.getTag(input) != null ? topicSelector.getTag(input) : "";
@@ -147,6 +156,9 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
         Validate.notNull(value, "the message body is null");
 
         Message msg = new Message(topic, tag, key, value);
+        if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL00) {
+            msg.setDelayTimeLevel(this.messageDeliveryDelayLevel);
+        }
         return msg;
     }
 
@@ -191,6 +203,6 @@ public class RocketMQSink<IN> extends RichSinkFunction<IN> implements Checkpoint
 
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
-        // nothing to do
+        // Nothing to do
     }
 }
diff --git a/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java b/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java
index ec844f2..74a10b0 100644
--- a/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java
@@ -46,6 +46,7 @@ public class RocketMQSinkTest {
         KeyValueSerializationSchema serializationSchema = new SimpleKeyValueSerializationSchema("id", "name");
         TopicSelector topicSelector = new DefaultTopicSelector("tpc");
         Properties props = new Properties();
+        props.setProperty(RocketMQConfig.MSG_DELAY_LEVEL,String.valueOf(RocketMQConfig.MSG_DELAY_LEVEL04));
         rocketMQSink = new RocketMQSink(serializationSchema, topicSelector, props);
 
         producer = mock(DefaultMQProducer.class);
diff --git a/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java b/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
index b2a4034..f4f654e 100644
--- a/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
+++ b/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
@@ -46,6 +46,10 @@ public class RocketMQFlinkExample {
 
         Properties producerProps = new Properties();
         producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876");
+        int msgDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL05;
+        producerProps.setProperty(RocketMQConfig.MSG_DELAY_LEVEL,String.valueOf(msgDelayLevel));
+        // A delay time level is not supported for batched messages, so only enable batching when no delay is set
+        boolean batchFlag = msgDelayLevel <= 0;
 
         env.addSource(new RocketMQSource(new SimpleKeyValueDeserializationSchema("id", "address"), consumerProps))
             .name("rocketmq-source")
@@ -63,7 +67,7 @@ public class RocketMQFlinkExample {
             .name("upper-processor")
             .setParallelism(2)
             .addSink(new RocketMQSink(new SimpleKeyValueSerializationSchema("id", "province"),
-                new DefaultTopicSelector("flink-sink2"), producerProps).withBatchFlushOnCheckpoint(true))
+                new DefaultTopicSelector("flink-sink2"), producerProps).withBatchFlushOnCheckpoint(batchFlag))
             .name("rocketmq-sink")
             .setParallelism(2);