You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:13 UTC

[06/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Remove bad practices in broker.

ROCKETMQ-18 Remove bad practices in broker.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/9165667a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/9165667a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/9165667a

Branch: refs/heads/spec
Commit: 9165667aa3975900ed1f79fd6c612cfd2e05a01f
Parents: f56e038
Author: yukon <yu...@apache.org>
Authored: Tue Dec 27 21:49:37 2016 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Dec 27 21:49:37 2016 +0800

----------------------------------------------------------------------
 .../rocketmq/broker/client/net/Broker2Client.java   |  4 ++--
 .../rocketmq/broker/filtersrv/FilterServerUtil.java |  3 +--
 .../rocketmq/broker/latency/BrokerFastFailure.java  |  2 +-
 .../broker/offset/ConsumerOffsetManager.java        | 12 ++++--------
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java  | 11 +++++------
 .../rocketmq/broker/plugin/MessageStoreFactory.java |  3 +--
 .../processor/AbstractSendMessageProcessor.java     | 10 ++++++----
 .../broker/processor/AdminBrokerProcessor.java      | 10 ++++------
 .../broker/processor/PullMessageProcessor.java      |  2 +-
 .../broker/processor/QueryMessageProcessor.java     |  4 ++--
 .../broker/processor/SendMessageProcessor.java      |  1 +
 .../broker/transaction/TransactionStore.java        | 16 ++++++++--------
 12 files changed, 36 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
index 152f373..70027cd 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
@@ -202,7 +202,7 @@ public class Broker2Client {
                     try {
                         this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                         log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
-                                new Object[]{topic, group, entry.getValue().getClientId()});
+                                topic, group, entry.getValue().getClientId());
                     } catch (Exception e) {
                         log.error("[reset-offset] reset offset exception. topic={}, group={}",
                                 new Object[]{topic, group}, e);
@@ -290,7 +290,7 @@ public class Broker2Client {
                                 consumerStatusTable.put(clientId, body.getMessageQueueTable());
                                 log.info(
                                         "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
-                                        new Object[]{topic, group, clientId});
+                                        topic, group, clientId);
                             }
                         }
                         default:

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
index 1c40c0e..de4cc37 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
@@ -37,7 +37,6 @@ public class FilterServerUtil {
     }
 
     private static String[] splitShellString(final String shellString) {
-        String[] split = shellString.split(" ");
-        return split;
+        return shellString.split(" ");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 4810d77..2f4b568 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -77,7 +77,7 @@ public class BrokerFastFailure {
                         break;
                     }
                     final RequestTask rt = castRunnable(runnable);
-                    if (rt.isStopRun()) {
+                    if (rt == null || rt.isStopRun()) {
                         break;
                     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index ef9065e..7188e8d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -58,7 +58,7 @@ public class ConsumerOffsetManager extends ConfigManager {
             Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
             String topicAtGroup = next.getKey();
             String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
-            if (arrays != null && arrays.length == 2) {
+            if (arrays.length == 2) {
                 String topic = arrays[0];
                 String group = arrays[1];
 
@@ -80,11 +80,7 @@ public class ConsumerOffsetManager extends ConfigManager {
             Entry<Integer, Long> next = it.next();
             long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey());
             long offsetInPersist = next.getValue();
-            if (offsetInPersist > minOffsetInStore) {
-                result = false;
-            } else {
-                result = true;
-            }
+            result = offsetInPersist <= minOffsetInStore;
         }
 
         return result;
@@ -99,7 +95,7 @@ public class ConsumerOffsetManager extends ConfigManager {
             Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
             String topicAtGroup = next.getKey();
             String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
-            if (arrays != null && arrays.length == 2) {
+            if (arrays.length == 2) {
                 if (group.equals(arrays[1])) {
                     topics.add(arrays[0]);
                 }
@@ -118,7 +114,7 @@ public class ConsumerOffsetManager extends ConfigManager {
             Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
             String topicAtGroup = next.getKey();
             String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
-            if (arrays != null && arrays.length == 2) {
+            if (arrays.length == 2) {
                 if (topic.equals(arrays[0])) {
                     groups.add(arrays[1]);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index ab02ceb..335c105 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -88,13 +88,11 @@ public class BrokerOuterAPI {
     public void updateNameServerAddressList(final String addrs) {
         List<String> lst = new ArrayList<String>();
         String[] addrArray = addrs.split(";");
-        if (addrArray != null) {
-            for (String addr : addrArray) {
-                lst.add(addr);
-            }
-
-            this.remotingClient.updateNameServerAddressList(lst);
+        for (String addr : addrArray) {
+            lst.add(addr);
         }
+
+        this.remotingClient.updateNameServerAddressList(lst);
     }
 
     public RegisterBrokerResult registerBrokerAll(
@@ -159,6 +157,7 @@ public class BrokerOuterAPI {
             try {
                 this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
             } catch (RemotingTooMuchRequestException e) {
+                // Ignore
             }
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
index d27b6aa..42793ae 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
@@ -37,8 +37,7 @@ public final class MessageStoreFactory {
                     @SuppressWarnings("unchecked")
                     Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass);
                     Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
-                    AbstractPluginMessageStore pluginMessageStore = (AbstractPluginMessageStore) construct.newInstance(context, messageStore);
-                    messageStore = pluginMessageStore;
+                    messageStore = construct.newInstance(context, messageStore);
                 } catch (Throwable e) {
                     throw new RuntimeException(String.format(
                             "Initialize plugin's class %s not found!", pluginClass), e);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 81a239b..8a285e8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -176,8 +176,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
             return response;
         }
         if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
-            String errorMsg =
-                    "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
+            String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
             log.warn(errorMsg);
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark(errorMsg);
@@ -273,8 +272,11 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
                     }
 
                     hook.sendMessageBefore(context);
-                    requestHeader.setProperties(context.getMsgProps());
+                    if (requestHeader != null) {
+                        requestHeader.setProperties(context.getMsgProps());
+                    }
                 } catch (Throwable e) {
+                    // Ignore
                 }
             }
         }
@@ -319,7 +321,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
                     }
                     hook.sendMessageAfter(context);
                 } catch (Throwable e) {
-
+                    // Ignore
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 94aa414..4588d2d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -722,8 +722,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         final ResetOffsetRequestHeader requestHeader =
                 (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
         log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
-                new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
-                        requestHeader.getTimestamp(), requestHeader.isForce()});
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
+                requestHeader.getTimestamp(), requestHeader.isForce());
         boolean isC = false;
         LanguageCode language = request.getLanguage();
         switch (language) {
@@ -740,7 +740,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
 
         log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}",
-                new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup()});
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup());
 
         return this.brokerController.getBroker2Client().getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(),
                 requestHeader.getClientAddr());
@@ -1193,9 +1193,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             newRequest.setExtFields(request.getExtFields());
             newRequest.setBody(request.getBody());
 
-            RemotingCommand consumerResponse =
-                    this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);
-            return consumerResponse;
+            return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);
         } catch (RemotingTimeoutException e) {
             response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT);
             response

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 7f88593..7625d21 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -528,7 +528,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                         }
                     }
                 } catch (RemotingCommandException e1) {
-                    LOG.error("excuteRequestWhenWakeup run", e1);
+                    LOG.error("executeRequestWhenWakeup run", e1);
                 }
             }
         };

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
index b41e0a5..5c60255 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
@@ -116,7 +116,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
                     public void operationComplete(ChannelFuture future) throws Exception {
                         queryMessageResult.release();
                         if (!future.isSuccess()) {
-                            log.error("transfer query message by pagecache failed, ", future.cause());
+                            log.error("transfer query message by page cache failed, ", future.cause());
                         }
                     }
                 });
