You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/31 03:04:58 UTC
[rocketmq] branch snode updated: Polish snode register process
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 9fc0ce4 Polish snode register process
9fc0ce4 is described below
commit 9fc0ce454f558f8d385116da60a812868388945a
Author: duhenglucky <du...@gmail.com>
AuthorDate: Thu Jan 31 11:03:29 2019 +0800
Polish snode register process
---
.../snode/client/impl/SlowConsumerServiceImpl.java | 1 -
.../mqtthandler/MqttSubscribeMessageHandler.java | 9 ++-
.../rocketmq/snode/service/NnodeService.java | 23 +++++-
.../snode/service/impl/EnodeServiceImpl.java | 4 +-
.../snode/service/impl/NnodeServiceImpl.java | 5 +-
.../snode/service/impl/ScheduledServiceImpl.java | 7 +-
.../snode/processor/SendMessageProcessorTest.java | 9 +--
.../snode/service/EnodeServiceImplTest.java | 22 ++++--
.../snode/service/NnodeServiceImplTest.java | 87 ++++++++++++++++++++++
9 files changed, 143 insertions(+), 24 deletions(-)
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java
index b96f4ee..d49a4da 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SlowConsumerServiceImpl.java
@@ -41,7 +41,6 @@ public class SlowConsumerServiceImpl implements SlowConsumerService {
log.warn("[SlowConsumer] group: {}, lastAckedOffset:{} nowOffset:{} ", consumerGroup, ackedOffset, currentOffset);
return true;
}
-
return false;
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
index 2867a97..f264077 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttSubscribeMessageHandler.java
@@ -24,16 +24,17 @@ import org.apache.rocketmq.snode.SnodeController;
public class MqttSubscribeMessageHandler implements MessageHandler {
-/* private SubscriptionStore subscriptionStore;
+ /* private SubscriptionStore subscriptionStore;
- public MqttSubscribeMessageHandler(SubscriptionStore subscriptionStore) {
- this.subscriptionStore = subscriptionStore;
- }*/
+ public MqttSubscribeMessageHandler(SubscriptionStore subscriptionStore) {
+ this.subscriptionStore = subscriptionStore;
+ }*/
private final SnodeController snodeController;
public MqttSubscribeMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
+
/**
* handle the SUBSCRIBE message from the client
* <ol>
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java
index 1c358b7..84f224b 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java
@@ -26,10 +26,25 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.snode.config.SnodeConfig;
public interface NnodeService {
- void registerSnode(SnodeConfig snodeConfig);
-
- void updateNnodeAddressList(final String addrs);
-
+ /**
+ * Register the Snode with the Nnode (name server); the registration includes snodeAddress, snodeName and snodeClusterName.
+ *
+ * @param snodeConfig {@link SnodeConfig}
+ */
+ void registerSnode(SnodeConfig snodeConfig) throws Exception;
+
+ /**
+ * Update Nnode server address list.
+ *
+ * @param addresses Nnode (name server) address list
+ */
+ void updateNnodeAddressList(final String addresses);
+
+ /**
+ * Fetch the Nnode server address.
+ *
+ * @return Nnode address
+ */
String fetchNnodeAdress();
void updateTopicRouteDataByTopic();
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
index 3a1a7fb..e1a02c0 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
@@ -244,12 +244,12 @@ public class EnodeServiceImpl implements EnodeService {
TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
+ requestHeader.setPerm(topicConfig.getPerm());
requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
- requestHeader.setPerm(topicConfig.getPerm());
+ requestHeader.setOrder(topicConfig.isOrder());
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
- requestHeader.setOrder(topicConfig.isOrder());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(address,
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
index d30543e..d593613 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
@@ -58,7 +58,7 @@ public class NnodeServiceImpl implements NnodeService {
}
@Override
- public void registerSnode(SnodeConfig snodeConfig) {
+ public void registerSnode(SnodeConfig snodeConfig) throws Exception{
List<String> nnodeAddressList = this.snodeController.getRemotingClient().getNameServerAddressList();
RemotingCommand remotingCommand = new RemotingCommand();
RegisterSnodeRequestHeader requestHeader = new RegisterSnodeRequestHeader();
@@ -75,6 +75,9 @@ public class NnodeServiceImpl implements NnodeService {
log.warn("Register Snode to Nnode addr: {} error, ex:{} ", nodeAddress, ex);
}
}
+ } else {
+ log.warn("Nnode server list is null");
+ throw new RemotingSendRequestException("Nnode server list is null");
}
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
index 5370e64..685af3f 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ScheduledServiceImpl.java
@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.rocketmq.snode.service.impl;
+
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -88,7 +89,11 @@ public class ScheduledServiceImpl implements ScheduledService {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
- snodeController.getNnodeService().registerSnode(snodeConfig);
+ try {
+ snodeController.getNnodeService().registerSnode(snodeConfig);
+ } catch (Exception ex) {
+ log.warn("Register snode error", ex);
+ }
}
}, 0, Math.max(10000, Math.min(snodeConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
index 448abc9..a7a2667 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
@@ -44,6 +44,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class SendMessageProcessorTest {
+
private SendMessageProcessor sendMessageProcessor;
@Spy
@@ -52,12 +53,10 @@ public class SendMessageProcessorTest {
@Mock
private RemotingChannel remotingChannel;
- private String topic = "SnodeTopic";
-
- private String group = "SnodeGroup";
-
- private String enodeName = "enodeName";
+ private String topic = "snodeTopic";
+ private String group = "snodeGroup";
+
@Mock
private EnodeService enodeService;
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
index 3cd4125..c228a83 100644
--- a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
+++ b/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.snode.service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
@@ -53,6 +53,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class EnodeServiceImplTest extends SnodeTestBase {
+
private EnodeService enodeService;
@Spy
@@ -66,20 +67,20 @@ public class EnodeServiceImplTest extends SnodeTestBase {
private String enodeName = "enodeName";
- private String topic = "SnodeTopic";
+ private String topic = "snodeTopic";
- private String group = "SnodeGroup";
+ private String group = "snodeGroup";
@Before
public void init() {
+ snodeController.setNnodeService(nnodeService);
+ snodeController.setRemotingClient(remotingClient);
enodeService = new EnodeServiceImpl(snodeController);
}
@Test
public void sendMessageTest() throws Exception {
- snodeController.setNnodeService(nnodeService);
- snodeController.setRemotingClient(remotingClient);
- when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("1024");
+ when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("127.0.0.1:10911");
CompletableFuture<RemotingCommand> responseCF = new CompletableFuture<>();
doAnswer(new Answer() {
@Override
@@ -123,6 +124,15 @@ public class EnodeServiceImplTest extends SnodeTestBase {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
+    /** Stubs address lookup and invokeSync, then checks that creatTopic returns a response whose code is SUCCESS. */
+    @Test
+    public void creatTopicTest() throws Exception {
+        when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("127.0.0.1:10911");
+        when(snodeController.getRemotingClient().invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(createSuccessResponse());
+        RemotingCommand createTopicResponse = enodeService.creatTopic(enodeName, new TopicConfig(topic, 1, 1, 2));
+        assertThat(createTopicResponse.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
private GetMessageResult createGetMessageResult() {
GetMessageResult getMessageResult = new GetMessageResult();
getMessageResult.setStatus(GetMessageStatus.FOUND);
diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java
new file mode 100644
index 0000000..45a19dd
--- /dev/null
+++ b/snode/src/test/java/org/apache/rocketmq/snode/service/NnodeServiceImplTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.service;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
+import org.apache.rocketmq.snode.SnodeController;
+import org.apache.rocketmq.snode.SnodeTestBase;
+import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class NnodeServiceImplTest extends SnodeTestBase {
+    // Exercises NnodeServiceImpl#registerSnode against a spied controller and a mocked remoting client.
+    @Spy
+    private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
+    /** Mocked remoting client; its calls are stubbed per test. */
+    @Mock
+    private NettyRemotingClient remotingClient;
+    /** Service under test, rebuilt in {@link #init()} before every test. */
+    private NnodeService nnodeService;
+    /** Injects the mocked remoting client into the controller and creates the service under test. */
+    @Before
+    public void init() {
+        snodeController.setRemotingClient(remotingClient);
+        nnodeService = new NnodeServiceImpl(snodeController);
+    }
+    /** With one Nnode address and a stubbed invokeSync response, registerSnode must not throw. */
+    @Test
+    public void registerSnodeSuccessTest() throws InterruptedException, RemotingConnectException,
+        RemotingSendRequestException, RemotingTimeoutException {
+        when(snodeController.getRemotingClient().getNameServerAddressList()).thenReturn(createNnodeList());
+        when(snodeController.getRemotingClient().invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(createSuccessResponse());
+        try {
+            nnodeService.registerSnode(createSnodeConfig());
+        } catch (Exception ex) {
+            assertThat(ex).as("registerSnode must not throw when an Nnode address is configured").isNull();
+        }
+    }
+    /** Builds the single-entry Nnode address list returned by the stubbed remoting client. */
+    private List<String> createNnodeList() {
+        List<String> addresses = new ArrayList<>();
+        addresses.add("127.0.0.1:9876");
+        return addresses;
+    }
+    /** Builds the SnodeConfig (cluster name, IP, snode name) passed to registerSnode. */
+    private SnodeConfig createSnodeConfig() {
+        SnodeConfig snodeConfig = new SnodeConfig();
+        snodeConfig.setClusterName("defaultCluster");
+        snodeConfig.setSnodeIP1("127.0.0.1:10911");
+        snodeConfig.setSnodeName("snode-a");
+        return snodeConfig;
+    }
+}