You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/08/02 06:31:33 UTC

[GitHub] vongosling closed pull request #230: [ROCKETMQ-350] Add multiple Network Interface Cards support

vongosling closed pull request #230: [ROCKETMQ-350] Add multiple Network Interface Cards support
URL: https://github.com/apache/rocketmq/pull/230
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index f45674d6e..60985fcf3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -752,7 +752,15 @@ private void unregisterBrokerAll() {
     }
 
     public String getBrokerAddr() {
-        return this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort();
+        if (brokerConfig.isEnableMultipleNICSupport()) {
+            return this.getBrokerMultipleAddr();
+        } else {
+            return this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort();
+        }
+    }
+
+    public String getBrokerMultipleAddr() {
+        return this.brokerConfig.getBrokerMultopleIP() + ":" + this.nettyServerConfig.getListenPort();
     }
 
     public void start() throws Exception {
@@ -920,6 +928,10 @@ public String getHAServerAddr() {
         return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
     }
 
+    public String getHAServerMultipleAddr() {
+        return this.brokerConfig.getBrokerMultopleIP() + ":" + this.messageStoreConfig.getHaListenPort();
+    }
+
     public RebalanceLockManager getRebalanceLockManager() {
         return rebalanceLockManager;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 643a812fa..b50a87f76 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -26,6 +26,7 @@
 import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.utils.MultipleAddrConvertor;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 
 public class SlaveSynchronize {
@@ -42,7 +43,7 @@ public String getMasterAddr() {
     }
 
     public void setMasterAddr(String masterAddr) {
-        this.masterAddr = masterAddr;
+        this.masterAddr = MultipleAddrConvertor.convert(masterAddr);
     }
 
     public void syncAll() {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 3a023e362..b1904a77f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -135,6 +135,7 @@
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.utils.MultipleAddrConvertor;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -1213,7 +1214,7 @@ public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final
             case ResponseCode.SUCCESS: {
                 byte[] body = response.getBody();
                 if (body != null) {
-                    return TopicRouteData.decode(body, TopicRouteData.class);
+                    return MultipleAddrConvertor.convert(TopicRouteData.decode(body, TopicRouteData.class));
                 }
             }
             default:
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 442f456aa..946ed6df7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -34,6 +34,7 @@
     @ImportantField
     private String brokerIP1 = RemotingUtil.getLocalAddress();
     private String brokerIP2 = RemotingUtil.getLocalAddress();
+    private String multipleIp = RemotingUtil.getMultipleLocalAddress();
     @ImportantField
     private String brokerName = localHostName();
     @ImportantField
@@ -135,6 +136,8 @@
     private boolean filterSupportRetry = false;
     private boolean enablePropertyFilter = false;
 
+    private boolean enableMultipleNICSupport = false;
+  
     private boolean compressedRegister = false;
 
     private boolean forceRegister = true;
@@ -338,6 +341,10 @@ public String getBrokerIP2() {
         return brokerIP2;
     }
 
+    public String getBrokerMultopleIP() {
+        return multipleIp;
+    }
+
     public void setBrokerIP2(String brokerIP2) {
         this.brokerIP2 = brokerIP2;
     }
@@ -630,6 +637,14 @@ public void setEnablePropertyFilter(boolean enablePropertyFilter) {
         this.enablePropertyFilter = enablePropertyFilter;
     }
 
+    public boolean isEnableMultipleNICSupport() {
+        return enableMultipleNICSupport;
+    }
+
+    public void setEnableMultipleNICSupport(boolean enableMultipleNICSupport) {
+        this.enableMultipleNICSupport = enableMultipleNICSupport;
+    }
+  
     public boolean isCompressedRegister() {
         return compressedRegister;
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/MultipleAddrConvertor.java b/common/src/main/java/org/apache/rocketmq/common/utils/MultipleAddrConvertor.java
new file mode 100644
index 000000000..97c49f267
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/MultipleAddrConvertor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class MultipleAddrConvertor {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+    private static final int CACHE_MAX_LEN = 1024;
+
+    private static LinkedHashMap<String, String> chosenAddrCache = new LinkedHashMap<String, String>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry eldest) {
+            return size() > CACHE_MAX_LEN;
+        }
+    };
+
+    public static TopicRouteData convert(TopicRouteData topicRouteData) {
+        if (topicRouteData == null) {
+            return topicRouteData;
+        }
+        List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
+
+        for (BrokerData brokerData : brokerDatas) {
+            convert(brokerData);
+        }
+        return topicRouteData;
+    }
+
+    public static BrokerData convert(BrokerData brokerData) {
+
+        HashMap<Long, String> brokerIdAddr = brokerData.getBrokerAddrs();
+        for (Long brokerId : brokerIdAddr.keySet()) {
+
+            String multipleAddr = brokerIdAddr.get(brokerId);
+            if (multipleAddr == null) {
+            } else {
+                String[] addrs = multipleAddr.split(";");
+                if (addrs.length <= 1) {
+                } else {
+                    if (chosenAddrCache.get(multipleAddr) != null) {
+                        brokerIdAddr.put(brokerId, chosenAddrCache.get(multipleAddr));
+                    } else {
+                        String addr = convert(multipleAddr);
+                        if (addr != null) {
+                            chosenAddrCache.put(multipleAddr, addr);
+                        }
+                        brokerIdAddr.put(brokerId, convert(addr));
+                    }
+                }
+            }
+        }
+
+        return brokerData;
+    }
+
+    public static String convert(String multipleAddr) {
+
+        if (multipleAddr == null || multipleAddr.length() < 8) {
+            return multipleAddr;
+        }
+        String[] ipsPort = multipleAddr.split(":");
+        if (ipsPort.length != 2) {
+            return null;
+        } else {
+            try {
+                Integer.parseInt(ipsPort[1]);
+            } catch (Exception e) {
+                return null;
+            }
+        }
+        final String portNum = ipsPort[1];
+        String[] ips = ipsPort[0].split(";");
+        if (ips.length <= 1) {
+            return multipleAddr;
+        }
+
+        final List<String> availableAddrs = new ArrayList<String>();
+
+        for (final String aip : ips) {
+            ExecutorService executor = Executors.newSingleThreadExecutor();
+
+            executor.submit(new Runnable() {
+                @Override public void run() {
+                    try {
+                        InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getByName(aip), Integer.parseInt(portNum));
+                        SocketChannel sc = SocketChannel.open();
+                        sc.configureBlocking(true);
+
+                        if (sc.connect(socketAddress)) {
+                            availableAddrs.add(aip + ":" + portNum);
+                            sc.close();
+                        }
+
+                    } catch (Exception e) {
+                        log.info("Exception when host detecting " + aip + ":" + portNum);
+                    }
+                }
+            });
+
+            try {
+                if (executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                } else {
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                log.info("Exception when waiting host detecting for " + aip + ":" + portNum);
+                e.printStackTrace();
+            }
+        }
+
+        if (availableAddrs.size() >= 1) {
+            return availableAddrs.get(0);
+        }
+        return null;
+    }
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java
index 586689637..851494ddc 100644
--- a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java
@@ -28,4 +28,11 @@ public void testGetLocalAddress() throws Exception {
         assertThat(localAddress).isNotNull();
         assertThat(localAddress.length()).isGreaterThan(0);
     }
+
+    @Test
+    public void testGetMultipleLocalAddress() throws Exception {
+        String localAddress = RemotingUtil.getMultipleLocalAddress();
+        assertThat(localAddress).isNotNull();
+        assertThat(localAddress.length()).isGreaterThan(0);
+    }
 }
diff --git a/common/src/test/java/org/apache/rocketmq/common/utils/MultipleAddrConvertorTest.java b/common/src/test/java/org/apache/rocketmq/common/utils/MultipleAddrConvertorTest.java
new file mode 100644
index 000000000..cab6a431c
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/utils/MultipleAddrConvertorTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.utils;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MultipleAddrConvertorTest {
+
+    @Test
+    public void testConvertAddr() {
+        String multiTestAddr = "100.81.249.108;127.0.0.1;127.0.0.1:10911";
+        String availableAddress = MultipleAddrConvertor.convert(multiTestAddr);
+        if (availableAddress != null) {
+            String[] addrs = availableAddress.split(";");
+            assertThat(addrs.length).isEqualTo(1);
+            String[] parts = availableAddress.split(":");
+            assertThat(parts.length).isEqualTo(2);
+        }
+    }
+
+    @Test
+    public void testConvertTopicRouteData() {
+
+        HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
+        brokerAddrs.put(0L, "100.81.249.108;127.0.0.1;127.0.0.1:10911");
+        brokerAddrs.put(1L, "100.81.249.108:10911");
+
+        BrokerData brokerData1 = new BrokerData();
+        brokerData1.setBrokerName("testBroker1");
+        brokerData1.setBrokerAddrs(brokerAddrs);
+
+        BrokerData brokerData2 = new BrokerData();
+        brokerData2.setBrokerName("testBroker2");
+        brokerData2.setBrokerAddrs(brokerAddrs);
+
+        List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
+        brokerDataList.add(brokerData1);
+        brokerDataList.add(brokerData2);
+
+        TopicRouteData topicRouteData = new TopicRouteData();
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        TopicRouteData convertedRouteData = MultipleAddrConvertor.convert(topicRouteData);
+
+        HashMap<Long, String> newBrokerAddrs = convertedRouteData.getBrokerDatas().get(0).getBrokerAddrs();
+        if (newBrokerAddrs.get(0L) != null) {
+            String[] addrs = newBrokerAddrs.get(0L).split(";");
+            assertThat(addrs.length).isEqualTo(1);
+            String[] parts = newBrokerAddrs.get(0L).split(":");
+            assertThat(parts.length).isEqualTo(2);
+        }
+        assertThat(newBrokerAddrs.get(1L)).isEqualToIgnoringCase("100.81.249.108:10911");
+    }
+
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
index 3da3a1839..3c192bf1b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
@@ -136,6 +136,58 @@ public static String getLocalAddress() {
         return null;
     }
 
+    public static String getMultipleLocalAddress() {
+        try {
+            // Traverse all network interfaces and collect every non-loopback address
+            Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
+            ArrayList<String> ipv4Result = new ArrayList<String>();
+            ArrayList<String> ipv6Result = new ArrayList<String>();
+            while (enumeration.hasMoreElements()) {
+                final NetworkInterface networkInterface = enumeration.nextElement();
+                final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
+                while (en.hasMoreElements()) {
+                    final InetAddress address = en.nextElement();
+                    if (!address.isLoopbackAddress()) {
+                        if (address instanceof Inet6Address) {
+                            ipv6Result.add(normalizeHostAddress(address));
+                        } else {
+                            ipv4Result.add(normalizeHostAddress(address));
+                        }
+                    }
+                }
+            }
+
+            StringBuilder sb = new StringBuilder();
+            String delimiter = "";
+            // Prefer IPv4: IPv6 addresses are returned only when no IPv4 address was found
+            if (!ipv4Result.isEmpty()) {
+                for (String ip : ipv4Result) {
+                    sb.append(delimiter);
+                    delimiter = ";";
+                    sb.append(ip);
+                }
+
+                return sb.toString();
+            } else if (!ipv6Result.isEmpty()) {
+                for (String ip : ipv6Result) {
+                    sb.append(delimiter);
+                    delimiter = ";";
+                    sb.append(ip);
+                }
+
+                return sb.toString();
+            }
+            // If no address was found, fall back to localhost
+            final InetAddress localHost = InetAddress.getLocalHost();
+            return normalizeHostAddress(localHost);
+        } catch (Exception e) {
+            //log.error("Failed to obtain local address", e);
+            e.printStackTrace();
+        }
+
+        return null;
+    }
+
     public static String normalizeHostAddress(final InetAddress localHost) {
         if (localHost instanceof Inet6Address) {
             return "[" + localHost.getHostAddress() + "]";
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index bcd66669c..1f9293efe 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -75,6 +75,7 @@
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.utils.MultipleAddrConvertor;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
@@ -592,6 +593,7 @@ public GroupList queryTopicConsumeByWho(
         String topic) throws InterruptedException, MQBrokerException, RemotingException,
         MQClientException {
         TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
+        topicRouteData = MultipleAddrConvertor.convert(topicRouteData);
 
         for (BrokerData bd : topicRouteData.getBrokerDatas()) {
             String addr = bd.selectBrokerAddr();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
index 2e65f9808..56afad610 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
@@ -28,6 +28,7 @@
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.utils.MultipleAddrConvertor;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
@@ -88,7 +89,7 @@
             for (String brokerName : brokerNameSet) {
                 BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
                 if (brokerData != null) {
-
+                    MultipleAddrConvertor.convert(brokerData);
                     String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                     if (addr != null) {
                         masterSet.add(addr);
@@ -112,6 +113,7 @@
             for (String brokerName : brokerNameSet) {
                 BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
                 if (brokerData != null) {
+                    MultipleAddrConvertor.convert(brokerData);
                     final Collection<String> addrs = brokerData.getBrokerAddrs().values();
                     brokerAddressSet.addAll(addrs);
                 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
index 6a0cd71c1..e452fffd7 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java
@@ -27,6 +27,7 @@
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.utils.MultipleAddrConvertor;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -194,7 +195,7 @@ private void printClusterBaseInfo(
             for (String brokerName : brokerNameSet) {
                 BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
                 if (brokerData != null) {
-
+                    MultipleAddrConvertor.convert(brokerData);
                     Iterator<Map.Entry<Long, String>> itAddr = brokerData.getBrokerAddrs().entrySet().iterator();
                     while (itAddr.hasNext()) {
                         Map.Entry<Long, String> next1 = itAddr.next();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services