You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/31 06:46:59 UTC

[rocketmq-schema-registry] 12/34: init compact topic

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git

commit a0d18f7499d32686d580731ef369a3f47ff02b1e
Author: huitong <yi...@alibaba-inc.com>
AuthorDate: Sun Jul 24 14:49:52 2022 +0800

    init compact topic
---
 schema-storage-rocketmq/pom.xml                    |  4 ++--
 .../registry/storage/rocketmq/RocketmqClient.java  | 22 +++++++++++++++++++++-
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/schema-storage-rocketmq/pom.xml b/schema-storage-rocketmq/pom.xml
index a3f4b17..0b23abe 100644
--- a/schema-storage-rocketmq/pom.xml
+++ b/schema-storage-rocketmq/pom.xml
@@ -38,13 +38,13 @@
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client</artifactId>
-            <version>4.9.3</version>
+            <version>5.0.0-ALPHA</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-tools</artifactId>
-            <version>4.9.3</version>
+            <version>5.0.0-ALPHA</version>
         </dependency>
     </dependencies>
 
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 87cde86..9c0bc7a 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -43,6 +43,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.schema.registry.common.QualifiedName;
 import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
 import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException;
@@ -111,6 +112,22 @@ public class RocketmqClient {
         try {
             mqAdminExt.start();
 
+            // check if the topic exists
+            TopicRouteData topicRouteData = null;
+            try {
+                topicRouteData = mqAdminExt.examineTopicRouteInfo(storageTopic);
+            } catch (MQClientException e) {
+                log.warn("maybe the storage topic not found, need to create");
+            } catch (Exception e) {
+                throw new SchemaException("Failed to create storage rocketmq topic", e);
+            }
+
+            if (topicRouteData != null && CollectionUtils.isNotEmpty(topicRouteData.getBrokerDatas())
+                && CollectionUtils.isNotEmpty(topicRouteData.getQueueDatas())) {
+                log.info("the storage topic already exist, no need to create");
+                return;
+            }
+
             try {
                 ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
                 HashMap<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable();
@@ -119,7 +136,10 @@ public class RocketmqClient {
                     topicConfig.setTopicName(storageTopic);
                     topicConfig.setReadQueueNums(8);
                     topicConfig.setWriteQueueNums(8);
-                    // TODO compact topic (TopicAttributes)
+                    // create compact topic
+                    Map<String, String> attributes = new HashMap<>(1);
+                    attributes.put("+delete.policy", "COMPACTION");
+                    topicConfig.setAttributes(attributes);
                     String brokerAddr = brokerData.selectBrokerAddr();
                     mqAdminExt.createAndUpdateTopicConfig(brokerAddr, topicConfig);
                 }