You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2021/11/06 09:41:27 UTC

[rocketmq] 01/02: Add definition for logic queue

This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit e862ac88fc5aae370648e6495f54476bff844e52
Author: dongeforever <do...@apache.org>
AuthorDate: Sat Nov 6 11:54:21 2021 +0800

    Add definition for logic queue
---
 .../rocketmq/broker/BrokerPathConfigHelper.java    |  4 +
 .../broker/topic/TopicQueueMappingManager.java     | 86 ++++++++++++++++++++++
 .../body/TopicQueueMappingSerializeWrapper.java    | 45 +++++++++++
 .../protocol/route/LogicQueueMappingItem.java      | 54 ++++++++++++++
 .../protocol/route/TopicQueueMappingInfo.java      | 61 +++++++++++++++
 5 files changed, 250 insertions(+)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
index 43a9946..e7a72e0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
@@ -35,6 +35,10 @@ public class BrokerPathConfigHelper {
         return rootDir + File.separator + "config" + File.separator + "topics.json";
     }
 
+    public static String getTopicQueueMappingPath(final String rootDir) {
+        return rootDir + File.separator + "config" + File.separator + "topicqueuemapping.json";
+    }
+
     public static String getConsumerOffsetPath(final String rootDir) {
         return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
new file mode 100644
index 0000000..9ee0f51
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.topic;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.TopicQueueMappingSerializeWrapper;
+import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TopicQueueMappingManager extends ConfigManager {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final long LOCK_TIMEOUT_MILLIS = 3000;
+    private transient final Lock lock = new ReentrantLock();
+
+    private final DataVersion dataVersion = new DataVersion();
+    private transient BrokerController brokerController;
+
+    private final ConcurrentMap<String, TopicQueueMappingInfo> topicQueueMappingTable = new ConcurrentHashMap<>();
+
+
+    public TopicQueueMappingManager(BrokerController brokerController) {
+        this.brokerController = brokerController;
+
+    }
+
+    @Override
+    public String encode(boolean pretty) {
+        TopicQueueMappingSerializeWrapper wrapper = new TopicQueueMappingSerializeWrapper();
+        wrapper.setTopicQueueMappingInfoMap(topicQueueMappingTable);
+        wrapper.setDataVersion(this.dataVersion);
+        return JSON.toJSONString(wrapper, pretty);
+    }
+
+    @Override
+    public String encode() {
+        return encode(false);
+    }
+
+    @Override
+    public String configFilePath() {
+        return BrokerPathConfigHelper.getTopicQueueMappingPath(this.brokerController.getMessageStoreConfig()
+            .getStorePathRootDir());
+    }
+
+    @Override
+    public void decode(String jsonString) {
+        if (jsonString != null) {
+            TopicQueueMappingSerializeWrapper wrapper = TopicQueueMappingSerializeWrapper.fromJson(jsonString, TopicQueueMappingSerializeWrapper.class);
+            if (wrapper != null) {
+                this.topicQueueMappingTable.putAll(wrapper.getTopicQueueMappingInfoMap());
+                this.dataVersion.assignNewOne(wrapper.getDataVersion());
+            }
+        }
+    }
+
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
new file mode 100644
index 0000000..ef3f758
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicQueueMappingSerializeWrapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.body;
+
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.protocol.route.TopicQueueMappingInfo;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.Map;
+
+public class TopicQueueMappingSerializeWrapper extends RemotingSerializable {
+    private Map<String/* topic */, TopicQueueMappingInfo> topicQueueMappingInfoMap;
+    private DataVersion dataVersion = new DataVersion();
+
+    public Map<String, TopicQueueMappingInfo> getTopicQueueMappingInfoMap() {
+        return topicQueueMappingInfoMap;
+    }
+
+    public void setTopicQueueMappingInfoMap(Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap) {
+        this.topicQueueMappingInfoMap = topicQueueMappingInfoMap;
+    }
+
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+    public void setDataVersion(DataVersion dataVersion) {
+        this.dataVersion = dataVersion;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java
new file mode 100644
index 0000000..fc5cbe6
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/LogicQueueMappingItem.java
@@ -0,0 +1,54 @@
+package org.apache.rocketmq.common.protocol.route;
+
+public class LogicQueueMappingItem {
+
+    private int gen; //generation, mutable
+    private int queueId;
+    private String bname;
+    private long logicOffset; // the start of the logic offset
+    private long startOffset; // the start of the physical offset
+    private long timeOfStart = -1; //mutable
+
+    public LogicQueueMappingItem(int gen, int queueId, String bname, long logicOffset, long startOffset, long timeOfStart) {
+        this.gen = gen;
+        this.queueId = queueId;
+        this.bname = bname;
+        this.logicOffset = logicOffset;
+        this.startOffset = startOffset;
+        this.timeOfStart = timeOfStart;
+    }
+
+    public int getGen() {
+        return gen;
+    }
+
+    public void setGen(int gen) {
+        this.gen = gen;
+    }
+
+
+    public long getTimeOfStart() {
+        return timeOfStart;
+    }
+
+    public void setTimeOfStart(long timeOfStart) {
+        this.timeOfStart = timeOfStart;
+    }
+
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public String getBname() {
+        return bname;
+    }
+
+    public long getLogicOffset() {
+        return logicOffset;
+    }
+
+    public long getStartOffset() {
+        return startOffset;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java
new file mode 100644
index 0000000..0376965
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicQueueMappingInfo.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.route;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TopicQueueMappingInfo {
+
+    private int totalQueues;
+    private String bname;  //identify the host name
+    //the newest mapping is in current broker
+    private Map<Integer/*global id*/, List<LogicQueueMappingItem>> hostedQueues = new HashMap<Integer, List<LogicQueueMappingItem>>();
+
+
+    public TopicQueueMappingInfo(int totalQueues, String bname) {
+        this.totalQueues = totalQueues;
+        this.bname = bname;
+    }
+
+    public boolean putMappingInfo(Integer globalId, List<LogicQueueMappingItem> mappingInfo) {
+        if (mappingInfo.isEmpty()) {
+            return true;
+        }
+        hostedQueues.put(globalId, mappingInfo);
+        return true;
+    }
+
+    public List<LogicQueueMappingItem> getMappingInfo(Integer globalId) {
+        return hostedQueues.get(globalId);
+    }
+
+    public int getTotalQueues() {
+        return totalQueues;
+    }
+
+    public void setTotalQueues(int totalQueues) {
+        this.totalQueues = totalQueues;
+    }
+
+    public String getBname() {
+        return bname;
+    }
+
+
+}