You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/29 15:10:59 UTC
[rocketmq] branch snode updated: Add snode route info obtain logic
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 256912a Add snode route info obtain logic
256912a is described below
commit 256912a59756a05a753ce058c769e0dc2583a97b
Author: duhenglucky <du...@gmail.com>
AuthorDate: Tue Jan 29 23:09:29 2019 +0800
Add snode route info obtain logic
---
.../rocketmq/common/protocol/RequestCode.java | 6 +++
.../rocketmq/common/protocol/body/ClusterInfo.java | 20 +++++++++
.../common/protocol/body/SnodeClusterInfo.java | 43 +++++++++++++++++++
.../header/namesrv/GetSnodeInfoHeader.java | 37 +++++++++++++++++
.../namesrv/processor/DefaultRequestProcessor.java | 35 ++++++++++++++--
.../namesrv/routeinfo/RouteInfoManager.java | 28 +++++++++++--
.../org/apache/rocketmq/snode/SnodeStartup.java | 4 +-
.../apache/rocketmq/snode/config/SnodeConfig.java | 48 +++++++++++++++++++---
.../snode/service/impl/NnodeServiceImpl.java | 6 ++-
9 files changed, 211 insertions(+), 16 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index f9b097f..61a8e40 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -174,4 +174,10 @@ public class RequestCode {
public static final int SNODE_PUSH_MESSAGE = 352;
+ public static final int GET_SNODE_CLUSTER_INFO = 353;
+
+ public static final int GET_SNODE_INFO = 354;
+
+
+
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
index 9c4d913..5a896dc 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ClusterInfo.java
@@ -24,11 +24,14 @@ import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.SnodeData;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
public class ClusterInfo extends RemotingSerializable {
private HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
+ private HashMap<String/* snodeName*/, SnodeData> snodeTable;
+ private HashMap<String/* clusterName*/, Set<String/*snodeName*/>> snodeCluster;
public HashMap<String, BrokerData> getBrokerAddrTable() {
return brokerAddrTable;
@@ -61,6 +64,23 @@ public class ClusterInfo extends RemotingSerializable {
return addrs.toArray(new String[] {});
}
+ public HashMap<String, SnodeData> getSnodeTable() {
+ return snodeTable;
+ }
+
+ public void setSnodeTable(
+ HashMap<String, SnodeData> snodeTable) {
+ this.snodeTable = snodeTable;
+ }
+
+ public HashMap<String, Set<String>> getSnodeCluster() {
+ return snodeCluster;
+ }
+
+ public void setSnodeCluster(HashMap<String, Set<String>> snodeCluster) {
+ this.snodeCluster = snodeCluster;
+ }
+
public String[] retrieveAllMasterAddrByCluster(String cluster) {
List<String> addrs = new ArrayList<String>();
if (clusterAddrTable.containsKey(cluster)) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SnodeClusterInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SnodeClusterInfo.java
new file mode 100644
index 0000000..2d42bdf
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SnodeClusterInfo.java
@@ -0,0 +1,43 @@
+package org.apache.rocketmq.common.protocol.body;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Set;
+import org.apache.rocketmq.common.protocol.route.SnodeData;
+import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
+
+public class SnodeClusterInfo extends RemotingSerializable {
+ private HashMap<String/* snodeName*/, SnodeData> snodeTable;
+ private HashMap<String/* clusterName*/, Set<String/*snodeName*/>> snodeCluster;
+
+ public HashMap<String, SnodeData> getSnodeTable() {
+ return snodeTable;
+ }
+
+ public void setSnodeTable(
+ HashMap<String, SnodeData> snodeTable) {
+ this.snodeTable = snodeTable;
+ }
+
+ public HashMap<String, Set<String>> getSnodeCluster() {
+ return snodeCluster;
+ }
+
+ public void setSnodeCluster(HashMap<String, Set<String>> snodeCluster) {
+ this.snodeCluster = snodeCluster;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetSnodeInfoHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetSnodeInfoHeader.java
new file mode 100644
index 0000000..1d094e3
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetSnodeInfoHeader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.protocol.header.namesrv;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class GetSnodeInfoHeader implements CommandCustomHeader {
+ private String snodeClusterName;
+
+ @Override
+ public void checkFields() throws RemotingCommandException {
+
+ }
+
+ public String getSnodeClusterName() {
+ return snodeClusterName;
+ }
+
+ public void setSnodeClusterName(String snodeClusterName) {
+ this.snodeClusterName = snodeClusterName;
+ }
+}
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index e3136fa..73e98ea 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -27,9 +27,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.RequestCode;
@@ -43,20 +40,24 @@ import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHead
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.GetSnodeInfoHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
+import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -128,12 +129,38 @@ public class DefaultRequestProcessor implements RequestProcessor {
return this.getConfig(ctx, request);
case RequestCode.REGISTER_SNODE:
return this.registerSnode(ctx, request);
+ case RequestCode.GET_SNODE_CLUSTER_INFO:
+ return this.getSnodeClusterInfo(ctx, request);
+ case RequestCode.GET_SNODE_INFO:
+ return getSnodeInfo(ctx, request);
default:
break;
}
return null;
}
+ public RemotingCommand getSnodeClusterInfo(ChannelHandlerContext ctx,
+ RemotingCommand request) {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ byte[] content = this.namesrvController.getRouteInfoManager().getAllSnodeData();
+ response.setBody(content);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+
+ public RemotingCommand getSnodeInfo(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+ final GetSnodeInfoHeader requestHeader =
+ (GetSnodeInfoHeader) request.decodeCommandCustomHeader(GetSnodeInfoHeader.class);
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ byte[] content = this.namesrvController.getRouteInfoManager().getSnodeDatabyClusterName(requestHeader.getSnodeClusterName());
+ response.setBody(content);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+
public RemotingCommand registerSnode(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 58edcab..c3342dc 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -33,17 +33,18 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
-import org.apache.rocketmq.common.protocol.route.SnodeData;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.SnodeData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
public class RouteInfoManager {
@@ -72,9 +73,30 @@ public class RouteInfoManager {
ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();
clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);
clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);
+ clusterInfoSerializeWrapper.setSnodeCluster(this.snodeCluster);
+ clusterInfoSerializeWrapper.setSnodeTable(this.snodeTable);
return clusterInfoSerializeWrapper.encode();
}
+ public byte[] getSnodeDatabyClusterName(String clusterName) {
+ SnodeClusterInfo snodeClusterInfo = new SnodeClusterInfo();
+ Set<String> snodeNameSet = this.snodeCluster.get(clusterName);
+ HashMap<String, SnodeData> snodeDatas = new HashMap<>();
+ for (String snodeName : snodeNameSet) {
+ SnodeData snodeData = this.snodeTable.get(snodeName);
+ snodeDatas.putIfAbsent(clusterName, snodeData);
+ }
+ snodeClusterInfo.setSnodeTable(snodeDatas);
+ return snodeClusterInfo.encode();
+ }
+
+ public byte[] getAllSnodeData() {
+ SnodeClusterInfo snodeClusterInfo = new SnodeClusterInfo();
+ snodeClusterInfo.setSnodeCluster(this.snodeCluster);
+ snodeClusterInfo.setSnodeTable(snodeTable);
+ return snodeClusterInfo.encode();
+ }
+
public void deleteTopic(final String topic) {
try {
try {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
index 516a805..0e753e7 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
@@ -32,9 +32,9 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
+import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.config.SnodeConfig;
@@ -58,7 +58,7 @@ public class SnodeStartup {
controller.start();
String tip = "The snode[" + controller.getSnodeConfig().getSnodeName() + ", "
- + controller.getSnodeConfig().getSnodeAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+ + controller.getSnodeConfig().getSnodeIP1() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
if (null != controller.getSnodeConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getSnodeConfig().getNamesrvAddr();
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
index 0d41501..e37dd7c 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java
@@ -16,11 +16,14 @@
*/
package org.apache.rocketmq.snode.config;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
import static org.apache.rocketmq.client.ClientConfig.SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY;
@@ -33,9 +36,16 @@ public class SnodeConfig {
@ImportantField
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
- private String snodeName = "defaultNode";
+ @ImportantField
+ private String snodeIP1 = RemotingUtil.getLocalAddress();
+
+ private String snodeIP2 = RemotingUtil.getLocalAddress();
+
+ @ImportantField
+ private String snodeName = localHostName();
- private String snodeAddr = "127.0.0.1:11911";
+ @ImportantField
+ private long snodeId = MixAll.MASTER_ID;
private String clusterName = "defaultCluster";
@@ -135,6 +145,16 @@ public class SnodeConfig {
@ImportantField
private boolean fetchNamesrvAddrByAddressServer = false;
+ public static String localHostName() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ log.error("Failed to obtain the host name", e);
+ }
+
+ return "DEFAULT_SNODE";
+ }
+
public boolean isFetchNamesrvAddrByAddressServer() {
return fetchNamesrvAddrByAddressServer;
}
@@ -207,12 +227,28 @@ public class SnodeConfig {
this.snodeSendMessageMaxPoolSize = snodeSendMessageMaxPoolSize;
}
- public String getSnodeAddr() {
- return snodeAddr;
+ public String getSnodeIP1() {
+ return snodeIP1;
+ }
+
+ public void setSnodeIP1(String snodeIP1) {
+ this.snodeIP1 = snodeIP1;
+ }
+
+ public String getSnodeIP2() {
+ return snodeIP2;
+ }
+
+ public void setSnodeIP2(String snodeIP2) {
+ this.snodeIP2 = snodeIP2;
+ }
+
+ public long getSnodeId() {
+ return snodeId;
}
- public void setSnodeAddr(String snodeAddr) {
- this.snodeAddr = snodeAddr;
+ public void setSnodeId(long snodeId) {
+ this.snodeId = snodeId;
}
public String getSnodeName() {
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
index 8853b1f..d30543e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
@@ -53,12 +53,16 @@ public class NnodeServiceImpl implements NnodeService {
this.snodeController = snodeController;
}
+ public String getSnodeAddress() {
+ return this.snodeController.getSnodeConfig().getSnodeIP1() + ":" + this.snodeController.getSnodeConfig().getListenPort();
+ }
+
@Override
public void registerSnode(SnodeConfig snodeConfig) {
List<String> nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList();
RemotingCommand remotingCommand = new RemotingCommand();
RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
- requestHeader.setSnodeAddr(snodeConfig.getSnodeAddr());
+ requestHeader.setSnodeAddr(getSnodeAddress());
requestHeader.setSnodeName(snodeConfig.getSnodeName());
requestHeader.setClusterName(snodeConfig.getClusterName());
remotingCommand.setCustomHeader(requestHeader);