You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/31 06:46:59 UTC
[rocketmq-schema-registry] 12/34: init compact topic
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit a0d18f7499d32686d580731ef369a3f47ff02b1e
Author: huitong <yi...@alibaba-inc.com>
AuthorDate: Sun Jul 24 14:49:52 2022 +0800
init compact topic
---
schema-storage-rocketmq/pom.xml | 4 ++--
.../registry/storage/rocketmq/RocketmqClient.java | 22 +++++++++++++++++++++-
2 files changed, 23 insertions(+), 3 deletions(-)
diff --git a/schema-storage-rocketmq/pom.xml b/schema-storage-rocketmq/pom.xml
index a3f4b17..0b23abe 100644
--- a/schema-storage-rocketmq/pom.xml
+++ b/schema-storage-rocketmq/pom.xml
@@ -38,13 +38,13 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
- <version>4.9.3</version>
+ <version>5.0.0-ALPHA</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-tools</artifactId>
- <version>4.9.3</version>
+ <version>5.0.0-ALPHA</version>
</dependency>
</dependencies>
diff --git a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
index 87cde86..9c0bc7a 100644
--- a/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
+++ b/schema-storage-rocketmq/src/main/java/org/apache/rocketmq/schema/registry/storage/rocketmq/RocketmqClient.java
@@ -43,6 +43,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.schema.registry.common.QualifiedName;
import org.apache.rocketmq.schema.registry.common.exception.SchemaException;
import org.apache.rocketmq.schema.registry.common.exception.SchemaExistException;
@@ -111,6 +112,22 @@ public class RocketmqClient {
try {
mqAdminExt.start();
+ // check if the topic exists
+ TopicRouteData topicRouteData = null;
+ try {
+ topicRouteData = mqAdminExt.examineTopicRouteInfo(storageTopic);
+ } catch (MQClientException e) {
+ log.warn("maybe the storage topic not found, need to create");
+ } catch (Exception e) {
+ throw new SchemaException("Failed to create storage rocketmq topic", e);
+ }
+
+ if (topicRouteData != null && CollectionUtils.isNotEmpty(topicRouteData.getBrokerDatas())
+ && CollectionUtils.isNotEmpty(topicRouteData.getQueueDatas())) {
+ log.info("the storage topic already exist, no need to create");
+ return;
+ }
+
try {
ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
HashMap<String, BrokerData> brokerAddrTable = clusterInfo.getBrokerAddrTable();
@@ -119,7 +136,10 @@ public class RocketmqClient {
topicConfig.setTopicName(storageTopic);
topicConfig.setReadQueueNums(8);
topicConfig.setWriteQueueNums(8);
- // TODO compact topic (TopicAttributes)
+ // create compact topic
+ Map<String, String> attributes = new HashMap<>(1);
+ attributes.put("+delete.policy", "COMPACTION");
+ topicConfig.setAttributes(attributes);
String brokerAddr = brokerData.selectBrokerAddr();
mqAdminExt.createAndUpdateTopicConfig(brokerAddr, topicConfig);
}