You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/29 15:10:59 UTC

[rocketmq] branch snode updated: Add snode route info obtain logic

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 256912a  Add snode route info obtain logic
256912a is described below

commit 256912a59756a05a753ce058c769e0dc2583a97b
Author: duhenglucky <du...@gmail.com>
AuthorDate: Tue Jan 29 23:09:29 2019 +0800

    Add snode route info obtain logic
---
 .../rocketmq/common/protocol/RequestCode.java      |  6 +++
 .../rocketmq/common/protocol/body/ClusterInfo.java | 20 +++++++++
 .../common/protocol/body/SnodeClusterInfo.java     | 43 +++++++++++++++++++
 .../header/namesrv/GetSnodeInfoHeader.java         | 37 +++++++++++++++++
 .../namesrv/processor/DefaultRequestProcessor.java | 35 ++++++++++++++--
 .../namesrv/routeinfo/RouteInfoManager.java        | 28 +++++++++++--
 .../org/apache/rocketmq/snode/SnodeStartup.java    |  4 +-
 .../apache/rocketmq/snode/config/SnodeConfig.java  | 48 +++++++++++++++++++---
 .../snode/service/impl/NnodeServiceImpl.java       |  6 ++-
 9 files changed, 211 insertions(+), 16 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index f9b097f..61a8e40 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -174,4 +174,10 @@ public class RequestCode {
 
     public static final int SNODE_PUSH_MESSAGE = 352;
 
+    public static final int GET_SNODE_CLUSTER_INFO = 353;
+
+    public static final int GET_SNODE_INFO = 354;
+
+
+
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
index 9c4d913..5a896dc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
@@ -24,11 +24,14 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.SnodeData;
 import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
 
 public class ClusterInfo extends RemotingSerializable {
     private HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
     private HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
+    private HashMap<String/* snodeName*/, SnodeData> snodeTable;
+    private HashMap<String/* clusterName*/, Set<String/*snodeName*/>> snodeCluster;
 
     public HashMap<String, BrokerData> getBrokerAddrTable() {
         return brokerAddrTable;
@@ -61,6 +64,23 @@ public class ClusterInfo extends RemotingSerializable {
         return addrs.toArray(new String[] {});
     }
 
+    public HashMap<String, SnodeData> getSnodeTable() {
+        return snodeTable;
+    }
+
+    public void setSnodeTable(
+        HashMap<String, SnodeData> snodeTable) {
+        this.snodeTable = snodeTable;
+    }
+
+    public HashMap<String, Set<String>> getSnodeCluster() {
+        return snodeCluster;
+    }
+
+    public void setSnodeCluster(HashMap<String, Set<String>> snodeCluster) {
+        this.snodeCluster = snodeCluster;
+    }
+
     public String[] retrieveAllMasterAddrByCluster(String cluster) {
         List<String> addrs = new ArrayList<String>();
         if (clusterAddrTable.containsKey(cluster)) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SnodeClusterInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SnodeClusterInfo.java
new file mode 100644
index 0000000..2d42bdf
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SnodeClusterInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.body;
+
+import java.util.HashMap;
+import java.util.Set;
+import org.apache.rocketmq.common.protocol.route.SnodeData;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
+
+/**
+ * Serializable body carrying snode routing data: the snodes keyed by snode name and the snode names
+ * grouped by cluster name.
+ */
+public class SnodeClusterInfo extends RemotingSerializable {
+    private HashMap<String/* snodeName*/, SnodeData> snodeTable;
+    private HashMap<String/* clusterName*/, Set<String/*snodeName*/>> snodeCluster;
+
+    public HashMap<String, SnodeData> getSnodeTable() {
+        return snodeTable;
+    }
+
+    public void setSnodeTable(HashMap<String, SnodeData> snodeTable) {
+        this.snodeTable = snodeTable;
+    }
+
+    public HashMap<String, Set<String>> getSnodeCluster() {
+        return snodeCluster;
+    }
+
+    public void setSnodeCluster(HashMap<String, Set<String>> snodeCluster) {
+        this.snodeCluster = snodeCluster;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetSnodeInfoHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetSnodeInfoHeader.java
new file mode 100644
index 0000000..1d094e3
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetSnodeInfoHeader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class GetSnodeInfoHeader implements CommandCustomHeader {
+    private String snodeClusterName;
+
+    @Override
+    public void checkFields() throws RemotingCommandException {
+
+    }
+
+    public String getSnodeClusterName() {
+        return snodeClusterName;
+    }
+
+    public void setSnodeClusterName(String snodeClusterName) {
+        this.snodeClusterName = snodeClusterName;
+    }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index e3136fa..73e98ea 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -27,9 +27,6 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.namesrv.NamesrvUtil;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.common.protocol.RequestCode;
@@ -43,20 +40,24 @@ import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHead
 import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetSnodeInfoHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.namesrv.NamesrvController;
 import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.RequestProcessor;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.RequestProcessor;
 import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
@@ -128,12 +129,38 @@ public class DefaultRequestProcessor implements RequestProcessor {
                 return this.getConfig(ctx, request);
             case RequestCode.REGISTER_SNODE:
                 return this.registerSnode(ctx, request);
+            case RequestCode.GET_SNODE_CLUSTER_INFO:
+                return this.getSnodeClusterInfo(ctx, request);
+            case RequestCode.GET_SNODE_INFO:
+                return getSnodeInfo(ctx, request);
             default:
                 break;
         }
         return null;
     }
 
+    public RemotingCommand getSnodeClusterInfo(ChannelHandlerContext ctx,
+        RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        byte[] content = this.namesrvController.getRouteInfoManager().getAllSnodeData();
+        response.setBody(content);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    public RemotingCommand getSnodeInfo(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        final GetSnodeInfoHeader requestHeader =
+            (GetSnodeInfoHeader) request.decodeCommandCustomHeader(GetSnodeInfoHeader.class);
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        byte[] content = this.namesrvController.getRouteInfoManager().getSnodeDatabyClusterName(requestHeader.getSnodeClusterName());
+        response.setBody(content);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
     public RemotingCommand registerSnode(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 58edcab..c3342dc 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -33,17 +33,18 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
-import org.apache.rocketmq.common.protocol.route.SnodeData;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.SnodeData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 
 public class RouteInfoManager {
@@ -72,9 +73,43 @@ public class RouteInfoManager {
         ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();
         clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);
         clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);
+        clusterInfoSerializeWrapper.setSnodeCluster(this.snodeCluster);
+        clusterInfoSerializeWrapper.setSnodeTable(this.snodeTable);
         return clusterInfoSerializeWrapper.encode();
     }
 
+    /**
+     * Encodes the snodes registered under the given cluster.
+     *
+     * @param clusterName name of the snode cluster to look up
+     * @return encoded {@link SnodeClusterInfo} whose snode table is keyed by snode name; the table is
+     * empty when no snode names are registered for the cluster
+     */
+    public byte[] getSnodeDatabyClusterName(String clusterName) {
+        SnodeClusterInfo snodeClusterInfo = new SnodeClusterInfo();
+        HashMap<String, SnodeData> snodeDatas = new HashMap<>();
+        Set<String> snodeNameSet = this.snodeCluster.get(clusterName);
+        // An unknown cluster has no name set; answer with an empty table instead of throwing.
+        if (snodeNameSet != null) {
+            for (String snodeName : snodeNameSet) {
+                SnodeData snodeData = this.snodeTable.get(snodeName);
+                // Key by snode name: keying by cluster name kept only the first snode of the cluster.
+                if (snodeData != null) {
+                    snodeDatas.put(snodeName, snodeData);
+                }
+            }
+        }
+        snodeClusterInfo.setSnodeTable(snodeDatas);
+        return snodeClusterInfo.encode();
+    }
+
+    public byte[] getAllSnodeData() {
+        SnodeClusterInfo snodeClusterInfo = new SnodeClusterInfo();
+        snodeClusterInfo.setSnodeCluster(this.snodeCluster);
+        snodeClusterInfo.setSnodeTable(snodeTable);
+        return snodeClusterInfo.encode();
+    }
+
     public void deleteTopic(final String topic) {
         try {
             try {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
index 516a805..0e753e7 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
@@ -32,9 +32,9 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.ClientConfig;
 import org.apache.rocketmq.remoting.ServerConfig;
+import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.config.SnodeConfig;
@@ -58,7 +58,7 @@ public class SnodeStartup {
             controller.start();
 
             String tip = "The snode[" + controller.getSnodeConfig().getSnodeName() + ", "
-                + controller.getSnodeConfig().getSnodeAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+                + controller.getSnodeConfig().getSnodeIP1() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
 
             if (null != controller.getSnodeConfig().getNamesrvAddr()) {
                 tip += " and name server is " + controller.getSnodeConfig().getNamesrvAddr();
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
index 0d41501..e37dd7c 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
@@ -16,11 +16,14 @@
  */
 package org.apache.rocketmq.snode.config;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
 
 import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
 
@@ -33,9 +36,16 @@ public class SnodeConfig {
     @ImportantField
     private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
 
-    private String snodeName = "defaultNode";
+    @ImportantField
+    private String snodeIP1 = RemotingUtil.getLocalAddress();
+
+    private String snodeIP2 = RemotingUtil.getLocalAddress();
+
+    @ImportantField
+    private String snodeName = localHostName();
 
-    private String snodeAddr = "127.0.0.1:11911";
+    @ImportantField
+    private long snodeId = MixAll.MASTER_ID;
 
     private String clusterName = "defaultCluster";
 
@@ -135,6 +145,16 @@ public class SnodeConfig {
     @ImportantField
     private boolean fetchNamesrvAddrByAddressServer = false;
 
+    public static String localHostName() {
+        try {
+            return InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            log.error("Failed to obtain the host name", e);
+        }
+
+        return "DEFAULT_SNODE";
+    }
+
     public boolean isFetchNamesrvAddrByAddressServer() {
         return fetchNamesrvAddrByAddressServer;
     }
@@ -207,12 +227,28 @@ public class SnodeConfig {
         this.snodeSendMessageMaxPoolSize = snodeSendMessageMaxPoolSize;
     }
 
-    public String getSnodeAddr() {
-        return snodeAddr;
+    public String getSnodeIP1() {
+        return snodeIP1;
+    }
+
+    public void setSnodeIP1(String snodeIP1) {
+        this.snodeIP1 = snodeIP1;
+    }
+
+    public String getSnodeIP2() {
+        return snodeIP2;
+    }
+
+    public void setSnodeIP2(String snodeIP2) {
+        this.snodeIP2 = snodeIP2;
+    }
+
+    public long getSnodeId() {
+        return snodeId;
     }
 
-    public void setSnodeAddr(String snodeAddr) {
-        this.snodeAddr = snodeAddr;
+    public void setSnodeId(long snodeId) {
+        this.snodeId = snodeId;
     }
 
     public String getSnodeName() {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
index 8853b1f..d30543e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
@@ -53,12 +53,16 @@ public class NnodeServiceImpl implements NnodeService {
         this.snodeController = snodeController;
     }
 
+    public String getSnodeAddress() {
+        return this.snodeController.getSnodeConfig().getSnodeIP1() + ":" + this.snodeController.getSnodeConfig().getListenPort();
+    }
+
     @Override
     public void registerSnode(SnodeConfig snodeConfig) {
         List<String> nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList();
         RemotingCommand remotingCommand = new RemotingCommand();
         RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
-        requestHeader.setSnodeAddr(snodeConfig.getSnodeAddr());
+        requestHeader.setSnodeAddr(getSnodeAddress());
         requestHeader.setSnodeName(snodeConfig.getSnodeName());
         requestHeader.setClusterName(snodeConfig.getClusterName());
         remotingCommand.setCustomHeader(requestHeader);