You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/31 03:04:58 UTC

[rocketmq] branch snode updated: Polish snode register process

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 9fc0ce4  Polish snode register process
9fc0ce4 is described below

commit 9fc0ce454f558f8d385116da60a812868388945a
Author: duhenglucky <du...@gmail.com>
AuthorDate: Thu Jan 31 11:03:29 2019 +0800

    Polish snode register process
---
 .../snode/client/impl/SlowConsumerServiceImpl.java |  1 -
 .../mqtthandler/MqttSubscribeMessageHandler.java   |  9 ++-
 .../rocketmq/snode/service/NnodeService.java       | 23 +++++-
 .../snode/service/impl/EnodeServiceImpl.java       |  4 +-
 .../snode/service/impl/NnodeServiceImpl.java       |  5 +-
 .../snode/service/impl/ScheduledServiceImpl.java   |  7 +-
 .../snode/processor/SendMessageProcessorTest.java  |  9 +--
 .../snode/service/EnodeServiceImplTest.java        | 22 ++++--
 .../snode/service/NnodeServiceImplTest.java        | 87 ++++++++++++++++++++++
 9 files changed, 143 insertions(+), 24 deletions(-)

diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java
index b96f4ee..d49a4da 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java
@@ -41,7 +41,6 @@ public class SlowConsumerServiceImpl implements SlowConsumerService {
             log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", consumerGroup, ackedOffset, currentOffset);
             return true;
         }
