You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:33 UTC
[16/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
new file mode 100644
index 0000000..105cfff
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.filtersrv.processor;
+
+import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
+import com.alibaba.rocketmq.client.consumer.PullCallback;
+import com.alibaba.rocketmq.client.consumer.PullResult;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.filter.FilterContext;
+import com.alibaba.rocketmq.common.message.MessageDecoder;
+import com.alibaba.rocketmq.common.message.MessageExt;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.protocol.RequestCode;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
+import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
+import com.alibaba.rocketmq.filtersrv.FiltersrvController;
+import com.alibaba.rocketmq.filtersrv.filter.FilterClassInfo;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.store.CommitLog;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultRequestProcessor implements NettyRequestProcessor {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+
+ private final FiltersrvController filtersrvController;
+
+
+ public DefaultRequestProcessor(FiltersrvController filtersrvController) {
+ this.filtersrvController = filtersrvController;
+ }
+
+
+ /** Dispatches a request by its code to filter-class registration or pull forwarding; any other code yields null. */
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+     if (log.isDebugEnabled()) {
+         log.debug("receive request, {} {} {}",
+             request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request);
+     }
+
+     final int requestCode = request.getCode();
+     if (RequestCode.REGISTER_MESSAGE_FILTER_CLASS == requestCode) {
+         return registerMessageFilterClass(ctx, request);
+     }
+     if (RequestCode.PULL_MESSAGE == requestCode) {
+         return pullMessageForward(ctx, request);
+     }
+     // Request codes other than the two above are not handled by this processor.
+     return null;
+ }
+
+ @Override
+ public boolean rejectRequest() {
+ return false;
+ }
+ /** Registers the filter class carried by the request; replies SUCCESS, or SYSTEM_ERROR with the failure described in the remark. */
+ private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ final RegisterMessageFilterClassRequestHeader requestHeader =
+ (RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class);
+ // The header fields and the raw request body are handed to the filter class manager.
+ try {
+ boolean ok = this.filtersrvController.getFilterClassManager().registerFilterClass(requestHeader.getConsumerGroup(),
+ requestHeader.getTopic(),
+ requestHeader.getClassName(),
+ requestHeader.getClassCRC(),
+ request.getBody());
+ if (!ok) {
+ throw new Exception("registerFilterClass error"); // handled by the catch below, like any other failure
+ }
+ } catch (Exception e) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(RemotingHelper.exceptionSimpleDesc(e));
+ return response;
+ }
+ // Registration reported success.
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+ /** Pulls from the queue named in the request via the pull consumer and replies, from the pull callback, with only the messages the registered filter accepts. */
+ private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
+ final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
+ final PullMessageRequestHeader requestHeader =
+ (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
+
+ final FilterContext filterContext = new FilterContext();
+ filterContext.setConsumerGroup(requestHeader.getConsumerGroup());
+
+ // Copy the request's opaque onto the response, which is written later from the pull callback.
+ response.setOpaque(request.getOpaque());
+
+ DefaultMQPullConsumer pullConsumer = this.filtersrvController.getDefaultMQPullConsumer();
+ final FilterClassInfo findFilterClass =
+ this.filtersrvController.getFilterClassManager()
+ .findFilterClass(requestHeader.getConsumerGroup(), requestHeader.getTopic());
+ if (null == findFilterClass) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("Find Filter class failed, not registered");
+ return response;
+ }
+
+ if (null == findFilterClass.getMessageFilter()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("Find Filter class failed, registered but no class");
+ return response;
+ }
+
+ responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
+
+ // Describe the queue to pull from, using the broker name held by the controller.
+ MessageQueue mq = new MessageQueue();
+ mq.setTopic(requestHeader.getTopic());
+ mq.setQueueId(requestHeader.getQueueId());
+ mq.setBrokerName(this.filtersrvController.getBrokerName());
+ long offset = requestHeader.getQueueOffset();
+ int maxNums = requestHeader.getMaxMsgNums();
+
+ final PullCallback pullCallback = new PullCallback() {
+
+ @Override
+ public void onSuccess(PullResult pullResult) {
+ responseHeader.setMaxOffset(pullResult.getMaxOffset());
+ responseHeader.setMinOffset(pullResult.getMinOffset());
+ responseHeader.setNextBeginOffset(pullResult.getNextBeginOffset());
+ response.setRemark(null);
+
+ switch (pullResult.getPullStatus()) {
+ case FOUND:
+ response.setCode(ResponseCode.SUCCESS);
+
+ List<MessageExt> msgListOK = new ArrayList<MessageExt>();
+ try {
+ for (MessageExt msg : pullResult.getMsgFoundList()) {
+ boolean match = findFilterClass.getMessageFilter().match(msg, filterContext);
+ if (match) {
+ msgListOK.add(msg);
+ }
+ }
+
+ // Reply with the matching messages, or tell the client to pull again when none matched.
+ if (!msgListOK.isEmpty()) {
+ returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, msgListOK);
+ return;
+ } else {
+ response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+ }
+ } catch (Throwable e) {
+ final String error =
+ String.format("do Message Filter Exception, ConsumerGroup: %s Topic: %s ",
+ requestHeader.getConsumerGroup(), requestHeader.getTopic());
+ log.error(error, e);
+
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(error + RemotingHelper.exceptionSimpleDesc(e));
+ returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
+ return;
+ }
+
+ break;
+ case NO_MATCHED_MSG:
+ response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
+ break;
+ case NO_NEW_MSG:
+ response.setCode(ResponseCode.PULL_NOT_FOUND);
+ break;
+ case OFFSET_ILLEGAL:
+ response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+ break;
+ default:
+ break;
+ }
+ // Every path that has not already replied above sends the response without a body.
+ returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
+ }
+
+
+ @Override
+ public void onException(Throwable e) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("Pull Callback Exception, " + RemotingHelper.exceptionSimpleDesc(e));
+ returnResponse(requestHeader.getConsumerGroup(), requestHeader.getTopic(), ctx, response, null);
+ return;
+ }
+ };
+ // Start the pull with the callback above; the reply is written by returnResponse from that callback.
+ pullConsumer.pullBlockIfNotFound(mq, null, offset, maxNums, pullCallback);
+
+ return null;
+ }
+
+ /**
+  * Writes {@code response} to the channel; when {@code msgList} is non-null the encoded messages become its body.
+  * A message that fails to encode is logged and left out, instead of raising an NPE that would suppress the whole reply.
+  */
+ private void returnResponse(final String group, final String topic, ChannelHandlerContext ctx, final RemotingCommand response,
+     final List<MessageExt> msgList) {
+     if (null != msgList) {
+         final List<ByteBuffer> encoded = new ArrayList<ByteBuffer>(msgList.size());
+         int bodyTotalSize = 0;
+         for (MessageExt msg : msgList) {
+             try {
+                 final ByteBuffer buffer = messageToByteBuffer(msg);
+                 bodyTotalSize += buffer.capacity();
+                 encoded.add(buffer);
+             } catch (Exception e) {
+                 log.error("messageToByteBuffer UnsupportedEncodingException", e);
+             }
+         }
+         final ByteBuffer body = ByteBuffer.allocate(bodyTotalSize);
+         for (ByteBuffer bb : encoded) {
+             bb.flip();
+             body.put(bb);
+         }
+         response.setBody(body.array());
+         this.filtersrvController.getFilterServerStatsManager().incGroupGetNums(group, topic, encoded.size());
+         this.filtersrvController.getFilterServerStatsManager().incGroupGetSize(group, topic, bodyTotalSize);
+     }
+
+     try {
+         ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
+             @Override
+             public void operationComplete(ChannelFuture future) throws Exception {
+                 if (!future.isSuccess()) {
+                     log.error("FilterServer response to " + future.channel().remoteAddress() + " failed", future.cause());
+                     log.error(response.toString());
+                 }
+             }
+         });
+     } catch (Throwable e) {
+         log.error("FilterServer process request over, but response failed", e);
+         log.error(response.toString());
+     }
+ }
+ /** Encodes msg into a newly allocated ByteBuffer in the field order listed below; the buffer is returned without being flipped. */
+ private ByteBuffer messageToByteBuffer(final MessageExt msg) throws IOException {
+ int sysFlag = MessageSysFlag.clearCompressedFlag(msg.getSysFlag());
+ if (msg.getBody() != null) {
+ if (msg.getBody().length >= this.filtersrvController.getFiltersrvConfig().getCompressMsgBodyOverHowmuch()) {
+ byte[] data = UtilAll.compress(msg.getBody(), this.filtersrvController.getFiltersrvConfig().getZipCompressLevel());
+ if (data != null) {
+ msg.setBody(data); // note: replaces the body on the message object passed in by the caller
+ sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
+ }
+ }
+ }
+ // Lengths of the variable-size fields, taken after the body may have been replaced above.
+ final int bodyLength = msg.getBody() != null ? msg.getBody().length : 0;
+ byte[] topicData = msg.getTopic().getBytes(MixAll.DEFAULT_CHARSET);
+ final int topicLength = topicData.length;
+ String properties = MessageDecoder.messageProperties2String(msg.getProperties());
+ byte[] propertiesData = properties.getBytes(MixAll.DEFAULT_CHARSET);
+ final int propertiesLength = propertiesData.length;
+ final int msgLen = 4 // 1 TOTALSIZE
+ + 4 // 2 MAGICCODE
+ + 4 // 3 BODYCRC
+ + 4 // 4 QUEUEID
+ + 4 // 5 FLAG
+ + 8 // 6 QUEUEOFFSET
+ + 8 // 7 PHYSICALOFFSET
+ + 4 // 8 SYSFLAG
+ + 8 // 9 BORNTIMESTAMP
+ + 8 // 10 BORNHOST
+ + 8 // 11 STORETIMESTAMP
+ + 8 // 12 STOREHOSTADDRESS
+ + 4 // 13 RECONSUMETIMES
+ + 8 // 14 Prepared Transaction Offset
+ + 4 + bodyLength // 15 BODY (4-byte length prefix)
+ + 1 + topicLength // 16 TOPIC (1-byte length prefix)
+ + 2 + propertiesLength // 17 PROPERTIES (2-byte length prefix)
+ + 0;
+
+ ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);
+
+ final MessageExt msgInner = msg;
+
+ // 1 TOTALSIZE
+ msgStoreItemMemory.putInt(msgLen);
+ // 2 MAGICCODE
+ msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
+ // 3 BODYCRC
+ msgStoreItemMemory.putInt(UtilAll.crc32(msgInner.getBody()));
+ // 4 QUEUEID
+ msgStoreItemMemory.putInt(msgInner.getQueueId());
+ // 5 FLAG
+ msgStoreItemMemory.putInt(msgInner.getFlag());
+ // 6 QUEUEOFFSET
+ msgStoreItemMemory.putLong(msgInner.getQueueOffset());
+ // 7 PHYSICALOFFSET
+ msgStoreItemMemory.putLong(msgInner.getCommitLogOffset());
+ // 8 SYSFLAG
+ msgStoreItemMemory.putInt(sysFlag);
+ // 9 BORNTIMESTAMP
+ msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
+ // 10 BORNHOST
+ msgStoreItemMemory.put(msgInner.getBornHostBytes());
+ // 11 STORETIMESTAMP
+ msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
+ // 12 STOREHOSTADDRESS
+ msgStoreItemMemory.put(msgInner.getStoreHostBytes());
+ // 13 RECONSUMETIMES
+ msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
+ // 14 Prepared Transaction Offset
+ msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
+ // 15 BODY
+ msgStoreItemMemory.putInt(bodyLength);
+ if (bodyLength > 0)
+ msgStoreItemMemory.put(msgInner.getBody());
+ // 16 TOPIC
+ msgStoreItemMemory.put((byte) topicLength); // narrowing cast: only the low 8 bits of the length are written
+ msgStoreItemMemory.put(topicData);
+ // 17 PROPERTIES
+ msgStoreItemMemory.putShort((short) propertiesLength); // narrowing cast: only the low 16 bits of the length are written
+ if (propertiesLength > 0)
+ msgStoreItemMemory.put(propertiesData);
+
+ return msgStoreItemMemory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java
new file mode 100644
index 0000000..3921c92
--- /dev/null
+++ b/rocketmq-filtersrv/src/main/java/com/alibaba/rocketmq/filtersrv/stats/FilterServerStatsManager.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.filtersrv.stats;
+
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.stats.StatsItemSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+ /** Holds two stats item sets for the filter server, GROUP_GET_NUMS and GROUP_GET_SIZE, both keyed by topic@group. */
+public class FilterServerStatsManager {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.FILTERSRV_LOGGER_NAME);
+ private final ScheduledExecutorService scheduledExecutorService = Executors
+ .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSStatsThread"));
+
+ // Values recorded through incGroupGetNums, keyed by topic@group
+ private final StatsItemSet groupGetNums = new StatsItemSet("GROUP_GET_NUMS",
+ this.scheduledExecutorService, log);
+
+ // Values recorded through incGroupGetSize, keyed by topic@group
+ private final StatsItemSet groupGetSize = new StatsItemSet("GROUP_GET_SIZE",
+ this.scheduledExecutorService, log);
+
+
+ public FilterServerStatsManager() {
+ }
+
+ /** Currently does nothing. */
+ public void start() {
+ }
+
+ /** Shuts down the single-thread scheduler that both stats item sets were created with. */
+ public void shutdown() {
+ this.scheduledExecutorService.shutdown();
+ }
+
+ /** Records incValue in the GROUP_GET_NUMS set under the key topic@group. */
+ public void incGroupGetNums(final String group, final String topic, final int incValue) {
+ this.groupGetNums.addValue(topic + "@" + group, incValue, 1);
+ }
+
+ /** Records incValue in the GROUP_GET_SIZE set under the key topic@group. */
+ public void incGroupGetSize(final String group, final String topic, final int incValue) {
+ this.groupGetSize.addValue(topic + "@" + group, incValue, 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/pom.xml b/rocketmq-namesrv/pom.xml
new file mode 100644
index 0000000..3494f8f
--- /dev/null
+++ b/rocketmq-namesrv/pom.xml
@@ -0,0 +1,58 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>com.alibaba.rocketmq</groupId>
+ <artifactId>rocketmq-all</artifactId>
+ <version>4.0.0-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <packaging>jar</packaging>
+ <artifactId>rocketmq-namesrv</artifactId>
+ <name>rocketmq-namesrv ${project.version}</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba.rocketmq</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-srvutil</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java
new file mode 100644
index 0000000..82f2622
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvController.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv;
+
+import com.alibaba.rocketmq.common.Configuration;
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.namesrv.NamesrvConfig;
+import com.alibaba.rocketmq.namesrv.kvconfig.KVConfigManager;
+import com.alibaba.rocketmq.namesrv.processor.ClusterTestRequestProcessor;
+import com.alibaba.rocketmq.namesrv.processor.DefaultRequestProcessor;
+import com.alibaba.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
+import com.alibaba.rocketmq.namesrv.routeinfo.RouteInfoManager;
+import com.alibaba.rocketmq.remoting.RemotingServer;
+import com.alibaba.rocketmq.remoting.netty.NettyRemotingServer;
+import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class NamesrvController {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
+ private final NamesrvConfig namesrvConfig;
+
+ private final NettyServerConfig nettyServerConfig;
+
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+ "NSScheduledThread"));
+ private final KVConfigManager kvConfigManager;
+ private final RouteInfoManager routeInfoManager;
+
+ private RemotingServer remotingServer;
+
+ private BrokerHousekeepingService brokerHousekeepingService;
+
+ private ExecutorService remotingExecutor;
+
+ private Configuration configuration;
+
+
+ public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
+ this.namesrvConfig = namesrvConfig;
+ this.nettyServerConfig = nettyServerConfig;
+ this.kvConfigManager = new KVConfigManager(this);
+ this.routeInfoManager = new RouteInfoManager();
+ this.brokerHousekeepingService = new BrokerHousekeepingService(this);
+ this.configuration = new Configuration(
+ log,
+ this.namesrvConfig, this.nettyServerConfig
+ );
+ this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
+ }
+
+
+ /**
+  * Loads the KV config, creates the remoting server and its worker pool, registers the processor and schedules the periodic tasks; always returns true.
+  */
+ public boolean initialize() {
+     this.kvConfigManager.load();
+
+     this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
+     this.remotingExecutor = Executors.newFixedThreadPool(
+         nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
+     this.registerProcessor();
+
+     final Runnable scanInactiveBrokers = new Runnable() {
+         @Override
+         public void run() {
+             NamesrvController.this.routeInfoManager.scanNotActiveBroker();
+         }
+     };
+     final Runnable printKvConfig = new Runnable() {
+         @Override
+         public void run() {
+             NamesrvController.this.kvConfigManager.printAllPeriodically();
+         }
+     };
+
+     // Inactive-broker scan: initial delay 5 seconds, period 10 seconds.
+     this.scheduledExecutorService.scheduleAtFixedRate(scanInactiveBrokers, 5, 10, TimeUnit.SECONDS);
+     // KV config print: initial delay 1 minute, period 10 minutes.
+     this.scheduledExecutorService.scheduleAtFixedRate(printKvConfig, 1, 10, TimeUnit.MINUTES);
+
+     return true;
+ }
+
+ /** Registers the default request processor on the remoting server, run on the remoting executor. */
+ private void registerProcessor() {
+ if (namesrvConfig.isClusterTest()) {
+ // Cluster-test mode: use the test processor, constructed with the configured product environment name.
+ this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
+ this.remotingExecutor);
+ } else {
+ // Otherwise use the regular name server processor.
+ this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
+ }
+ }
+
+
+ public void start() throws Exception {
+ this.remotingServer.start();
+ }
+
+ /** Shuts down the remoting server, then the remoting executor, then the scheduled executor. */
+ public void shutdown() {
+ this.remotingServer.shutdown();
+ this.remotingExecutor.shutdown();
+ this.scheduledExecutorService.shutdown();
+ }
+
+
+ public NamesrvConfig getNamesrvConfig() {
+ return namesrvConfig;
+ }
+
+
+ public NettyServerConfig getNettyServerConfig() {
+ return nettyServerConfig;
+ }
+
+
+ public KVConfigManager getKvConfigManager() {
+ return kvConfigManager;
+ }
+
+
+ public RouteInfoManager getRouteInfoManager() {
+ return routeInfoManager;
+ }
+
+
+ public RemotingServer getRemotingServer() {
+ return remotingServer;
+ }
+
+
+ public void setRemotingServer(RemotingServer remotingServer) {
+ this.remotingServer = remotingServer;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java
new file mode 100644
index 0000000..286de3a
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/NamesrvStartup.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.namesrv.NamesrvConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
+import com.alibaba.rocketmq.remoting.netty.NettySystemConfig;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class NamesrvStartup {
+ public static Properties properties = null;
+ public static CommandLine commandLine = null;
+
+ public static void main(String[] args) {
+ main0(args);
+ }
+
+ /**
+  * Boots a name server from command-line arguments and returns its started controller.
+  * Exits the JVM on unparsable arguments (-1), a null rocketmqHome (-2), failed initialization (-3) or any startup error (-1);
+  * with {@code -p} the configuration is printed and the JVM exits with 0.
+  * @param args command-line arguments; {@code -c <file>} loads a properties file, {@code -p} prints the config items
+  * @return the started controller, or {@code null} if startup did not complete
+  */
+ public static NamesrvController main0(String[] args) {
+     System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+     // Use 4096-byte socket buffers unless the corresponding system property was set explicitly.
+     if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
+         NettySystemConfig.socketSndbufSize = 4096;
+     }
+     if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
+         NettySystemConfig.socketRcvbufSize = 4096;
+     }
+
+     try {
+         //PackageConflictDetect.detectFastjson();
+
+         Options options = ServerUtil.buildCommandlineOptions(new Options());
+         commandLine =
+             ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options),
+                 new PosixParser());
+         if (null == commandLine) {
+             System.exit(-1);
+             return null;
+         }
+
+         final NamesrvConfig namesrvConfig = new NamesrvConfig();
+         final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+         nettyServerConfig.setListenPort(9876);
+         if (commandLine.hasOption('c')) {
+             String file = commandLine.getOptionValue('c');
+             if (file != null) {
+                 InputStream in = new BufferedInputStream(new FileInputStream(file));
+                 try {
+                     properties = new Properties();
+                     properties.load(in);
+                 } finally {
+                     // Release the file handle even when the properties file cannot be read.
+                     in.close();
+                 }
+                 MixAll.properties2Object(properties, namesrvConfig);
+                 MixAll.properties2Object(properties, nettyServerConfig);
+                 namesrvConfig.setConfigStorePath(file);
+                 // The path is passed as an argument so a '%' in it is not parsed as a format specifier.
+                 System.out.printf("load config properties file OK, %s%n", file);
+             }
+         }
+
+         if (commandLine.hasOption('p')) {
+             MixAll.printObjectProperties(null, namesrvConfig);
+             MixAll.printObjectProperties(null, nettyServerConfig);
+             System.exit(0);
+         }
+
+         MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
+
+         if (null == namesrvConfig.getRocketmqHome()) {
+             System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n",
+                 MixAll.ROCKETMQ_HOME_ENV);
+             System.exit(-2);
+         }
+
+         LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+         JoranConfigurator configurator = new JoranConfigurator();
+         configurator.setContext(lc);
+         lc.reset();
+         configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
+         final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
+         MixAll.printObjectProperties(log, namesrvConfig);
+         MixAll.printObjectProperties(log, nettyServerConfig);
+
+         final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
+
+         // remember all configs to prevent discard
+         controller.getConfiguration().registerConfig(properties);
+
+         boolean initResult = controller.initialize();
+         if (!initResult) {
+             controller.shutdown();
+             System.exit(-3);
+         }
+
+         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+             private volatile boolean hasShutdown = false;
+             private AtomicInteger shutdownTimes = new AtomicInteger(0);
+             @Override
+             public void run() {
+                 synchronized (this) {
+                     log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet());
+                     if (!this.hasShutdown) {
+                         this.hasShutdown = true;
+                         long begineTime = System.currentTimeMillis();
+                         controller.shutdown();
+                         long consumingTimeTotal = System.currentTimeMillis() - begineTime;
+                         log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal);
+                     }
+                 }
+             }
+         }, "ShutdownHook"));
+
+         controller.start();
+         String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+         log.info(tip);
+         System.out.printf("%s%n", tip);
+
+         return controller;
+     } catch (Throwable e) {
+         e.printStackTrace();
+         System.exit(-1);
+     }
+
+     return null;
+ }
+ /** Adds the name server's own options, -c/--configFile and -p/--printConfigItem, to {@code options} and returns it. */
+ public static Options buildCommandlineOptions(final Options options) {
+     // -c takes an argument: the properties file to load.
+     final Option configFile = new Option("c", "configFile", true, "Name server config properties file");
+     configFile.setRequired(false);
+     options.addOption(configFile);
+     // -p is a plain flag without an argument.
+     final Option printConfig = new Option("p", "printConfigItem", false, "Print all config item");
+     printConfig.setRequired(false);
+     options.addOption(printConfig);
+     return options;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigManager.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigManager.java
new file mode 100644
index 0000000..a83586c
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigManager.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv.kvconfig;
+
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.protocol.body.KVTable;
+import com.alibaba.rocketmq.namesrv.NamesrvController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class KVConfigManager {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
+ private final NamesrvController namesrvController;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
+ new HashMap<String, HashMap<String, String>>();
+
+
+ public KVConfigManager(NamesrvController namesrvController) {
+ this.namesrvController = namesrvController;
+ }
+
+
+ public void load() {
+ String content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
+ if (content != null) {
+ KVConfigSerializeWrapper kvConfigSerializeWrapper =
+ KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
+ if (null != kvConfigSerializeWrapper) {
+ this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
+ log.info("load KV config table OK");
+ }
+ }
+ }
+
+
+ public void putKVConfig(final String namespace, final String key, final String value) {
+ try {
+ this.lock.writeLock().lockInterruptibly();
+ try {
+ HashMap<String, String> kvTable = this.configTable.get(namespace);
+ if (null == kvTable) {
+ kvTable = new HashMap<String, String>();
+ this.configTable.put(namespace, kvTable);
+ log.info("putKVConfig create new Namespace {}", namespace);
+ }
+
+ final String prev = kvTable.put(key, value);
+ if (null != prev) {
+ log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}", //
+ namespace, key, value);
+ } else {
+ log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}", //
+ namespace, key, value);
+ }
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("putKVConfig InterruptedException", e);
+ }
+
+ this.persist();
+ }
+
+ public void persist() {
+ try {
+ this.lock.readLock().lockInterruptibly();
+ try {
+ KVConfigSerializeWrapper kvConfigSerializeWrapper = new KVConfigSerializeWrapper();
+ kvConfigSerializeWrapper.setConfigTable(this.configTable);
+
+ String content = kvConfigSerializeWrapper.toJson();
+
+ if (null != content) {
+ MixAll.string2File(content, this.namesrvController.getNamesrvConfig().getKvConfigPath());
+ }
+ } catch (IOException e) {
+ log.error("persist kvconfig Exception, "
+ + this.namesrvController.getNamesrvConfig().getKvConfigPath(), e);
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("persist InterruptedException", e);
+ }
+
+ }
+
+ public void deleteKVConfig(final String namespace, final String key) {
+ try {
+ this.lock.writeLock().lockInterruptibly();
+ try {
+ HashMap<String, String> kvTable = this.configTable.get(namespace);
+ if (null != kvTable) {
+ String value = kvTable.remove(key);
+ log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}", //
+ namespace, key, value);
+ }
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("deleteKVConfig InterruptedException", e);
+ }
+
+ this.persist();
+ }
+
+ public byte[] getKVListByNamespace(final String namespace) {
+ try {
+ this.lock.readLock().lockInterruptibly();
+ try {
+ HashMap<String, String> kvTable = this.configTable.get(namespace);
+ if (null != kvTable) {
+ KVTable table = new KVTable();
+ table.setTable(kvTable);
+ return table.encode();
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("getKVListByNamespace InterruptedException", e);
+ }
+
+ return null;
+ }
+
+ public String getKVConfig(final String namespace, final String key) {
+ try {
+ this.lock.readLock().lockInterruptibly();
+ try {
+ HashMap<String, String> kvTable = this.configTable.get(namespace);
+ if (null != kvTable) {
+ return kvTable.get(key);
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("getKVConfig InterruptedException", e);
+ }
+
+ return null;
+ }
+
+ public void printAllPeriodically() {
+ try {
+ this.lock.readLock().lockInterruptibly();
+ try {
+ log.info("--------------------------------------------------------");
+
+ {
+ log.info("configTable SIZE: {}", this.configTable.size());
+ Iterator<Entry<String, HashMap<String, String>>> it =
+ this.configTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, HashMap<String, String>> next = it.next();
+ Iterator<Entry<String, String>> itSub = next.getValue().entrySet().iterator();
+ while (itSub.hasNext()) {
+ Entry<String, String> nextSub = itSub.next();
+ log.info("configTable NS: {} Key: {} Value: {}", next.getKey(), nextSub.getKey(),
+ nextSub.getValue());
+ }
+ }
+ }
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("printAllPeriodically InterruptedException", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
new file mode 100644
index 0000000..3a91028
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/kvconfig/KVConfigSerializeWrapper.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv.kvconfig;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.HashMap;
+
+
+/**
+ * Serializable holder for the KV config table, keyed by namespace and then by item key.
+ * It carries no logic of its own beyond the accessor pair below.
+ *
+ * @author shijia.wxr
+ */
+public class KVConfigSerializeWrapper extends RemotingSerializable {
+    /** Namespace -&gt; (key -&gt; value); stays {@code null} until a table is set. */
+    private HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable;
+
+
+    /**
+     * @return the table currently held; the reference is returned as-is, not copied
+     */
+    public HashMap<String, HashMap<String, String>> getConfigTable() {
+        return this.configTable;
+    }
+
+
+    /**
+     * @param configTable the table to hold; the reference is stored as-is, not copied
+     */
+    public void setConfigTable(HashMap<String, HashMap<String, String>> configTable) {
+        this.configTable = configTable;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
new file mode 100644
index 0000000..b0b158d
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv.processor;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.help.FAQUrl;
+import com.alibaba.rocketmq.common.namesrv.NamesrvUtil;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.namesrv.NamesrvController;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.tools.admin.DefaultMQAdminExt;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Request processor that answers route lookups from the local route table first and,
+ * when the topic is unknown locally, asks another environment through a
+ * {@link DefaultMQAdminExt} whose unit name is the given product environment name.
+ * All other requests are handled by {@link DefaultRequestProcessor}.
+ *
+ * @author manhong.yqd
+ */
+public class ClusterTestRequestProcessor extends DefaultRequestProcessor {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+    private final DefaultMQAdminExt adminExt;
+    private final String productEnvName;
+
+
+    /**
+     * Creates the processor and starts the admin client used for fallback lookups.
+     * <p>
+     * A failure to start the admin client is logged and does not abort construction.
+     *
+     * @param namesrvController owning controller
+     * @param productEnvName    environment name, used as the admin client's unit name and
+     *                          as a suffix of its instance name
+     */
+    public ClusterTestRequestProcessor(NamesrvController namesrvController, String productEnvName) {
+        super(namesrvController);
+        this.productEnvName = productEnvName;
+        this.adminExt = new DefaultMQAdminExt();
+        this.adminExt.setInstanceName("CLUSTER_TEST_NS_INS_" + productEnvName);
+        this.adminExt.setUnitName(productEnvName);
+        try {
+            this.adminExt.start();
+        } catch (MQClientException e) {
+            // Report through the name server log rather than stderr so the failure is not lost.
+            log.error("start admin client for cluster test failed. envName=" + productEnvName, e);
+        }
+    }
+
+
+    /**
+     * Returns the route data of the requested topic.
+     * <p>
+     * A locally known topic gets its order topic config attached from the KV config
+     * table. An unknown topic is looked up through the admin client; if that lookup
+     * fails the failure is logged and the response is {@code TOPIC_NOT_EXIST}.
+     *
+     * @param ctx     channel context of the request
+     * @param request request carrying a {@link GetRouteInfoRequestHeader}
+     * @return {@code SUCCESS} with the encoded route data as body, or {@code TOPIC_NOT_EXIST}
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    @Override
+    public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final GetRouteInfoRequestHeader requestHeader =
+            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
+
+        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
+        if (topicRouteData != null) {
+            String orderTopicConf =
+                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
+                    requestHeader.getTopic());
+            topicRouteData.setOrderTopicConf(orderTopicConf);
+        } else {
+            try {
+                topicRouteData = this.adminExt.examineTopicRouteInfo(requestHeader.getTopic());
+            } catch (Exception e) {
+                // Keep the cause and the topic in the log; the caller only sees TOPIC_NOT_EXIST.
+                log.info("get route info by topic from product environment failed. envName=" + this.productEnvName
+                    + ", topic=" + requestHeader.getTopic(), e);
+            }
+        }
+
+        if (topicRouteData != null) {
+            byte[] content = topicRouteData.encode();
+            response.setBody(content);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+
+        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+            + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+        return response;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/DefaultRequestProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/DefaultRequestProcessor.java
new file mode 100644
index 0000000..118198e
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv.processor;
+
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MQVersion.Version;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.help.FAQUrl;
+import com.alibaba.rocketmq.common.namesrv.NamesrvUtil;
+import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
+import com.alibaba.rocketmq.common.protocol.RequestCode;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.body.RegisterBrokerBody;
+import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import com.alibaba.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.namesrv.*;
+import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
+import com.alibaba.rocketmq.namesrv.NamesrvController;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
+import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Default name server request processor: dispatches each incoming command by its
+ * request code to the matching handler, which delegates to the controller's route
+ * info manager, KV config manager or configuration.
+ *
+ * @author shijia.wxr
+ */
+public class DefaultRequestProcessor implements NettyRequestProcessor {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+
+    protected final NamesrvController namesrvController;
+
+
+    public DefaultRequestProcessor(NamesrvController namesrvController) {
+        this.namesrvController = namesrvController;
+    }
+
+
+    /**
+     * Routes the request to the handler for its code.
+     *
+     * @param ctx     channel context of the request
+     * @param request incoming command
+     * @return the handler's response, or {@code null} when the request code is not
+     *         handled by this processor
+     * @throws RemotingCommandException if a handler cannot decode the request header
+     */
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        if (log.isDebugEnabled()) {
+            log.debug("receive request, {} {} {}",
+                request.getCode(),
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                request);
+        }
+
+        switch (request.getCode()) {
+            case RequestCode.PUT_KV_CONFIG:
+                return this.putKVConfig(ctx, request);
+            case RequestCode.GET_KV_CONFIG:
+                return this.getKVConfig(ctx, request);
+            case RequestCode.DELETE_KV_CONFIG:
+                return this.deleteKVConfig(ctx, request);
+            case RequestCode.REGISTER_BROKER:
+                // Brokers at V3_0_11 or later send a RegisterBrokerBody; older ones send
+                // a bare TopicConfigSerializeWrapper.
+                Version brokerVersion = MQVersion.value2Version(request.getVersion());
+                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
+                    return this.registerBrokerWithFilterServer(ctx, request);
+                } else {
+                    return this.registerBroker(ctx, request);
+                }
+            case RequestCode.UNREGISTER_BROKER:
+                return this.unregisterBroker(ctx, request);
+            case RequestCode.GET_ROUTEINTO_BY_TOPIC:
+                return this.getRouteInfoByTopic(ctx, request);
+            case RequestCode.GET_BROKER_CLUSTER_INFO:
+                return this.getBrokerClusterInfo(ctx, request);
+            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
+                return this.wipeWritePermOfBroker(ctx, request);
+            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
+                return getAllTopicListFromNameserver(ctx, request);
+            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
+                return deleteTopicInNamesrv(ctx, request);
+            case RequestCode.GET_KVLIST_BY_NAMESPACE:
+                return this.getKVListByNamespace(ctx, request);
+            case RequestCode.GET_TOPICS_BY_CLUSTER:
+                return this.getTopicsByCluster(ctx, request);
+            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
+                return this.getSystemTopicListFromNs(ctx, request);
+            case RequestCode.GET_UNIT_TOPIC_LIST:
+                return this.getUnitTopicList(ctx, request);
+            case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
+                return this.getHasUnitSubTopicList(ctx, request);
+            case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
+                return this.getHasUnitSubUnUnitTopicList(ctx, request);
+            case RequestCode.UPDATE_NAMESRV_CONFIG:
+                return this.updateConfig(ctx, request);
+            case RequestCode.GET_NAMESRV_CONFIG:
+                return this.getConfig(ctx, request);
+            default:
+                break;
+        }
+        return null;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean rejectRequest() {
+        return false;
+    }
+
+    /**
+     * Stores the namespace/key/value from the request header in the KV config table.
+     *
+     * @return a {@code SUCCESS} response
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    public RemotingCommand putKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final PutKVConfigRequestHeader requestHeader =
+            (PutKVConfigRequestHeader) request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class);
+
+        this.namesrvController.getKvConfigManager().putKVConfig(
+            requestHeader.getNamespace(),
+            requestHeader.getKey(),
+            requestHeader.getValue()
+        );
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Looks up one KV config item.
+     *
+     * @return {@code SUCCESS} with the value in the response header, or
+     *         {@code QUERY_NOT_FOUND} when no value is stored
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    public RemotingCommand getKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(GetKVConfigResponseHeader.class);
+        final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response.readCustomHeader();
+        final GetKVConfigRequestHeader requestHeader =
+            (GetKVConfigRequestHeader) request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class);
+
+        String value = this.namesrvController.getKvConfigManager().getKVConfig(
+            requestHeader.getNamespace(),
+            requestHeader.getKey()
+        );
+
+        if (value != null) {
+            responseHeader.setValue(value);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+
+        response.setCode(ResponseCode.QUERY_NOT_FOUND);
+        response.setRemark("No config item, Namespace: " + requestHeader.getNamespace() + " Key: " + requestHeader.getKey());
+        return response;
+    }
+
+    /**
+     * Deletes the namespace/key from the request header from the KV config table.
+     *
+     * @return a {@code SUCCESS} response
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    public RemotingCommand deleteKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final DeleteKVConfigRequestHeader requestHeader =
+            (DeleteKVConfigRequestHeader) request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class);
+
+        this.namesrvController.getKvConfigManager().deleteKVConfig(
+            requestHeader.getNamespace(),
+            requestHeader.getKey()
+        );
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Registers a broker whose request body is a {@link RegisterBrokerBody} (topic
+     * config plus filter server list).
+     * <p>
+     * A request without body is registered with a fresh body whose data version has
+     * counter and timestamp set to zero. The response body is the encoded order topic
+     * config namespace.
+     *
+     * @return a {@code SUCCESS} response carrying the HA server and master addresses
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
+        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
+        final RegisterBrokerRequestHeader requestHeader =
+            (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+
+        // Only build the placeholder body when the request really has none.
+        final RegisterBrokerBody registerBrokerBody;
+        if (request.getBody() != null) {
+            registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), RegisterBrokerBody.class);
+        } else {
+            registerBrokerBody = new RegisterBrokerBody();
+            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
+            registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestatmp(0);
+        }
+
+        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
+            requestHeader.getClusterName(),
+            requestHeader.getBrokerAddr(),
+            requestHeader.getBrokerName(),
+            requestHeader.getBrokerId(),
+            requestHeader.getHaServerAddr(),
+            registerBrokerBody.getTopicConfigSerializeWrapper(),
+            registerBrokerBody.getFilterServerList(),
+            ctx.channel());
+
+        responseHeader.setHaServerAddr(result.getHaServerAddr());
+        responseHeader.setMasterAddr(result.getMasterAddr());
+
+
+        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
+        response.setBody(jsonValue);
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Registers a broker whose request body is a bare
+     * {@link TopicConfigSerializeWrapper}; no filter server list is passed on.
+     * <p>
+     * A request without body is registered with a fresh wrapper whose data version has
+     * counter and timestamp set to zero. The response body is the encoded order topic
+     * config namespace.
+     *
+     * @return a {@code SUCCESS} response carrying the HA server and master addresses
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
+        final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
+        final RegisterBrokerRequestHeader requestHeader =
+            (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
+
+        TopicConfigSerializeWrapper topicConfigWrapper;
+        if (request.getBody() != null) {
+            topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
+        } else {
+            topicConfigWrapper = new TopicConfigSerializeWrapper();
+            topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
+            topicConfigWrapper.getDataVersion().setTimestatmp(0);
+        }
+
+        RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
+            requestHeader.getClusterName(),
+            requestHeader.getBrokerAddr(),
+            requestHeader.getBrokerName(),
+            requestHeader.getBrokerId(),
+            requestHeader.getHaServerAddr(),
+            topicConfigWrapper,
+            null,
+            ctx.channel()
+        );
+
+        responseHeader.setHaServerAddr(result.getHaServerAddr());
+        responseHeader.setMasterAddr(result.getMasterAddr());
+
+
+        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
+        response.setBody(jsonValue);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Removes the broker named in the request header from the route table.
+     *
+     * @return a {@code SUCCESS} response
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    public RemotingCommand unregisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final UnRegisterBrokerRequestHeader requestHeader =
+            (UnRegisterBrokerRequestHeader) request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class);
+
+        this.namesrvController.getRouteInfoManager().unregisterBroker(
+            requestHeader.getClusterName(),
+            requestHeader.getBrokerAddr(),
+            requestHeader.getBrokerName(),
+            requestHeader.getBrokerId());
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Returns the route data of the requested topic.
+     * <p>
+     * The order topic config is attached only when order messages are enabled in the
+     * name server config.
+     *
+     * @return {@code SUCCESS} with the encoded route data as body, or
+     *         {@code TOPIC_NOT_EXIST} when the route table has no entry for the topic
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final GetRouteInfoRequestHeader requestHeader =
+            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
+
+        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
+
+        if (topicRouteData != null) {
+            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
+                String orderTopicConf =
+                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
+                        requestHeader.getTopic());
+                topicRouteData.setOrderTopicConf(orderTopicConf);
+            }
+
+            byte[] content = topicRouteData.encode();
+            response.setBody(content);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+
+        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+            + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+        return response;
+    }
+
+    /** Returns the route info manager's cluster info as the response body. */
+    private RemotingCommand getBrokerClusterInfo(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        byte[] content = this.namesrvController.getRouteInfoManager().getAllClusterInfo();
+        response.setBody(content);
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Wipes the write permission of the named broker and reports how many topics were
+     * affected in the response header.
+     */
+    private RemotingCommand wipeWritePermOfBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(WipeWritePermOfBrokerResponseHeader.class);
+        final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader) response.readCustomHeader();
+        final WipeWritePermOfBrokerRequestHeader requestHeader =
+            (WipeWritePermOfBrokerRequestHeader) request.decodeCommandCustomHeader(WipeWritePermOfBrokerRequestHeader.class);
+
+        int wipeTopicCnt = this.namesrvController.getRouteInfoManager().wipeWritePermOfBrokerByLock(requestHeader.getBrokerName());
+
+        log.info("wipe write perm of broker[{}], client: {}, {}",
+            requestHeader.getBrokerName(),
+            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+            wipeTopicCnt);
+
+        responseHeader.setWipeTopicCount(wipeTopicCnt);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /** Returns the route info manager's full topic list as the response body. */
+    private RemotingCommand getAllTopicListFromNameserver(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        byte[] body = this.namesrvController.getRouteInfoManager().getAllTopicList();
+
+        response.setBody(body);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /** Deletes the topic named in the request header from the route table. */
+    private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final DeleteTopicInNamesrvRequestHeader requestHeader =
+            (DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);
+
+        this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic());
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Returns all KV config items of one namespace as the response body, or
+     * {@code QUERY_NOT_FOUND} when the KV config manager returns nothing for it.
+     */
+    private RemotingCommand getKVListByNamespace(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final GetKVListByNamespaceRequestHeader requestHeader =
+            (GetKVListByNamespaceRequestHeader) request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class);
+
+        byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(
+            requestHeader.getNamespace());
+        if (null != jsonValue) {
+            response.setBody(jsonValue);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+
+        response.setCode(ResponseCode.QUERY_NOT_FOUND);
+        response.setRemark("No config item, Namespace: " + requestHeader.getNamespace());
+        return response;
+    }
+
+    /** Returns the topics of the cluster named in the request header as the response body. */
+    private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final GetTopicsByClusterRequestHeader requestHeader =
+            (GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);
+
+        byte[] body = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster());
+
+        response.setBody(body);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+
+    /** Returns the route info manager's system topic list as the response body. */
+    private RemotingCommand getSystemTopicListFromNs(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        byte[] body = this.namesrvController.getRouteInfoManager().getSystemTopicList();
+
+        response.setBody(body);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+
+    /** Returns the result of {@code getUnitTopics()} as the response body. */
+    private RemotingCommand getUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        byte[] body = this.namesrvController.getRouteInfoManager().getUnitTopics();
+
+        response.setBody(body);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+
+    /** Returns the result of {@code getHasUnitSubTopicList()} as the response body. */
+    private RemotingCommand getHasUnitSubTopicList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubTopicList();
+
+        response.setBody(body);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+
+    /** Returns the result of {@code getHasUnitSubUnUnitTopicList()} as the response body. */
+    private RemotingCommand getHasUnitSubUnUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubUnUnitTopicList();
+
+        response.setBody(body);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Applies the properties carried in the request body to the name server
+     * configuration.
+     * <p>
+     * A request without body changes nothing and still answers {@code SUCCESS}. A body
+     * that cannot be decoded or parsed into properties answers {@code SYSTEM_ERROR}.
+     */
+    private RemotingCommand updateConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+        log.info("updateConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        byte[] body = request.getBody();
+        if (body != null) {
+            // new String(...) never yields null, so only the charset failure needs handling.
+            String bodyStr;
+            try {
+                bodyStr = new String(body, MixAll.DEFAULT_CHARSET);
+            } catch (UnsupportedEncodingException e) {
+                log.error("updateConfig byte array to string error: ", e);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+
+            Properties properties = MixAll.string2Properties(bodyStr);
+            if (properties == null) {
+                log.error("updateConfig MixAll.string2Properties error {}", bodyStr);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("string2Properties error");
+                return response;
+            }
+
+            this.namesrvController.getConfiguration().update(properties);
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+    /**
+     * Returns the formatted configuration string as the response body; the body is left
+     * unset when that string is {@code null} or empty.
+     */
+    private RemotingCommand getConfig(ChannelHandlerContext ctx, RemotingCommand request) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+        String content = this.namesrvController.getConfiguration().getAllConfigsFormatString();
+        if (content != null && content.length() > 0) {
+            try {
+                response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
+            } catch (UnsupportedEncodingException e) {
+                log.error("getConfig error, ", e);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("UnsupportedEncodingException " + e);
+                return response;
+            }
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
----------------------------------------------------------------------
diff --git a/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
new file mode 100644
index 0000000..2f123fb
--- /dev/null
+++ b/rocketmq-namesrv/src/main/java/com/alibaba/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.namesrv.routeinfo;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.namesrv.NamesrvController;
+import com.alibaba.rocketmq.remoting.ChannelEventListener;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Channel event listener that forwards close, exception and idle events to the
+ * {@code onChannelDestroy} method of the controller's route info manager.
+ * Connect events are ignored.
+ *
+ * @author shijia.wxr
+ */
+public class BrokerHousekeepingService implements ChannelEventListener {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
+    private final NamesrvController controller;
+
+    /**
+     * @param namesrvController controller whose route info manager receives the
+     * channel-destroy notifications
+     */
+    public BrokerHousekeepingService(NamesrvController namesrvController) {
+        this.controller = namesrvController;
+    }
+
+    /** No-op: this listener takes no action when a channel connects. */
+    @Override
+    public void onChannelConnect(String remoteAddr, Channel channel) {
+    }
+
+    /** Reports a closed channel to the route info manager. */
+    @Override
+    public void onChannelClose(String remoteAddr, Channel channel) {
+        notifyChannelDestroyed(remoteAddr, channel);
+    }
+
+    /** Reports a channel that raised an exception to the route info manager. */
+    @Override
+    public void onChannelException(String remoteAddr, Channel channel) {
+        notifyChannelDestroyed(remoteAddr, channel);
+    }
+
+    /** Reports an idle channel to the route info manager. */
+    @Override
+    public void onChannelIdle(String remoteAddr, Channel channel) {
+        notifyChannelDestroyed(remoteAddr, channel);
+    }
+
+    /** Shared delegation used by the close, exception and idle callbacks. */
+    private void notifyChannelDestroyed(String remoteAddr, Channel channel) {
+        this.controller.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
+    }
+}