You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2023/01/16 02:30:14 UTC

[rocketmq-streams] branch 49x/develop created (now a809f17a)

This is an automated email from the ASF dual-hosted git repository.

karp pushed a change to branch 49x/develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git


      at a809f17a create normal topic for state storage.

This branch includes the following new commits:

     new c2c81fa9 change rocketmq version to 4.9.4
     new a809f17a create normal topic for state storage.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[rocketmq-streams] 01/02: change rocketmq version to 4.9.4

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch 49x/develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit c2c81fa9f1ccec4eb359ed78dbce06e5ba33e166
Author: 维章 <un...@gmail.com>
AuthorDate: Sat Jan 14 22:10:54 2023 +0800

    change rocketmq version to 4.9.4
---
 core/pom.xml                                       |  2 +-
 .../streams/core/running/WorkerThread.java         |  2 +-
 .../rocketmq/streams/core/state/RocketMQStore.java |  3 +-
 .../rocketmq/streams/core/util/RocketMQUtil.java   | 32 +++++++++++++++++++++-
 example/pom.xml                                    |  2 +-
 pom.xml                                            |  4 +--
 6 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index 798b75e6..1efbb6c7 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -3,7 +3,7 @@
     <parent>
         <artifactId>rocketmq-streams-all</artifactId>
         <groupId>org.apache.rocketmq</groupId>
-        <version>1.1.1-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java b/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java
index 32a2b050..19a30b65 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java
@@ -232,7 +232,7 @@ public class WorkerThread extends Thread {
             }
 
             for (String topicName : shuffleTopic) {
-                RocketMQUtil.createStaticTopic(mqAdmin, topicName, StreamConfig.SHUFFLE_TOPIC_QUEUE_NUM);
+                RocketMQUtil.createNormalTopic(mqAdmin, topicName, StreamConfig.SHUFFLE_TOPIC_QUEUE_NUM);
             }
         }
 
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
index 44e04c52..531321eb 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
@@ -388,6 +388,7 @@ public class RocketMQStore extends AbstractStore implements StateStore {
         return target;
     }
 