-
         return false;
     }
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
index 2867a97..f264077 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
@@ -24,16 +24,17 @@ import org.apache.rocketmq.snode.SnodeController;
 
 public class MqttSubscribeMessageHandler implements MessageHandler {
 
-/*    private SubscriptionStore subscriptionStore;
+    /*    private SubscriptionStore subscriptionStore;
 
-    public MqttSubscribeMessageHandler(SubscriptionStore subscriptionStore) {
-        this.subscriptionStore = subscriptionStore;
-    }*/
+        public MqttSubscribeMessageHandler(SubscriptionStore subscriptionStore) {
+            this.subscriptionStore = subscriptionStore;
+        }*/
     private final SnodeController snodeController;
 
     public MqttSubscribeMessageHandler(SnodeController snodeController) {
         this.snodeController = snodeController;
     }
+
     /**
      * handle the SUBSCRIBE message from the client
      * <ol>
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java
index 1c358b7..84f224b 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java
@@ -26,10 +26,25 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.snode.config.SnodeConfig;
 
 public interface NnodeService {
-    void registerSnode(SnodeConfig snodeConfig);
-
-    void updateNnodeAddressList(final String addrs);
-
+    /**
+     * Register Snode to Nnode(Name server) includes information: snodeAddress, snodeName, snodeClusterName.
+     *
+     * @param snodeConfig {@link SnodeConfig}
+     */
+    void registerSnode(SnodeConfig snodeConfig) throws Exception;
+
+    /**
+     * Update Nnode server address list.
+     *
+     * @param addresses Node name service list
+     */
+    void updateNnodeAddressList(final String addresses);
+
+    /**
+     * Fetch Node server address
+     *
+     * @return Node address
+     */
     String fetchNnodeAdress();
 
     void updateTopicRouteDataByTopic();
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
index 3a1a7fb..e1a02c0 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
@@ -244,12 +244,12 @@ public class EnodeServiceImpl implements EnodeService {
         TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
         requestHeader.setTopic(topicConfig.getTopicName());
+        requestHeader.setPerm(topicConfig.getPerm());
         requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
         requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
-        requestHeader.setPerm(topicConfig.getPerm());
+        requestHeader.setOrder(topicConfig.isOrder());
         requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
         requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
-        requestHeader.setOrder(topicConfig.isOrder());
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
         String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
         return this.snodeController.getRemotingClient().invokeSync(address,
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
index d30543e..d593613 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
@@ -58,7 +58,7 @@ public class NnodeServiceImpl implements NnodeService {
     }
 
     @Override
-    public void registerSnode(SnodeConfig snodeConfig) {
+    public void registerSnode(SnodeConfig snodeConfig) throws Exception{
         List<String> nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList();
         RemotingCommand remotingCommand = new RemotingCommand();
         RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
@@ -75,6 +75,9 @@ public class NnodeServiceImpl implements NnodeService {
                     log.warn("Register Snode to Nnode addr: {} error, ex:{} ", nodeAddress, ex);
                 }
             }
+        } else {
+            log.warn("Nnode server list is null");
+            throw new RemotingSendRequestException("Nnode server list is null");
         }
     }
 
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
index 5370e64..685af3f 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 package org.apache.rocketmq.snode.service.impl;
+
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -88,7 +89,11 @@ public class ScheduledServiceImpl implements ScheduledService {
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
-                snodeController.getNnodeService().registerSnode(snodeConfig);
+                try {
+                    snodeController.getNnodeService().registerSnode(snodeConfig);
+                } catch (Exception ex) {
+                    log.warn("Register snode error", ex);
+                }
             }
         }, 0, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
 
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
index 448abc9..a7a2667 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
@@ -44,6 +44,7 @@ import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class SendMessageProcessorTest {
+
     private SendMessageProcessor sendMessageProcessor;
 
     @Spy
@@ -52,12 +53,10 @@ public class SendMessageProcessorTest {
     @Mock
     private RemotingChannel remotingChannel;
 
-    private String topic = "SnodeTopic";
-
-    private String group = "SnodeGroup";
-
-    private String enodeName = "enodeName";
+    private String topic = "snodeTopic";
 
+    private String group = "snodeGroup";
+    
     @Mock
     private EnodeService enodeService;
 
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
index 3cd4125..c228a83 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.snode.service;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
@@ -53,6 +53,7 @@ import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class EnodeServiceImplTest extends SnodeTestBase {
+
     private EnodeService enodeService;
 
     @Spy
@@ -66,20 +67,20 @@ public class EnodeServiceImplTest extends SnodeTestBase {
 
     private String enodeName = "enodeName";
 
-    private String topic = "SnodeTopic";
+    private String topic = "snodeTopic";
 
-    private String group = "SnodeGroup";
+    private String group = "snodeGroup";
 
     @Before
     public void init() {
+        snodeController.setNnodeService(nnodeService);
+        snodeController.setRemotingClient(remotingClient);
         enodeService = new EnodeServiceImpl(snodeController);
     }
 
     @Test
     public void sendMessageTest() throws Exception {
-        snodeController.setNnodeService(nnodeService);
-        snodeController.setRemotingClient(remotingClient);
-        when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("1024");
+        when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("127.0.0.1:10911");
         CompletableFuture<RemotingCommand> responseCF = new CompletableFuture<>();
         doAnswer(new Answer() {
             @Override
@@ -123,6 +124,15 @@ public class EnodeServiceImplTest extends SnodeTestBase {
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
+    @Test
+    public void creatTopicTest() throws Exception {
+        when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("127.0.0.1:10911");
+        when(snodeController.getRemotingClient().invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(createSuccessResponse());
+        TopicConfig topicConfig = new TopicConfig(topic, 1, 1, 2);
+        RemotingCommand response = enodeService.creatTopic(enodeName, topicConfig);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
     private GetMessageResult createGetMessageResult() {
         GetMessageResult getMessageResult = new GetMessageResult();
         getMessageResult.setStatus(GetMessageStatus.FOUND);
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java
new file mode 100644
index 0000000..45a19dd
--- /dev/null
+++ b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.SnodeTestBase;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NnodeServiceImplTest extends SnodeTestBase {
+
+    @Spy
+    private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
+
+    @Mock
+    private NettyRemotingClient remotingClient;
+
+    private NnodeService nnodeService;
+
+    @Before
+    public void init() {
+        snodeController.setRemotingClient(remotingClient);
+        nnodeService = new NnodeServiceImpl(snodeController);
+    }
+
+    @Test
+    public void registerSnodeSuccessTest() throws InterruptedException, RemotingConnectException,
+        RemotingSendRequestException, RemotingTimeoutException {
+        when(snodeController.getRemotingClient().getNameServerAddressList()).thenReturn(createNnodeList());
+        when(snodeController.getRemotingClient().invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(createSuccessResponse());
+        try {
+            nnodeService.registerSnode(createSnodeConfig());
+        } catch (Exception ex) {
+            assertThat(ex).isNull();
+        }
+    }
+
+    private List createNnodeList() {
+        List<String> addresses = new ArrayList<>();
+        addresses.add("127.0.0.1:9876");
+        return addresses;
+    }
+
+    private SnodeConfig createSnodeConfig() {
+        SnodeConfig snodeConfig = new SnodeConfig();
+        snodeConfig.setClusterName("defaultCluster");
+        snodeConfig.setSnodeIP1("127.0.0.1:10911");
+        snodeConfig.setSnodeName("snode-a");
+        return snodeConfig;
+    }
+}