@@ -158,7 +158,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
                     public void operationComplete(ChannelFuture future) throws Exception {
                         selectMappedBufferResult.release();
                         if (!future.isSuccess()) {
-                            log.error("transfer one message by pagecache failed, ", future.cause());
+                            log.error("transfer one message by page cache failed, ", future.cause());
                         }
                     }
                 });

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index defe7e3..5cebd0e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -467,6 +467,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
                 try {
                     hook.consumeMessageAfter(context);
                 } catch (Throwable e) {
+                    // Ignore
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/9165667a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
index 758eeed..d6e897a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
@@ -21,26 +21,26 @@ import java.util.List;
 
 
 public interface TransactionStore {
-    public boolean open();
+    boolean open();
 
 
-    public void close();
+    void close();
 
 
-    public boolean put(final List<TransactionRecord> trs);
+    boolean put(final List<TransactionRecord> trs);
 
 
-    public void remove(final List<Long> pks);
+    void remove(final List<Long> pks);
 
 
-    public List<TransactionRecord> traverse(final long pk, final int nums);
+    List<TransactionRecord> traverse(final long pk, final int nums);
 
 
-    public long totalRecords();
+    long totalRecords();
 
 
-    public long minPK();
+    long minPK();
 
 
-    public long maxPK();
+    long maxPK();
 }