You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 08:13:53 UTC
[28/34] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
index 146770a..2971b6c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
@@ -6,38 +6,34 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.pagecache;
-import org.apache.rocketmq.store.QueryMessageResult;
import io.netty.channel.FileRegion;
import io.netty.util.AbstractReferenceCounted;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.List;
-
+import org.apache.rocketmq.store.QueryMessageResult;
public class QueryMessageTransfer extends AbstractReferenceCounted implements FileRegion {
private final ByteBuffer byteBufferHeader;
private final QueryMessageResult queryMessageResult;
private long transfered; // the bytes which was transfered already
-
public QueryMessageTransfer(ByteBuffer byteBufferHeader, QueryMessageResult queryMessageResult) {
this.byteBufferHeader = byteBufferHeader;
this.queryMessageResult = queryMessageResult;
}
-
@Override
public long position() {
int pos = byteBufferHeader.position();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 601e2f3..bac941d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
/**
@@ -20,12 +20,16 @@
*/
package org.apache.rocketmq.broker.plugin;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.store.*;
-
import java.util.HashMap;
import java.util.Set;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.QueryMessageResult;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
public abstract class AbstractPluginMessageStore implements MessageStore {
protected MessageStore next = null;
@@ -83,7 +87,7 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
@Override
public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
- int maxMsgNums, SubscriptionData subscriptionData) {
+ int maxMsgNums, SubscriptionData subscriptionData) {
return next.getMessage(group, topic, queueId, offset, maxMsgNums, subscriptionData);
}
@@ -174,7 +178,7 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
@Override
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin,
- long end) {
+ long end) {
return next.queryMessage(topic, key, maxNum, begin, end);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
index 42793ae..294bf8c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
/**
@@ -20,14 +20,13 @@
*/
package org.apache.rocketmq.broker.plugin;
-import org.apache.rocketmq.store.MessageStore;
-
import java.io.IOException;
import java.lang.reflect.Constructor;
+import org.apache.rocketmq.store.MessageStore;
public final class MessageStoreFactory {
public final static MessageStore build(MessageStorePluginContext context, MessageStore messageStore)
- throws IOException {
+ throws IOException {
String plugin = context.getBrokerConfig().getMessageStorePlugIn();
if (plugin != null && plugin.trim().length() != 0) {
String[] pluginClasses = plugin.split(",");
@@ -35,12 +34,12 @@ public final class MessageStoreFactory {
String pluginClass = pluginClasses[i];
try {
@SuppressWarnings("unchecked")
- Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass);
+ Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>)Class.forName(pluginClass);
Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
messageStore = construct.newInstance(context, messageStore);
} catch (Throwable e) {
throw new RuntimeException(String.format(
- "Initialize plugin's class %s not found!", pluginClass), e);
+ "Initialize plugin's class %s not found!", pluginClass), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
index 32af402..fcab1e6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
/**
@@ -32,8 +32,8 @@ public class MessageStorePluginContext {
private BrokerConfig brokerConfig;
public MessageStorePluginContext(MessageStoreConfig messageStoreConfig,
- BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener,
- BrokerConfig brokerConfig) {
+ BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener,
+ BrokerConfig brokerConfig) {
super();
this.messageStoreConfig = messageStoreConfig;
this.brokerStatsManager = brokerStatsManager;
@@ -57,5 +57,4 @@ public class MessageStorePluginContext {
return brokerConfig;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index f04e86c..75e5766 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -6,16 +6,22 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.processor;
+import io.netty.channel.ChannelHandlerContext;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
@@ -42,17 +48,9 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
-import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-
public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -62,16 +60,15 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
protected final SocketAddress storeHost;
private List<SendMessageHook> sendMessageHookList;
-
public AbstractSendMessageProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
this.storeHost =
- new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController
- .getNettyServerConfig().getListenPort());
+ new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController
+ .getNettyServerConfig().getListenPort());
}
protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx,
- SendMessageRequestHeader requestHeader) {
+ SendMessageRequestHeader requestHeader) {
if (!this.hasSendMessageHook()) {
return null;
}
@@ -91,7 +88,6 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
properties.put(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
requestHeader.setProperties(MessageDecoder.messageProperties2String(properties));
-
if (uniqueKey == null) {
uniqueKey = "";
}
@@ -104,7 +100,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
protected MessageExtBrokerInner buildInnerMsg(final ChannelHandlerContext ctx,
- final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) {
+ final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) {
int queueIdInt = requestHeader.getQueueId();
if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
@@ -120,10 +116,10 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner,
- MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+ MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(),
- msgInner.getTags()));
+ msgInner.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(sysFlag);
@@ -131,7 +127,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader
- .getReconsumeTimes());
+ .getReconsumeTimes());
return msgInner;
}
@@ -140,8 +136,8 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
protected RemotingCommand msgContentCheck(final ChannelHandlerContext ctx,
- final SendMessageRequestHeader requestHeader, RemotingCommand request,
- final RemotingCommand response) {
+ final SendMessageRequestHeader requestHeader, RemotingCommand request,
+ final RemotingCommand response) {
if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + requestHeader.getTopic().length());
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
@@ -149,13 +145,13 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
if (requestHeader.getProperties() != null && requestHeader.getProperties().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long "
- + requestHeader.getProperties().length());
+ + requestHeader.getProperties().length());
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
return response;
}
if (request.getBody().length > DBMsgConstants.MAX_BODY_SIZE) {
log.warn(" topic {} msg body size {} from {}", requestHeader.getTopic(),
- request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel()));
+ request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel()));
response.setRemark("msg body must be less 64KB");
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
return response;
@@ -164,12 +160,12 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
- final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
+ final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
- && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
+ && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
- + "] sending message is forbidden");
+ + "] sending message is forbidden");
return response;
}
if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
@@ -181,7 +177,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
TopicConfig topicConfig =
- this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+ this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
@@ -193,26 +189,26 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
log.warn("the topic " + requestHeader.getTopic() + " not exist, producer: "
- + ctx.channel().remoteAddress());
+ + ctx.channel().remoteAddress());
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
- requestHeader.getTopic(), //
- requestHeader.getDefaultTopic(), //
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
- requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
+ requestHeader.getTopic(), //
+ requestHeader.getDefaultTopic(), //
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
+ requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
if (null == topicConfig) {
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
topicConfig =
- this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
- requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
- topicSysFlag);
+ this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
+ requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
+ topicSysFlag);
}
}
if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
- + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+ + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
@@ -221,9 +217,9 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid) {
String errorInfo = String.format("request queueId[%d] is illagal, %s Producer: %s",
- queueIdInt,
- topicConfig.toString(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ queueIdInt,
+ topicConfig.toString(),
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -239,7 +235,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
- final RemotingCommand response) {
+ final RemotingCommand response) {
if (!request.isOnewayRPC()) {
try {
ctx.writeAndFlush(response);
@@ -252,7 +248,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
- SendMessageContext context) {
+ SendMessageContext context) {
if (hasSendMessageHook()) {
for (SendMessageHook hook : this.sendMessageHookList) {
try {
@@ -280,20 +276,20 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
SendMessageRequestHeaderV2 requestHeaderV2 = null;
SendMessageRequestHeader requestHeader = null;
switch (request.getCode()) {
case RequestCode.SEND_MESSAGE_V2:
requestHeaderV2 =
- (SendMessageRequestHeaderV2) request
- .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+ (SendMessageRequestHeaderV2)request
+ .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
case RequestCode.SEND_MESSAGE:
if (null == requestHeaderV2) {
requestHeader =
- (SendMessageRequestHeader) request
- .decodeCommandCustomHeader(SendMessageRequestHeader.class);
+ (SendMessageRequestHeader)request
+ .decodeCommandCustomHeader(SendMessageRequestHeader.class);
} else {
requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
}
@@ -309,7 +305,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
try {
if (response != null) {
final SendMessageResponseHeader responseHeader =
- (SendMessageResponseHeader) response.readCustomHeader();
+ (SendMessageResponseHeader)response.readCustomHeader();
context.setMsgId(responseHeader.getMsgId());
context.setQueueId(responseHeader.getQueueId());
context.setQueueOffset(responseHeader.getQueueOffset());
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index d2d4bc7..722bec2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -16,6 +16,19 @@
*/
package org.apache.rocketmq.broker.processor;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.io.UnsupportedEncodingException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
@@ -33,8 +46,48 @@ import org.apache.rocketmq.common.message.MessageId;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.*;
-import org.apache.rocketmq.common.protocol.header.*;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsItem;
+import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
+import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
+import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
+import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
@@ -50,17 +103,9 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.UnsupportedEncodingException;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-
public class AdminBrokerProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -157,10 +202,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
- (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
+ (CreateTopicRequestHeader)request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-
if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
@@ -193,13 +237,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DeleteTopicRequestHeader requestHeader =
- (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
+ (DeleteTopicRequestHeader)request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
this.brokerController.getMessageStore()
- .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
+ .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
@@ -274,7 +318,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerConfigResponseHeader.class);
- final GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader) response.readCustomHeader();
+ final GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader)response.readCustomHeader();
String content = this.brokerController.getConfiguration().getAllConfigsFormatString();
if (content != null && content.length() > 0) {
@@ -298,12 +342,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
- final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader();
+ final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader)response.readCustomHeader();
final SearchOffsetRequestHeader requestHeader =
- (SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
+ (SearchOffsetRequestHeader)request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
- requestHeader.getTimestamp());
+ requestHeader.getTimestamp());
responseHeader.setOffset(offset);
@@ -314,9 +358,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
- final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
+ final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader)response.readCustomHeader();
final GetMaxOffsetRequestHeader requestHeader =
- (GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
+ (GetMaxOffsetRequestHeader)request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId());
@@ -329,9 +373,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getMinOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
- final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
+ final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader)response.readCustomHeader();
final GetMinOffsetRequestHeader requestHeader =
- (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
+ (GetMinOffsetRequestHeader)request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
long offset = this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId());
@@ -343,12 +387,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
- final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader();
+ final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader)response.readCustomHeader();
final GetEarliestMsgStoretimeRequestHeader requestHeader =
- (GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
+ (GetEarliestMsgStoretimeRequestHeader)request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
long timestamp =
- this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(), requestHeader.getQueueId());
+ this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(), requestHeader.getQueueId());
responseHeader.setTimestamp(timestamp);
response.setCode(ResponseCode.SUCCESS);
@@ -375,9 +419,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(//
- requestBody.getConsumerGroup(), //
- requestBody.getMqSet(), //
- requestBody.getClientId());
+ requestBody.getConsumerGroup(), //
+ requestBody.getMqSet(), //
+ requestBody.getClientId());
LockBatchResponseBody responseBody = new LockBatchResponseBody();
responseBody.setLockOKMQSet(lockOKMQSet);
@@ -393,9 +437,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);
this.brokerController.getRebalanceLockManager().unlockBatch(//
- requestBody.getConsumerGroup(), //
- requestBody.getMqSet(), //
- requestBody.getClientId());
+ requestBody.getConsumerGroup(), //
+ requestBody.getMqSet(), //
+ requestBody.getClientId());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
@@ -403,7 +447,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
log.info("updateAndCreateSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
@@ -447,7 +491,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DeleteSubscriptionGroupRequestHeader requestHeader =
- (DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
+ (DeleteSubscriptionGroupRequestHeader)request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
log.info("deleteSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
@@ -461,7 +505,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetTopicStatsInfoRequestHeader requestHeader =
- (GetTopicStatsInfoRequestHeader) request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class);
+ (GetTopicStatsInfoRequestHeader)request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class);
final String topic = requestHeader.getTopic();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
@@ -509,10 +553,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumerConnectionListRequestHeader requestHeader =
- (GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
+ (GetConsumerConnectionListRequestHeader)request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
ConsumerGroupInfo consumerGroupInfo =
- this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
+ this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (consumerGroupInfo != null) {
ConsumerConnection bodydata = new ConsumerConnection();
bodydata.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere());
@@ -548,11 +592,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetProducerConnectionListRequestHeader requestHeader =
- (GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
+ (GetProducerConnectionListRequestHeader)request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
ProducerConnection bodydata = new ProducerConnection();
HashMap<Channel, ClientChannelInfo> channelInfoHashMap =
- this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup());
+ this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup());
if (channelInfoHashMap != null) {
Iterator<Map.Entry<Channel, ClientChannelInfo>> it = channelInfoHashMap.entrySet().iterator();
while (it.hasNext()) {
@@ -581,7 +625,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetConsumeStatsRequestHeader requestHeader =
- (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
+ (GetConsumeStatsRequestHeader)request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
ConsumeStats consumeStats = new ConsumeStats();
@@ -604,10 +648,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
*/
{
SubscriptionData findSubscriptionData =
- this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
+ this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
if (null == findSubscriptionData //
- && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
+ && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic);
continue;
}
@@ -626,16 +670,15 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
brokerOffset = 0;
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
- requestHeader.getConsumerGroup(), //
- topic, //
- i);
+ requestHeader.getConsumerGroup(), //
+ topic, //
+ i);
if (consumerOffset < 0)
consumerOffset = 0;
offsetWrapper.setBrokerOffset(brokerOffset);
offsetWrapper.setConsumerOffset(consumerOffset);
-
long timeOffset = consumerOffset - 1;
if (timeOffset >= 0) {
long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
@@ -690,7 +733,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
- String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode();
+ String content = ((DefaultMessageStore)this.brokerController.getMessageStore()).getScheduleMessageService().encode();
if (content != null && content.length() > 0) {
try {
response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
@@ -716,10 +759,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final ResetOffsetRequestHeader requestHeader =
- (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
+ (ResetOffsetRequestHeader)request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
- requestHeader.getTimestamp(), requestHeader.isForce());
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
+ requestHeader.getTimestamp(), requestHeader.isForce());
boolean isC = false;
LanguageCode language = request.getLanguage();
switch (language) {
@@ -728,25 +771,24 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
break;
}
return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(),
- requestHeader.getTimestamp(), requestHeader.isForce(), isC);
+ requestHeader.getTimestamp(), requestHeader.isForce(), isC);
}
public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final GetConsumerStatusRequestHeader requestHeader =
- (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
+ (GetConsumerStatusRequestHeader)request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}",
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup());
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup());
return this.brokerController.getBroker2Client().getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(),
- requestHeader.getClientAddr());
+ requestHeader.getClientAddr());
}
private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
QueryTopicConsumeByWhoRequestHeader requestHeader =
- (QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class);
-
+ (QueryTopicConsumeByWhoRequestHeader)request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class);
HashSet<String> groups = this.brokerController.getConsumerManager().queryTopicConsumeByWho(requestHeader.getTopic());
@@ -767,9 +809,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class);
- final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader();
+ final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader)response.readCustomHeader();
final RegisterFilterServerRequestHeader requestHeader =
- (RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);
+ (RegisterFilterServerRequestHeader)request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);
this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr());
@@ -784,7 +826,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
QueryConsumeTimeSpanRequestHeader requestHeader =
- (QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);
+ (QueryConsumeTimeSpanRequestHeader)request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);
final String topic = requestHeader.getTopic();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
@@ -812,7 +854,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
long consumeTime;
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
- requestHeader.getGroup(), topic, i);
+ requestHeader.getGroup(), topic, i);
if (consumerOffset > 0) {
consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset - 1);
} else {
@@ -837,7 +879,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
private RemotingCommand getSystemTopicListFromBroker(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
Set<String> topics = this.brokerController.getTopicConfigManager().getSystemTopic();
@@ -874,28 +916,28 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
*/
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final GetConsumerRunningInfoRequestHeader requestHeader =
- (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
+ (GetConsumerRunningInfoRequestHeader)request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, request, requestHeader.getConsumerGroup(),
- requestHeader.getClientId());
+ requestHeader.getClientId());
}
private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
QueryCorrectionOffsetHeader requestHeader =
- (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);
+ (QueryCorrectionOffsetHeader)request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);
Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager()
- .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());
+ .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());
Map<Integer, Long> compareOffset =
- this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());
+ this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());
if (compareOffset != null && !compareOffset.isEmpty()) {
for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {
Integer queueId = entry.getKey();
correctionOffset.put(queueId,
- correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));
+ correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));
}
}
@@ -908,8 +950,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
- final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request
- .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
+ final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader)request
+ .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName());
SelectMappedBufferResult selectMappedBufferResult = null;
@@ -928,13 +970,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, requestHeader.getConsumerGroup(),
- requestHeader.getClientId());
+ requestHeader.getClientId());
}
private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
CloneGroupOffsetRequestHeader requestHeader =
- (CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);
+ (CloneGroupOffsetRequestHeader)request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);
Set<String> topics;
if (UtilAll.isBlank(requestHeader.getTopic())) {
@@ -957,16 +999,16 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
if (!requestHeader.isOffline()) {
SubscriptionData findSubscriptionData =
- this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic);
+ this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic);
if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0
- && findSubscriptionData == null) {
+ && findSubscriptionData == null) {
log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", requestHeader.getSrcGroup(), topic);
continue;
}
}
this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(),
- requestHeader.getTopic());
+ requestHeader.getTopic());
}
response.setCode(ResponseCode.SUCCESS);
@@ -976,9 +1018,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final ViewBrokerStatsDataRequestHeader requestHeader =
- (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
+ (ViewBrokerStatsDataRequestHeader)request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
- DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
+ DefaultMessageStore messageStore = (DefaultMessageStore)this.brokerController.getMessageStore();
StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey());
if (null == statsItem) {
@@ -998,7 +1040,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
brokerStatsData.setStatsMinute(it);
}
-
{
BrokerStatsItem it = new BrokerStatsItem();
StatsSnapshot ss = statsItem.getStatsDataInHour();
@@ -1008,7 +1049,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
brokerStatsData.setStatsHour(it);
}
-
{
BrokerStatsItem it = new BrokerStatsItem();
StatsSnapshot ss = statsItem.getStatsDataInDay();
@@ -1025,16 +1065,16 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
GetConsumeStatsInBrokerHeader requestHeader =
- (GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class);
+ (GetConsumeStatsInBrokerHeader)request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class);
boolean isOrder = requestHeader.isOrder();
ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroups =
- brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable();
+ brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable();
List<Map<String/* subscriptionGroupName */, List<ConsumeStats>>> brokerConsumeStatsList =
- new ArrayList<Map<String, List<ConsumeStats>>>();
+ new ArrayList<Map<String, List<ConsumeStats>>>();
long totalDiff = 0L;
@@ -1060,7 +1100,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
if (null == findSubscriptionData //
- && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) {
+ && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) {
log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic);
continue;
}
@@ -1076,16 +1116,15 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
if (brokerOffset < 0)
brokerOffset = 0;
long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
- group, //
- topic, //
- i);
+ group, //
+ topic, //
+ i);
if (consumerOffset < 0)
consumerOffset = 0;
offsetWrapper.setBrokerOffset(brokerOffset);
offsetWrapper.setConsumerOffset(consumerOffset);
-
long timeOffset = consumerOffset - 1;
if (timeOffset >= 0) {
long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
@@ -1120,23 +1159,23 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
runtimeInfo.put("brokerVersion", String.valueOf(MQVersion.CURRENT_VERSION));
runtimeInfo.put("msgPutTotalYesterdayMorning",
- String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning()));
+ String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning()));
runtimeInfo.put("msgPutTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning()));
runtimeInfo.put("msgPutTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow()));
runtimeInfo.put("msgGetTotalYesterdayMorning",
- String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning()));
+ String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning()));
runtimeInfo.put("msgGetTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning()));
runtimeInfo.put("msgGetTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow()));
runtimeInfo.put("sendThreadPoolQueueSize", String.valueOf(this.brokerController.getSendThreadPoolQueue().size()));
runtimeInfo.put("sendThreadPoolQueueCapacity",
- String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity()));
+ String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity()));
runtimeInfo.put("pullThreadPoolQueueSize", String.valueOf(this.brokerController.getPullThreadPoolQueue().size()));
runtimeInfo.put("pullThreadPoolQueueCapacity",
- String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity()));
+ String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity()));
runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
@@ -1146,7 +1185,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime()));
runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) {
- DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
+ DefaultMessageStore defaultMessageStore = (DefaultMessageStore)this.brokerController.getMessageStore();
runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(defaultMessageStore.remainTransientStoreBufferNumbs()));
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToCommit(), false));
@@ -1163,10 +1202,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
}
private RemotingCommand callConsumer(//
- final int requestCode, //
- final RemotingCommand request, //
- final String consumerGroup, //
- final String clientId) throws RemotingCommandException {
+ final int requestCode, //
+ final RemotingCommand request, //
+ final String consumerGroup, //
+ final String clientId) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);
@@ -1179,8 +1218,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", //
- clientId, //
- MQVersion.getVersionDesc(clientChannelInfo.getVersion())));
+ clientId, //
+ MQVersion.getVersionDesc(clientChannelInfo.getVersion())));
return response;
}
@@ -1193,12 +1232,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
} catch (RemotingTimeoutException e) {
response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT);
response
- .setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));
+ .setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));
return response;
} catch (Exception e) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(
- String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));
+ String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));
return response;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index 62de995..717afaf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.processor;
+import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.common.MixAll;
@@ -34,11 +35,9 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class ClientManageProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -49,7 +48,7 @@ public class ClientManageProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT:
return this.heartBeat(ctx, request);
@@ -70,16 +69,16 @@ public class ClientManageProcessor implements NettyRequestProcessor {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
- ctx.channel(),
- heartbeatData.getClientID(),
- request.getLanguage(),
- request.getVersion()
+ ctx.channel(),
+ heartbeatData.getClientID(),
+ request.getLanguage(),
+ request.getVersion()
);
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
SubscriptionGroupConfig subscriptionGroupConfig =
- this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
- data.getGroupName());
+ this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
+ data.getGroupName());
boolean isNotifyConsumerIdsChangedEnable = true;
if (null != subscriptionGroupConfig) {
isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
@@ -89,32 +88,32 @@ public class ClientManageProcessor implements NettyRequestProcessor {
}
String newTopic = MixAll.getRetryTopic(data.getGroupName());
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
- newTopic,
- subscriptionGroupConfig.getRetryQueueNums(),
- PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
+ newTopic,
+ subscriptionGroupConfig.getRetryQueueNums(),
+ PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
}
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
- data.getGroupName(),
- clientChannelInfo,
- data.getConsumeType(),
- data.getMessageModel(),
- data.getConsumeFromWhere(),
- data.getSubscriptionDataSet(),
- isNotifyConsumerIdsChangedEnable
+ data.getGroupName(),
+ clientChannelInfo,
+ data.getConsumeType(),
+ data.getMessageModel(),
+ data.getConsumeFromWhere(),
+ data.getSubscriptionDataSet(),
+ isNotifyConsumerIdsChangedEnable
);
if (changed) {
log.info("registerConsumer info changed {} {}",
- data.toString(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ data.toString(),
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel())
);
}
}
for (ProducerData data : heartbeatData.getProducerDataSet()) {
this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
- clientChannelInfo);
+ clientChannelInfo);
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
@@ -122,18 +121,18 @@ public class ClientManageProcessor implements NettyRequestProcessor {
}
public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
final RemotingCommand response =
- RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
+ RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
final UnregisterClientRequestHeader requestHeader =
- (UnregisterClientRequestHeader) request
- .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
+ (UnregisterClientRequestHeader)request
+ .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
- ctx.channel(),
- requestHeader.getClientID(),
- request.getLanguage(),
- request.getVersion());
+ ctx.channel(),
+ requestHeader.getClientID(),
+ request.getLanguage(),
+ request.getVersion());
{
final String group = requestHeader.getProducerGroup();
if (group != null) {
@@ -145,7 +144,7 @@ public class ClientManageProcessor implements NettyRequestProcessor {
final String group = requestHeader.getConsumerGroup();
if (group != null) {
SubscriptionGroupConfig subscriptionGroupConfig =
- this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
+ this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
boolean isNotifyConsumerIdsChangedEnable = true;
if (null != subscriptionGroupConfig) {
isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index c0c43e0..d2e6d7d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -16,36 +16,39 @@
*/
package org.apache.rocketmq.broker.processor;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.rocketmq.common.protocol.header.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-
-
public class ConsumerManageProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
-
public ConsumerManageProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
return this.getConsumerListByGroup(ctx, request);
@@ -64,18 +67,17 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
return false;
}
-
public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
final RemotingCommand response =
- RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
+ RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
final GetConsumerListByGroupRequestHeader requestHeader =
- (GetConsumerListByGroupRequestHeader) request
- .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
+ (GetConsumerListByGroupRequestHeader)request
+ .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
ConsumerGroupInfo consumerGroupInfo =
- this.brokerController.getConsumerManager().getConsumerGroupInfo(
- requestHeader.getConsumerGroup());
+ this.brokerController.getConsumerManager().getConsumerGroupInfo(
+ requestHeader.getConsumerGroup());
if (consumerGroupInfo != null) {
List<String> clientIds = consumerGroupInfo.getAllClientId();
if (!clientIds.isEmpty()) {
@@ -87,11 +89,11 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
return response;
} else {
log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
} else {
log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -100,34 +102,32 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
}
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
final RemotingCommand response =
- RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
+ RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
final UpdateConsumerOffsetRequestHeader requestHeader =
- (UpdateConsumerOffsetRequestHeader) request
- .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
+ (UpdateConsumerOffsetRequestHeader)request
+ .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
- requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
+ requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
-
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
- throws RemotingCommandException {
+ throws RemotingCommandException {
final RemotingCommand response =
- RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
+ RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
final QueryConsumerOffsetResponseHeader responseHeader =
- (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
+ (QueryConsumerOffsetResponseHeader)response.readCustomHeader();
final QueryConsumerOffsetRequestHeader requestHeader =
- (QueryConsumerOffsetRequestHeader) request
- .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
+ (QueryConsumerOffsetRequestHeader)request
+ .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
long offset =
- this.brokerController.getConsumerOffsetManager().queryOffset(
- requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
-
+ this.brokerController.getConsumerOffsetManager().queryOffset(
+ requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
if (offset >= 0) {
responseHeader.setOffset(offset);
@@ -135,11 +135,11 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
response.setRemark(null);
} else {
long minOffset =
- this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
- requestHeader.getQueueId());
+ this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
+ requestHeader.getQueueId());
if (minOffset <= 0
- && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
- requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
+ && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
+ requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);