+    //static topic queue num changes with source topic.
     private void createStateTopic(String stateTopic) throws Exception {
         if (RocketMQUtil.checkWhetherExist(stateTopic)) {
             return;
@@ -396,7 +397,7 @@ public class RocketMQStore extends AbstractStore implements StateStore {
         String sourceTopic = stateTopic2SourceTopic(stateTopic);
         Pair<Integer, Set<String>> clustersPair = getTotalQueueNumAndClusters(sourceTopic);
 
-        RocketMQUtil.createStaticCompactTopic(mqAdmin, stateTopic, clustersPair.getKey(), clustersPair.getValue());
+        RocketMQUtil.createNormalTopic(mqAdmin, stateTopic, clustersPair.getKey(), clustersPair.getValue());
     }
 
     private Pair<Integer, Set<String>> getTotalQueueNumAndClusters(String sourceTopic) throws Exception {
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java b/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java
index 34b787d5..2251c8e1 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java
@@ -21,11 +21,13 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
 import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand;
 import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
 import org.slf4j.Logger;
@@ -41,6 +43,34 @@ public class RocketMQUtil {
 
     private static final List<String> existTopic = new ArrayList<>();
 
+    //neither static topic nor compact topic.
+    public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String topicName, int queueNum, Set<String> clusters) throws Exception {
+        if (check(mqAdmin, topicName)) {
+            logger.info("topic[{}] already exist.", topicName);
+            return;
+        }
+
+        if (clusters == null || clusters.size() == 0) {
+            clusters = getCluster(mqAdmin);
+        }
+
+        TopicConfig topicConfig = new TopicConfig(topicName, queueNum, queueNum);
+
+        for (String cluster : clusters) {
+            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdmin, cluster);
+
+            for (String addr : masterSet) {
+                mqAdmin.createAndUpdateTopicConfig(addr, topicConfig);
+                logger.info("create topic to broker:{} cluster:{}, success.", addr, cluster);
+            }
+        }
+    }
+
+    public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String topicName, int queueNum) throws Exception {
+        Set<String> clusters = getCluster(mqAdmin);
+        createNormalTopic(mqAdmin, topicName, queueNum, clusters);
+    }
+
     public static void createStaticCompactTopic(DefaultMQAdminExt mqAdmin, String topicName, int queueNum, Set<String> clusters) throws Exception {
         if (check(mqAdmin, topicName)) {
             logger.info("topic[{}] already exist.", topicName);
@@ -129,7 +159,7 @@ public class RocketMQUtil {
     }
 
 
-    public static Set<String> getCluster(DefaultMQAdminExt mqAdmin) throws Exception {
+    private static Set<String> getCluster(DefaultMQAdminExt mqAdmin) throws Exception {
         ClusterInfo clusterInfo = mqAdmin.examineBrokerClusterInfo();
         return clusterInfo.getClusterAddrTable().keySet();
     }
diff --git a/example/pom.xml b/example/pom.xml
index c3731b1b..68da17b4 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -3,7 +3,7 @@
     <parent>
         <artifactId>rocketmq-streams-all</artifactId>
         <groupId>org.apache.rocketmq</groupId>
-        <version>1.1.1-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
diff --git a/pom.xml b/pom.xml
index 265402b9..16c2e5f3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
 
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-streams-all</artifactId>
-    <version>1.1.1-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <name>Apache RocketMQ Streams ${project.version}</name>
     <packaging>pom</packaging>
     <url>https://rocketmq.apache.org/</url>
@@ -86,7 +86,7 @@
     </modules>
 
     <properties>
-        <rocketmq.version>5.0.0</rocketmq.version>
+        <rocketmq.version>4.9.4</rocketmq.version>
         <rocksdbjni.version>7.6.0</rocksdbjni.version>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>


[rocketmq-streams] 02/02: create normal topic for state storage.

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch 49x/develop
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit a809f17a7bdd67324977d8321414ca7da2ebd925
Author: 维章 <un...@gmail.com>
AuthorDate: Mon Jan 16 10:29:24 2023 +0800

    create normal topic for state storage.
---
 .../core/function/supplier/SinkSupplier.java       | 10 ++-
 .../rocketmq/streams/core/state/RocketMQStore.java | 28 ++++++++-
 .../rocketmq/streams/core/state/StateStore.java    |  1 +
 .../rocketmq/streams/core/util/RocketMQUtil.java   | 71 +---------------------
 4 files changed, 35 insertions(+), 75 deletions(-)

diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
index 393e8956..09d56467 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/function/supplier/SinkSupplier.java
@@ -61,7 +61,7 @@ public class SinkSupplier<K, T> implements Supplier<Processor<T>> {
             this.key = context.getKey();
         }
 
-        //sink into shuffle topic/state topic/user topic
+        //sink into shuffle/user topic
         @Override
         public void process(T data) throws Throwable {
             if (data != null) {
@@ -83,8 +83,9 @@ public class SinkSupplier<K, T> implements Supplier<Processor<T>> {
                     producer.send(message);
                 } else {
                     message = new Message(this.topicName, value);
+                    String hexKey = Utils.toHexString(this.key);
                     //the real key is in the body, this key is used to route the same key into the same queue.
-                    message.setKeys(Utils.toHexString(this.key));
+                    message.setKeys(hexKey);
 
 
                     message.putUserProperty(Constant.SHUFFLE_KEY_CLASS_NAME, this.key.getClass().getName());
@@ -94,7 +95,10 @@ public class SinkSupplier<K, T> implements Supplier<Processor<T>> {
                         message.putUserProperty(Constant.SOURCE_TIMESTAMP, String.valueOf(this.context.getDataTime()));
                     }
 
-                    producer.send(message, new SelectMessageQueueByHash(), this.key);
+                    //For data write back, Write-prohibited is forbidden, because it will make send message failed.
+                    //And if the MessageQueue num changed(like expansion), the data with same key will be sent into different MessageQueue.
+                    //shuffle topic must be Static topic, to solve the problem in expansion.
+                    producer.send(message, new SelectMessageQueueByHash(), hexKey);
                 }
             }
         }
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
index 531321eb..68e39dbe 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.streams.core.state;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.impl.consumer.AssignedMessageQueue;
+import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.CountDownLatch2;
 import org.apache.rocketmq.common.MixAll;
@@ -41,6 +43,7 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
@@ -176,6 +179,8 @@ public class RocketMQStore extends AbstractStore implements StateStore {
 
         Set<MessageQueue> stateTopicQueues = convertSourceTopicQueue2StateTopicQueue(messageQueues);
         for (MessageQueue stateTopicQueue : stateTopicQueues) {
+            //if the source queue is removed, skip it.
+
             String stateTopicQueueKey = buildKey(stateTopicQueue);
             Set<byte[]> keySet = super.getInCalculating(stateTopicQueueKey);
 
@@ -264,6 +269,7 @@ public class RocketMQStore extends AbstractStore implements StateStore {
                 for (String stateUniqueQueue : groupByUniqueQueue.keySet()) {
                     Set<byte[]> stateTopicQueueKey = super.getAll(stateUniqueQueue);
                     for (byte[] key : stateTopicQueueKey) {
+                        logger.info("remove state queue:{}, delete corresponding state from rocksdb", stateUniqueQueue);
                         this.rocksDBStore.deleteByKey(key);
                     }
                     super.removeAll(stateUniqueQueue);
@@ -271,6 +277,7 @@ public class RocketMQStore extends AbstractStore implements StateStore {
 
 
                 for (MessageQueue stateMessageQueue : stateTopicQueue) {
+                    logger.info("remove state queue:{}, remove corresponding recover lock.",stateMessageQueue);
                     this.recoveringQueueMutex.remove(stateMessageQueue);
                 }
             } catch (Throwable e) {
@@ -286,7 +293,7 @@ public class RocketMQStore extends AbstractStore implements StateStore {
     }
 
     private void pullToLast(DefaultLitePullConsumer consumer) throws Throwable {
-        Set<MessageQueue> readyToRecover = consumer.assignment();
+        Set<MessageQueue> readyToRecover = assignMessageQueue(consumer);//consumer.assignment();
         for (MessageQueue messageQueue : readyToRecover) {
             this.recoveringQueueMutex.computeIfAbsent(messageQueue, messageQueue1 -> new CountDownLatch2(1));
         }
@@ -311,13 +318,28 @@ public class RocketMQStore extends AbstractStore implements StateStore {
         }
 
         //恢复完毕;
-        Set<MessageQueue> recoverOver = consumer.assignment();
-        for (MessageQueue messageQueue : recoverOver) {
+        for (MessageQueue messageQueue : readyToRecover) {
             CountDownLatch2 waitPoint = this.recoveringQueueMutex.get(messageQueue);
             waitPoint.countDown();
         }
     }
 
+    private Set<MessageQueue> assignMessageQueue(DefaultLitePullConsumer consumer) throws Throwable {
+        Class<? extends DefaultLitePullConsumer> consumerClass = consumer.getClass();
+
+        Field consumerImpl = consumerClass.getDeclaredField("defaultLitePullConsumerImpl");
+        consumerImpl.setAccessible(true);
+
+        DefaultLitePullConsumerImpl defaultLitePullConsumer = (DefaultLitePullConsumerImpl)consumerImpl.get(consumer);
+
+        Field assignedMessageQueueField = defaultLitePullConsumer.getClass().getDeclaredField("assignedMessageQueue");
+        assignedMessageQueueField.setAccessible(true);
+
+        AssignedMessageQueue assignedMessageQueue = (AssignedMessageQueue) assignedMessageQueueField.get(defaultLitePullConsumer);
+
+        return assignedMessageQueue.getAssignedMessageQueues();
+    }
+
     //拉的数据越多,重放效率越高,
     // 能保证一个q里面后面pull到的数据queueOffset一定比前一批次拉取的queueOffset大吗?
     private void replayState(List<MessageExt> msgs) throws Throwable {
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
index 7f75361c..5ec4422d 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/state/StateStore.java
@@ -48,5 +48,6 @@ public interface StateStore extends AutoCloseable {
 
     void delete(byte[] key) throws Throwable;
 
+    //persist state into rocketmq
     void persist(Set<MessageQueue> messageQueue) throws Throwable;
 }
diff --git a/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java b/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java
index 2251c8e1..36455719 100644
--- a/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java
+++ b/core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java
@@ -22,19 +22,18 @@ import org.apache.commons.cli.PosixParser;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.CommandUtil;
-import org.apache.rocketmq.tools.command.topic.UpdateStaticTopicSubCommand;
 import org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -54,7 +53,7 @@ public class RocketMQUtil {
             clusters = getCluster(mqAdmin);
         }
 
-        TopicConfig topicConfig = new TopicConfig(topicName, queueNum, queueNum);
+        TopicConfig topicConfig = new TopicConfig(topicName, queueNum, queueNum, PermName.PERM_READ | PermName.PERM_WRITE);
 
         for (String cluster : clusters) {
             Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdmin, cluster);
@@ -71,72 +70,6 @@ public class RocketMQUtil {
         createNormalTopic(mqAdmin, topicName, queueNum, clusters);
     }
 
-    public static void createStaticCompactTopic(DefaultMQAdminExt mqAdmin, String topicName, int queueNum, Set<String> clusters) throws Exception {
-        if (check(mqAdmin, topicName)) {
-            logger.info("topic[{}] already exist.", topicName);
-            return;
-        }
-
-        if (clusters == null || clusters.size() == 0) {
-            clusters = getCluster(mqAdmin);
-        }
-
-
-        for (String cluster : clusters) {
-            createStaticTopicWithCommand(topicName, queueNum, new HashSet<>(), cluster, mqAdmin.getNamesrvAddr());
-            logger.info("【step 1】create static topic:[{}] in cluster:[{}] success, logic queue num:[{}].", topicName, cluster, queueNum);
-
-            update2CompactTopicWithCommand(topicName, queueNum, cluster, mqAdmin.getNamesrvAddr());
-            logger.info("【step 2】update static topic to compact topic success. topic:[{}], cluster:[{}]", topicName, cluster);
-        }
-
-        existTopic.add(topicName);
-        logger.info("create static-compact topic [{}] success, queue num [{}]", topicName, queueNum);
-    }
-
-    public static void createStaticTopic(DefaultMQAdminExt mqAdmin, String topicName, int queueNum) throws Exception {
-        if (check(mqAdmin, topicName)) {
-            logger.info("topic[{}] already exist.", topicName);
-            return;
-        }
-
-        Set<String> clusters = getCluster(mqAdmin);
-        for (String cluster : clusters) {
-            createStaticTopicWithCommand(topicName, queueNum, new HashSet<>(), cluster, mqAdmin.getNamesrvAddr());
-            logger.info("create static topic:[{}] in cluster:[{}] success, logic queue num:[{}].", topicName, cluster, queueNum);
-        }
-
-        existTopic.add(topicName);
-    }
-
-    private static void createStaticTopicWithCommand(String topic, int queueNum, Set<String> brokers, String cluster, String nameservers) throws Exception {
-        UpdateStaticTopicSubCommand cmd = new UpdateStaticTopicSubCommand();
-        Options options = ServerUtil.buildCommandlineOptions(new Options());
-        String[] args;
-        if (cluster != null) {
-            args = new String[]{
-                    "-c", cluster,
-                    "-t", topic,
-                    "-qn", String.valueOf(queueNum),
-                    "-n", nameservers
-            };
-        } else {
-            String brokerStr = String.join(",", brokers);
-            args = new String[]{
-                    "-b", brokerStr,
-                    "-t", topic,
-                    "-qn", String.valueOf(queueNum),
-                    "-n", nameservers
-            };
-        }
-
-        final CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), args, cmd.buildCommandlineOptions(options), new PosixParser());
-
-        String namesrvAddr = commandLine.getOptionValue('n');
-        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
-
-        cmd.execute(commandLine, options, null);
-    }
 
     private static void update2CompactTopicWithCommand(String topic, int queueNum, String cluster, String nameservers) throws Exception {
         UpdateTopicSubCommand command = new UpdateTopicSubCommand();