You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:53 UTC
[57/58] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Remove bad
practices in broker.
ROCKETMQ-18 Remove bad practices in broker.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/9165667a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/9165667a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/9165667a
Branch: refs/heads/ROCKETMQ-18
Commit: 9165667aa3975900ed1f79fd6c612cfd2e05a01f
Parents: f56e038
Author: yukon <yu...@apache.org>
Authored: Tue Dec 27 21:49:37 2016 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Dec 27 21:49:37 2016 +0800
----------------------------------------------------------------------
.../rocketmq/broker/client/net/Broker2Client.java | 4 ++--
.../rocketmq/broker/filtersrv/FilterServerUtil.java | 3 +--
.../rocketmq/broker/latency/BrokerFastFailure.java | 2 +-
.../broker/offset/ConsumerOffsetManager.java | 12 ++++--------
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 11 +++++------
.../rocketmq/broker/plugin/MessageStoreFactory.java | 3 +--
.../processor/AbstractSendMessageProcessor.java | 10 ++++++----
.../broker/processor/AdminBrokerProcessor.java | 10 ++++------
.../broker/processor/PullMessageProcessor.java | 2 +-
.../broker/processor/QueryMessageProcessor.java | 4 ++--
.../broker/processor/SendMessageProcessor.java | 1 +
.../broker/transaction/TransactionStore.java | 16 ++++++++--------
12 files changed, 36 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
index 152f373..70027cd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
@@ -202,7 +202,7 @@ public class Broker2Client {
try {
this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
- new Object[]{topic, group, entry.getValue().getClientId()});
+ topic, group, entry.getValue().getClientId());
} catch (Exception e) {
log.error("[reset-offset] reset offset exception. topic={}, group={}",
new Object[]{topic, group}, e);
@@ -290,7 +290,7 @@ public class Broker2Client {
consumerStatusTable.put(clientId, body.getMessageQueueTable());
log.info(
"[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
- new Object[]{topic, group, clientId});
+ topic, group, clientId);
}
}
default:
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
index 1c40c0e..de4cc37 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
@@ -37,7 +37,6 @@ public class FilterServerUtil {
}
private static String[] splitShellString(final String shellString) {
- String[] split = shellString.split(" ");
- return split;
+ return shellString.split(" ");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 4810d77..2f4b568 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -77,7 +77,7 @@ public class BrokerFastFailure {
break;
}
final RequestTask rt = castRunnable(runnable);
- if (rt.isStopRun()) {
+ if (rt == null || rt.isStopRun()) {
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index ef9065e..7188e8d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -58,7 +58,7 @@ public class ConsumerOffsetManager extends ConfigManager {
Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey();
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
- if (arrays != null && arrays.length == 2) {
+ if (arrays.length == 2) {
String topic = arrays[0];
String group = arrays[1];
@@ -80,11 +80,7 @@ public class ConsumerOffsetManager extends ConfigManager {
Entry<Integer, Long> next = it.next();
long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey());
long offsetInPersist = next.getValue();
- if (offsetInPersist > minOffsetInStore) {
- result = false;
- } else {
- result = true;
- }
+ result = offsetInPersist <= minOffsetInStore;
}
return result;
@@ -99,7 +95,7 @@ public class ConsumerOffsetManager extends ConfigManager {
Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey();
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
- if (arrays != null && arrays.length == 2) {
+ if (arrays.length == 2) {
if (group.equals(arrays[1])) {
topics.add(arrays[0]);
}
@@ -118,7 +114,7 @@ public class ConsumerOffsetManager extends ConfigManager {
Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
String topicAtGroup = next.getKey();
String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
- if (arrays != null && arrays.length == 2) {
+ if (arrays.length == 2) {
if (topic.equals(arrays[0])) {
groups.add(arrays[1]);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index ab02ceb..335c105 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -88,13 +88,11 @@ public class BrokerOuterAPI {
public void updateNameServerAddressList(final String addrs) {
List<String> lst = new ArrayList<String>();
String[] addrArray = addrs.split(";");
- if (addrArray != null) {
- for (String addr : addrArray) {
- lst.add(addr);
- }
-
- this.remotingClient.updateNameServerAddressList(lst);
+ for (String addr : addrArray) {
+ lst.add(addr);
}
+
+ this.remotingClient.updateNameServerAddressList(lst);
}
public RegisterBrokerResult registerBrokerAll(
@@ -159,6 +157,7 @@ public class BrokerOuterAPI {
try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
+ // Ignore
}
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
index d27b6aa..42793ae 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
@@ -37,8 +37,7 @@ public final class MessageStoreFactory {
@SuppressWarnings("unchecked")
Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass);
Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
- AbstractPluginMessageStore pluginMessageStore = (AbstractPluginMessageStore) construct.newInstance(context, messageStore);
- messageStore = pluginMessageStore;
+ messageStore = construct.newInstance(context, messageStore);
} catch (Throwable e) {
throw new RuntimeException(String.format(
"Initialize plugin's class %s not found!", pluginClass), e);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 81a239b..8a285e8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -176,8 +176,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
return response;
}
if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
- String errorMsg =
- "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
+ String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
@@ -273,8 +272,11 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
hook.sendMessageBefore(context);
- requestHeader.setProperties(context.getMsgProps());
+ if (requestHeader != null) {
+ requestHeader.setProperties(context.getMsgProps());
+ }
} catch (Throwable e) {
+ // Ignore
}
}
}
@@ -319,7 +321,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
hook.sendMessageAfter(context);
} catch (Throwable e) {
-
+ // Ignore
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 94aa414..4588d2d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -722,8 +722,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final ResetOffsetRequestHeader requestHeader =
(ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
- new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
- requestHeader.getTimestamp(), requestHeader.isForce()});
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
+ requestHeader.getTimestamp(), requestHeader.isForce());
boolean isC = false;
LanguageCode language = request.getLanguage();
switch (language) {
@@ -740,7 +740,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
(GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}",
- new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup()});
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup());
return this.brokerController.getBroker2Client().getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getClientAddr());
@@ -1193,9 +1193,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
newRequest.setExtFields(request.getExtFields());
newRequest.setBody(request.getBody());
- RemotingCommand consumerResponse =
- this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);
- return consumerResponse;
+ return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);
} catch (RemotingTimeoutException e) {
response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT);
response
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 7f88593..7625d21 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -528,7 +528,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
}
}
} catch (RemotingCommandException e1) {
- LOG.error("excuteRequestWhenWakeup run", e1);
+ LOG.error("executeRequestWhenWakeup run", e1);
}
}
};
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
index b41e0a5..5c60255 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
@@ -116,7 +116,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
public void operationComplete(ChannelFuture future) throws Exception {
queryMessageResult.release();
if (!future.isSuccess()) {
- log.error("transfer query message by pagecache failed, ", future.cause());
+ log.error("transfer query message by page cache failed, ", future.cause());
}
}
});
@@ -158,7 +158,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
public void operationComplete(ChannelFuture future) throws Exception {
selectMappedBufferResult.release();
if (!future.isSuccess()) {
- log.error("transfer one message by pagecache failed, ", future.cause());
+ log.error("transfer one message by page cache failed, ", future.cause());
}
}
});
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index defe7e3..5cebd0e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -467,6 +467,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
try {
hook.consumeMessageAfter(context);
} catch (Throwable e) {
+ // Ignore
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
index 758eeed..d6e897a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
@@ -21,26 +21,26 @@ import java.util.List;
public interface TransactionStore {
- public boolean open();
+ boolean open();
- public void close();
+ void close();
- public boolean put(final List<TransactionRecord> trs);
+ boolean put(final List<TransactionRecord> trs);
- public void remove(final List<Long> pks);
+ void remove(final List<Long> pks);
- public List<TransactionRecord> traverse(final long pk, final int nums);
+ List<TransactionRecord> traverse(final long pk, final int nums);
- public long totalRecords();
+ long totalRecords();
- public long minPK();
+ long minPK();
- public long maxPK();
+ long maxPK();
}