You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2017/05/27 06:10:28 UTC

[1/8] incubator-rocketmq git commit: [ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever closes apache/incubator-rocketmq#61

Repository: incubator-rocketmq
Updated Branches:
  refs/heads/develop 0adad6f00 -> 8d781757d


[ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever closes apache/incubator-rocketmq#61


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/b1fcf1b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/b1fcf1b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/b1fcf1b8

Branch: refs/heads/develop
Commit: b1fcf1b83b659bd03bcebf651d9e88c294a89e07
Parents: 0adad6f
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 11:21:09 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 11:21:09 2017 +0800

----------------------------------------------------------------------
 .../apache/rocketmq/common/BrokerConfig.java    |  3 ++
 .../org/apache/rocketmq/store/CommitLog.java    | 40 ++++--------------
 .../apache/rocketmq/store/PutMessageLock.java   | 25 ++++++++++++
 .../rocketmq/store/PutMessageReentrantLock.java | 37 +++++++++++++++++
 .../rocketmq/store/PutMessageSpinLock.java      | 43 ++++++++++++++++++++
 .../store/config/MessageStoreConfig.java        |  4 ++
 6 files changed, 119 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f0a73bd..5bce013 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -47,6 +47,9 @@ public class BrokerConfig {
     private boolean autoCreateSubscriptionGroup = true;
     private String messageStorePlugIn = "";
 
+    /**
+     * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1.
+     */
     private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
     private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
     private int adminBrokerThreadPoolNums = 16;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 7841feb..7b29263 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -23,8 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -63,12 +61,7 @@ public class CommitLog {
     private volatile long confirmOffset = -1L;
 
     private volatile long beginTimeInLock = 0;
-
-    //true: Can lock, false : in lock.
-    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
-
-    private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
-
+    private final PutMessageLock putMessageLock;
     public CommitLog(final DefaultMessageStore defaultMessageStore) {
         this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
             defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
@@ -88,6 +81,8 @@ public class CommitLog {
                 return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
             }
         };
+        this.putMessageLock =  defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
+
     }
 
     public boolean load() {
@@ -577,7 +572,7 @@ public class CommitLog {
         MappedFile unlockMappedFile = null;
         MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 
-        lockForPutMessage(); //spin...
+        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
         try {
             long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
             this.beginTimeInLock = beginLockTimestamp;
@@ -626,7 +621,7 @@ public class CommitLog {
             eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
             beginTimeInLock = 0;
         } finally {
-            releasePutMessageLock();
+            putMessageLock.unlock();
         }
 
         if (eclipseTimeInLock > 500) {
@@ -861,7 +856,7 @@ public class CommitLog {
     }
 
     public boolean appendData(long startOffset, byte[] data) {
-        lockForPutMessage(); //spin...
+        putMessageLock.lock();
         try {
             MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
             if (null == mappedFile) {
@@ -871,7 +866,7 @@ public class CommitLog {
 
             return mappedFile.appendMessage(data);
         } finally {
-            releasePutMessageLock();
+            putMessageLock.unlock();
         }
     }
 
@@ -906,28 +901,7 @@ public class CommitLog {
         return diff;
     }
 
-    /**
-     * Spin util acquired the lock.
-     */
-    private void lockForPutMessage() {
-        if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
-            putMessageNormalLock.lock();
-        } else {
-            boolean flag;
-            do {
-                flag = this.putMessageSpinLock.compareAndSet(true, false);
-            }
-            while (!flag);
-        }
-    }
 
-    private void releasePutMessageLock() {
-        if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
-            putMessageNormalLock.unlock();
-        } else {
-            this.putMessageSpinLock.compareAndSet(false, true);
-        }
-    }
 
     public static class GroupCommitRequest {
         private final long nextOffset;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
new file mode 100644
index 0000000..a03e41a
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+/**
+ * Used when trying to put message
+ */
+public interface PutMessageLock {
+    void lock();
+    void unlock();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
new file mode 100644
index 0000000..9198f1c
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Exclusive lock implementation to put message
+ */
+public class PutMessageReentrantLock implements PutMessageLock {
+    private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
+
+    @Override
+    public void lock() {
+        putMessageNormalLock.lock();
+    }
+
+    @Override
+    public void unlock() {
+        putMessageNormalLock.unlock();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
new file mode 100644
index 0000000..baa809d
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Spin lock Implementation to put message, suggest using this witb low race conditions
+ *
+ */
+public class PutMessageSpinLock implements PutMessageLock {
+    //true: Can lock, false : in lock.
+    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
+
+    @Override
+    public void lock() {
+        boolean flag;
+        do {
+            flag = this.putMessageSpinLock.compareAndSet(true, false);
+        }
+        while (!flag);
+    }
+
+    @Override
+    public void unlock() {
+        this.putMessageSpinLock.compareAndSet(false, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 29f800c..19ed211 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -52,6 +52,10 @@ public class MessageStoreConfig {
     @ImportantField
     private int commitIntervalCommitLog = 200;
 
+    /**
+     * introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.<br/>
+     * By default it is set to false indicating using spin lock when putting message.
+     */
     private boolean useReentrantLockWhenPutMessage = false;
 
     // Whether schedule flush,default is real-time


[3/8] incubator-rocketmq git commit: [ROCKETMQ-200]-Cluster name is always missing when fetch ClusterInfo from name server closes apache/incubator-rocketmq#105

Posted by do...@apache.org.
[ROCKETMQ-200]-Cluster name is always missing when fetch ClusterInfo from name server closes apache/incubator-rocketmq#105


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/c7961407
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/c7961407
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/c7961407

Branch: refs/heads/develop
Commit: c79614071b1941940f934c066bde5711062bdc7e
Parents: 8c8610f
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 11:24:02 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 11:24:02 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/common/protocol/route/BrokerData.java    | 14 +++++++++++---
 .../rocketmq/namesrv/routeinfo/RouteInfoManager.java  | 10 ++--------
 2 files changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c7961407/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
index 612e5b4..9d868ae 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
@@ -15,9 +15,7 @@
  * limitations under the License.
  */
 
-/**
- * $Id: BrokerData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
+
 package org.apache.rocketmq.common.protocol.route;
 
 import java.util.HashMap;
@@ -29,6 +27,16 @@ public class BrokerData implements Comparable<BrokerData> {
     private String brokerName;
     private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
 
+    public BrokerData() {
+
+    }
+
+    public BrokerData(String cluster, String brokerName, HashMap<Long, String> brokerAddrs) {
+        this.cluster = cluster;
+        this.brokerName = brokerName;
+        this.brokerAddrs = brokerAddrs;
+    }
+
     public String selectBrokerAddr() {
         String value = this.brokerAddrs.get(MixAll.MASTER_ID);
         if (null == value) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c7961407/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 16b7847..5a953a9 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -125,11 +125,7 @@ public class RouteInfoManager {
                 BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                 if (null == brokerData) {
                     registerFirst = true;
-                    brokerData = new BrokerData();
-                    brokerData.setBrokerName(brokerName);
-                    HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
-                    brokerData.setBrokerAddrs(brokerAddrs);
-
+                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                     this.brokerAddrTable.put(brokerName, brokerData);
                 }
                 String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
@@ -381,9 +377,7 @@ public class RouteInfoManager {
                     for (String brokerName : brokerNameSet) {
                         BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                         if (null != brokerData) {
-                            BrokerData brokerDataClone = new BrokerData();
-                            brokerDataClone.setBrokerName(brokerData.getBrokerName());
-                            brokerDataClone.setBrokerAddrs((HashMap<Long, String>) brokerData
+                            BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
                                 .getBrokerAddrs().clone());
                             brokerDataList.add(brokerDataClone);
                             foundBrokerData = true;


[7/8] incubator-rocketmq git commit: Merge remote-tracking branch 'wip/ROCKETMQ-206' into develop

Posted by do...@apache.org.
Merge remote-tracking branch 'wip/ROCKETMQ-206' into develop


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/e57f9ac4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/e57f9ac4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/e57f9ac4

Branch: refs/heads/develop
Commit: e57f9ac433bdf4ec640089ccaf580954e93f50dc
Parents: 04c8925 aced0de
Author: dongeforever <zh...@yeah.net>
Authored: Sat May 27 12:39:25 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 12:39:25 2017 +0800

----------------------------------------------------------------------
 .../client/consumer/store/LocalFileOffsetStore.java   | 14 ++++++++++++--
 .../apache/rocketmq/example/benchmark/Consumer.java   |  3 ++-
 .../org/apache/rocketmq/example/filter/Consumer.java  |  3 ++-
 .../rocketmq/namesrv/kvconfig/KVConfigManager.java    |  7 ++++++-
 4 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[5/8] incubator-rocketmq git commit: [ROCKETMQ-67] Consistent Hash allocate strategy closes apache/incubator-rocketmq#67

Posted by do...@apache.org.
[ROCKETMQ-67] Consistent Hash allocate strategy closes apache/incubator-rocketmq#67


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/adae1624
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/adae1624
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/adae1624

Branch: refs/heads/develop
Commit: adae1624d05346cd3632c778a656e4055de6bff3
Parents: c796140
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 11:42:03 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 11:42:03 2017 +0800

----------------------------------------------------------------------
 .../AllocateMessageQueueConsistentHash.java     | 124 ++++++++++
 .../AllocateMessageQueueConsitentHashTest.java  | 243 +++++++++++++++++++
 .../consistenthash/ConsistentHashRouter.java    | 140 +++++++++++
 .../common/consistenthash/HashFunction.java     |  24 ++
 .../rocketmq/common/consistenthash/Node.java    |  28 +++
 .../common/consistenthash/VirtualNode.java      |  41 ++++
 6 files changed, 600 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/adae1624/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
new file mode 100644
index 0000000..77198b7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
+import org.apache.rocketmq.common.consistenthash.HashFunction;
+import org.apache.rocketmq.common.consistenthash.Node;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+/**
+ * Consistent Hashing queue algorithm
+ */
+public class AllocateMessageQueueConsistentHash  implements AllocateMessageQueueStrategy {
+    private final Logger log = ClientLogger.getLog();
+
+    private final int virtualNodeCnt;
+    private final HashFunction customHashFunction;
+
+    public AllocateMessageQueueConsistentHash() {
+        this(10);
+    }
+
+    public AllocateMessageQueueConsistentHash(int virtualNodeCnt) {
+        this(virtualNodeCnt,null);
+    }
+
+    public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {
+        if (virtualNodeCnt < 0) {
+            throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);
+        }
+        this.virtualNodeCnt = virtualNodeCnt;
+        this.customHashFunction = customHashFunction;
+    }
+
+    @Override
+    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
+        List<String> cidAll) {
+
+        if (currentCID == null || currentCID.length() < 1) {
+            throw new IllegalArgumentException("currentCID is empty");
+        }
+        if (mqAll == null || mqAll.isEmpty()) {
+            throw new IllegalArgumentException("mqAll is null or mqAll empty");
+        }
+        if (cidAll == null || cidAll.isEmpty()) {
+            throw new IllegalArgumentException("cidAll is null or cidAll empty");
+        }
+
+        List<MessageQueue> result = new ArrayList<MessageQueue>();
+        if (!cidAll.contains(currentCID)) {
+            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
+                consumerGroup,
+                currentCID,
+                cidAll);
+            return result;
+        }
+
+
+        Collection<ClientNode> cidNodes = new ArrayList<>();
+        for (String cid : cidAll) {
+            cidNodes.add(new ClientNode(cid));
+        }
+
+        final ConsistentHashRouter<ClientNode> router; //for building hash ring
+        if (customHashFunction != null) {
+            router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt, customHashFunction);
+        } else {
+            router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt);
+        }
+
+        List<MessageQueue> results = new ArrayList<>();
+        for (MessageQueue mq : mqAll) {
+            ClientNode clientNode = router.routeNode(mq.toString());
+            if (clientNode != null && currentCID.equals(clientNode.getKey())) {
+                results.add(mq);
+            }
+        }
+
+        return results;
+
+    }
+
+    @Override
+    public String getName() {
+        return "CONSISTENT_HASH";
+    }
+
+
+    private static class ClientNode implements Node {
+        private final String clientID;
+
+        public ClientNode(String clientID) {
+            this.clientID = clientID;
+        }
+
+        @Override
+        public String getKey() {
+            return clientID;
+        }
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/adae1624/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
new file mode 100644
index 0000000..fc7ab9f
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class AllocateMessageQueueConsitentHashTest {
+
+    private String topic;
+    private static final String CID_PREFIX = "CID-";
+
+    @Before
+    public void init() {
+        topic = "topic_test";
+    }
+
+
+
+    public void printMessageQueue(List<MessageQueue> messageQueueList, String name) {
+        if (messageQueueList == null || messageQueueList.size() < 1)
+            return;
+        System.out.println(name + ".......................................start");
+        for (MessageQueue messageQueue : messageQueueList) {
+            System.out.println(messageQueue);
+        }
+        System.out.println(name + ".......................................end");
+    }
+
+    @Test
+    public void testCurrentCIDNotExists() {
+        String currentCID = String.valueOf(Integer.MAX_VALUE);
+        List<String> consumerIdList = createConsumerIdList(2);
+        List<MessageQueue> messageQueueList = createMessageQueueList(6);
+        List<MessageQueue> result = new AllocateMessageQueueConsistentHash().allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testCurrentCIDNotExists");
+        Assert.assertEquals(result.size(), 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCurrentCIDIllegalArgument() {
+        List<String> consumerIdList = createConsumerIdList(2);
+        List<MessageQueue> messageQueueList = createMessageQueueList(6);
+        new AllocateMessageQueueConsistentHash().allocate("", "", messageQueueList, consumerIdList);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMessageQueueIllegalArgument() {
+        String currentCID = "0";
+        List<String> consumerIdList = createConsumerIdList(2);
+        new AllocateMessageQueueConsistentHash().allocate("", currentCID, null, consumerIdList);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConsumerIdIllegalArgument() {
+        String currentCID = "0";
+        List<MessageQueue> messageQueueList = createMessageQueueList(6);
+        new AllocateMessageQueueConsistentHash().allocate("", currentCID, messageQueueList, null);
+    }
+
+    @Test
+    public void testAllocate1() {
+        testAllocate(20,10);
+    }
+
+    @Test
+    public void testAllocate2() {
+        testAllocate(10,20);
+    }
+
+
+    @Test
+    public void testRun100RandomCase(){
+        for(int i=0;i<100;i++){
+            int consumerSize = new Random().nextInt(200)+1;//1-200
+            int queueSize = new Random().nextInt(100)+1;//1-100
+            testAllocate(queueSize,consumerSize);
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {}
+        }
+    }
+
+
+    public void testAllocate(int queueSize, int consumerSize) {
+        AllocateMessageQueueStrategy allocateMessageQueueConsistentHash = new AllocateMessageQueueConsistentHash(3);
+
+        List<MessageQueue> mqAll = createMessageQueueList(queueSize);
+        //System.out.println("mqAll:" + mqAll.toString());
+
+        List<String> cidAll = createConsumerIdList(consumerSize);
+        List<MessageQueue> allocatedResAll = new ArrayList<>();
+
+        Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<>();
+        //test allocate all
+        {
+
+            List<String> cidBegin = new ArrayList<>(cidAll);
+
+            //System.out.println("cidAll:" + cidBegin.toString());
+            for (String cid : cidBegin) {
+                List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidBegin);
+                for (MessageQueue mq : rs) {
+                    allocateToAllOrigin.put(mq, cid);
+                }
+                allocatedResAll.addAll(rs);
+                //System.out.println("rs[" + cid + "]:" + rs.toString());
+            }
+
+            Assert.assertTrue(
+                verifyAllocateAll(cidBegin,mqAll, allocatedResAll));
+        }
+
+        Map<MessageQueue, String> allocateToAllAfterRemoveOne = new TreeMap<>();
+        List<String> cidAfterRemoveOne = new ArrayList<>(cidAll);
+        //test allocate remove one cid
+        {
+            String removeCID = cidAfterRemoveOne.remove(0);
+            //System.out.println("removing one cid "+removeCID);
+            List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
+            Iterator<Map.Entry<MessageQueue, String>> it = allocateToAllOrigin.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<MessageQueue, String> entry = it.next();
+                if (entry.getValue().equals(removeCID)) {
+                    mqShouldOnlyChanged.add(entry.getKey());
+                }
+            }
+
+            //System.out.println("cidAll:" + cidAfterRemoveOne.toString());
+            List<MessageQueue> allocatedResAllAfterRemove = new ArrayList<>();
+            for (String cid : cidAfterRemoveOne) {
+                List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterRemoveOne);
+                allocatedResAllAfterRemove.addAll(rs);
+                for (MessageQueue mq : rs) {
+                    allocateToAllAfterRemoveOne.put(mq, cid);
+                }
+                //System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString());
+            }
+
+            Assert.assertTrue("queueSize"+queueSize+"consumerSize:"+consumerSize+"\nmqAll:"+mqAll+"\nallocatedResAllAfterRemove"+allocatedResAllAfterRemove,
+                verifyAllocateAll(cidAfterRemoveOne, mqAll, allocatedResAllAfterRemove));
+            verifyAfterRemove(allocateToAllOrigin, allocateToAllAfterRemoveOne, removeCID);
+        }
+
+        List<String> cidAfterAdd = new ArrayList<>(cidAfterRemoveOne);
+        //test allocate add one more cid
+        {
+            String newCid = CID_PREFIX+"NEW";
+            //System.out.println("add one more cid "+newCid);
+            cidAfterAdd.add(newCid);
+            List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
+            //System.out.println("cidAll:" + cidAfterAdd.toString());
+            List<MessageQueue> allocatedResAllAfterAdd = new ArrayList<>();
+            Map<MessageQueue, String> allocateToAll3 = new TreeMap<>();
+            for (String cid : cidAfterAdd) {
+                List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterAdd);
+                allocatedResAllAfterAdd.addAll(rs);
+                for (MessageQueue mq : rs) {
+                    allocateToAll3.put(mq, cid);
+                    if (cid.equals(newCid)){
+                        mqShouldOnlyChanged.add(mq);
+                    }
+                }
+                //System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString());
+            }
+
+            Assert.assertTrue(
+                verifyAllocateAll(cidAfterAdd,mqAll, allocatedResAllAfterAdd));
+            verifyAfterAdd(allocateToAllAfterRemoveOne, allocateToAll3, newCid);
+        }
+    }
+
+    private boolean verifyAllocateAll(List<String> cidAll,List<MessageQueue> mqAll, List<MessageQueue> allocatedResAll) {
+        if (cidAll.isEmpty()){
+            return allocatedResAll.isEmpty();
+        }
+        return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll);
+    }
+
+    private void verifyAfterRemove(Map<MessageQueue, String> allocateToBefore, Map<MessageQueue, String> allocateAfter, String removeCID) {
+        for (MessageQueue mq : allocateToBefore.keySet()) {
+            String allocateToOrigin = allocateToBefore.get(mq);
+            if (allocateToOrigin.equals(removeCID)) {
+
+            } else {//the rest queue should be the same
+                Assert.assertTrue(allocateAfter.get(mq).equals(allocateToOrigin));//should be the same
+            }
+        }
+    }
+
+    private void verifyAfterAdd(Map<MessageQueue, String> allocateBefore, Map<MessageQueue, String> allocateAfter, String newCID) {
+        for (MessageQueue mq : allocateAfter.keySet()) {
+            String allocateToOrigin = allocateBefore.get(mq);
+            String allocateToAfter = allocateAfter.get(mq);
+            if (allocateToAfter.equals(newCID)) {
+
+            } else {//the rest queue should be the same
+                Assert.assertTrue("it was allocated to "+allocateToOrigin+". Now, it is to "+allocateAfter.get(mq)+" mq:"+mq,allocateAfter.get(mq).equals(allocateToOrigin));//should be the same
+            }
+        }
+    }
+
+    private List<String> createConsumerIdList(int size) {
+        List<String> consumerIdList = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            consumerIdList.add(CID_PREFIX + String.valueOf(i));
+        }
+        return consumerIdList;
+    }
+
+    private List<MessageQueue> createMessageQueueList(int size) {
+        List<MessageQueue> messageQueueList = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+            messageQueueList.add(mq);
+        }
+        return messageQueueList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/adae1624/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
new file mode 100644
index 0000000..8606c43
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * To hash Node objects to a hash ring with a certain amount of virtual node.
+ * Method routeNode will return a Node instance which the object key should be allocated to according to consistent hash algorithm
+ *
+ * @param <T>
+ */
+public class ConsistentHashRouter<T extends Node> {
+    private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<>();
+    private final HashFunction hashFunction;
+
+    public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {
+        this(pNodes,vNodeCount, new MD5Hash());
+    }
+
+    /**
+     *
+     * @param pNodes collections of physical nodes
+     * @param vNodeCount amounts of virtual nodes
+     * @param hashFunction hash Function to hash Node instances
+     */
+    public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {
+        if (hashFunction == null) {
+            throw new NullPointerException("Hash Function is null");
+        }
+        this.hashFunction = hashFunction;
+        if (pNodes != null) {
+            for (T pNode : pNodes) {
+                addNode(pNode, vNodeCount);
+            }
+        }
+    }
+
+    /**
+     * add physic node to the hash ring with some virtual nodes
+     * @param pNode physical node needs added to hash ring
+     * @param vNodeCount the number of virtual node of the physical node. Value should be greater than or equals to 0
+     */
+    public void addNode(T pNode, int vNodeCount) {
+        if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
+        int existingReplicas = getExistingReplicas(pNode);
+        for (int i = 0; i < vNodeCount; i++) {
+            VirtualNode<T> vNode = new VirtualNode<>(pNode, i + existingReplicas);
+            ring.put(hashFunction.hash(vNode.getKey()), vNode);
+        }
+    }
+
+    /**
+     * remove the physical node from the hash ring
+     * @param pNode
+     */
+    public void removeNode(T pNode) {
+        Iterator<Long> it = ring.keySet().iterator();
+        while (it.hasNext()) {
+            Long key = it.next();
+            VirtualNode<T> virtualNode = ring.get(key);
+            if (virtualNode.isVirtualNodeOf(pNode)) {
+                it.remove();
+            }
+        }
+    }
+
+    /**
+     * with a specified key, route the nearest Node instance in the current hash ring
+     * @param objectKey the object key to find a nearest Node
+     * @return
+     */
+    public T routeNode(String objectKey) {
+        if (ring.isEmpty()) {
+            return null;
+        }
+        Long hashVal = hashFunction.hash(objectKey);
+        SortedMap<Long,VirtualNode<T>> tailMap = ring.tailMap(hashVal);
+        Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
+        return ring.get(nodeHashVal).getPhysicalNode();
+    }
+
+
+    public int getExistingReplicas(T pNode) {
+        int replicas = 0;
+        for (VirtualNode<T> vNode : ring.values()) {
+            if (vNode.isVirtualNodeOf(pNode)) {
+                replicas++;
+            }
+        }
+        return replicas;
+    }
+
+    
+    //default hash function
+    private static class MD5Hash implements HashFunction {
+        MessageDigest instance;
+
+        public MD5Hash() {
+            try {
+                instance = MessageDigest.getInstance("MD5");
+            } catch (NoSuchAlgorithmException e) {
+            }
+        }
+
+        @Override
+        public long hash(String key) {
+            instance.reset();
+            instance.update(key.getBytes());
+            byte[] digest = instance.digest();
+
+            long h = 0;
+            for (int i = 0; i < 4; i++) {
+                h <<= 8;
+                h |= ((int) digest[i]) & 0xFF;
+            }
+            return h;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/adae1624/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
new file mode 100644
index 0000000..58fd777
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+/**
+ * Hash String to long value
+ */
+public interface HashFunction {
+    long hash(String key);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/adae1624/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
new file mode 100644
index 0000000..0ece210
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+/**
+ * Represent a node which should be mapped to a hash ring
+ */
+public interface Node {
+    /**
+     *
+     * @return the key which will be used for hash mapping
+     */
+    String getKey();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/adae1624/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
new file mode 100644
index 0000000..c8b72d9
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+
+public class VirtualNode<T extends Node> implements Node {
+    final T physicalNode;
+    final int replicaIndex;
+
+    public VirtualNode(T physicalNode, int replicaIndex) {
+        this.replicaIndex = replicaIndex;
+        this.physicalNode = physicalNode;
+    }
+
+    @Override
+    public String getKey() {
+        return physicalNode.getKey() + "-" + replicaIndex;
+    }
+
+    public boolean isVirtualNodeOf(T pNode) {
+        return physicalNode.getKey().equals(pNode.getKey());
+    }
+
+    public T getPhysicalNode() {
+        return physicalNode;
+    }
+}


[2/8] incubator-rocketmq git commit: [ROCKETMQ-188]RemotingExecption is not consistent between invoke async and invoke oneway closes apache/incubator-rocketmq#98

Posted by do...@apache.org.
[ROCKETMQ-188]RemotingExecption is not consistent between invoke async and invoke oneway closes apache/incubator-rocketmq#98


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/8c8610f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/8c8610f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/8c8610f9

Branch: refs/heads/develop
Commit: 8c8610f9121d19bf7108903e41276d5f6afaa81a
Parents: b1fcf1b
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 11:22:46 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 11:22:46 2017 +0800

----------------------------------------------------------------------
 .../remoting/netty/NettyRemotingAbstract.java   | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c8610f9/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index cddab3d..15586cb 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -417,14 +417,18 @@ public abstract class NettyRemotingAbstract {
                 throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
             }
         } else {
-            String info =
-                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
-                    timeoutMillis, //
-                    this.semaphoreAsync.getQueueLength(), //
-                    this.semaphoreAsync.availablePermits()//
-                );
-            PLOG.warn(info);
-            throw new RemotingTooMuchRequestException(info);
+            if (timeoutMillis <= 0) {
+                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
+            } else {
+                String info =
+                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
+                        timeoutMillis, //
+                        this.semaphoreAsync.getQueueLength(), //
+                        this.semaphoreAsync.availablePermits()//
+                    );
+                PLOG.warn(info);
+                throw new RemotingTimeoutException(info);
+            }
         }
     }
 


[8/8] incubator-rocketmq git commit: Remove diamond operator for client module with JDK 1.6

Posted by do...@apache.org.
Remove diamond operator for client module with JDK 1.6


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/8d781757
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/8d781757
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/8d781757

Branch: refs/heads/develop
Commit: 8d781757dfb56ebd95b0494e8e2c87422c22d767
Parents: e57f9ac
Author: dongeforever <zh...@yeah.net>
Authored: Sat May 27 14:09:08 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 14:09:08 2017 +0800

----------------------------------------------------------------------
 .../AllocateMessageQueueConsistentHash.java     |  8 +++---
 .../AllocateMessageQueueConsitentHashTest.java  | 27 ++++++++++----------
 .../consistenthash/ConsistentHashRouter.java    |  4 +--
 .../org/apache/rocketmq/store/CommitLog.java    |  4 +--
 4 files changed, 22 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8d781757/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
index 77198b7..09d940a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
@@ -76,19 +76,19 @@ public class AllocateMessageQueueConsistentHash  implements AllocateMessageQueue
         }
 
 
-        Collection<ClientNode> cidNodes = new ArrayList<>();
+        Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
         for (String cid : cidAll) {
             cidNodes.add(new ClientNode(cid));
         }
 
         final ConsistentHashRouter<ClientNode> router; //for building hash ring
         if (customHashFunction != null) {
-            router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt, customHashFunction);
+            router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
         } else {
-            router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt);
+            router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
         }
 
-        List<MessageQueue> results = new ArrayList<>();
+        List<MessageQueue> results = new ArrayList<MessageQueue>();
         for (MessageQueue mq : mqAll) {
             ClientNode clientNode = router.routeNode(mq.toString());
             if (clientNode != null && currentCID.equals(clientNode.getKey())) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8d781757/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
index fc7ab9f..e9e5db7 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.junit.Assert;
 import org.junit.Before;
@@ -113,13 +114,13 @@ public class AllocateMessageQueueConsitentHashTest {
         //System.out.println("mqAll:" + mqAll.toString());
 
         List<String> cidAll = createConsumerIdList(consumerSize);
-        List<MessageQueue> allocatedResAll = new ArrayList<>();
+        List<MessageQueue> allocatedResAll = new ArrayList<MessageQueue>();
 
-        Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<>();
+        Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<MessageQueue, String>();
         //test allocate all
         {
 
-            List<String> cidBegin = new ArrayList<>(cidAll);
+            List<String> cidBegin = new ArrayList<String>(cidAll);
 
             //System.out.println("cidAll:" + cidBegin.toString());
             for (String cid : cidBegin) {
@@ -135,13 +136,13 @@ public class AllocateMessageQueueConsitentHashTest {
                 verifyAllocateAll(cidBegin,mqAll, allocatedResAll));
         }
 
-        Map<MessageQueue, String> allocateToAllAfterRemoveOne = new TreeMap<>();
-        List<String> cidAfterRemoveOne = new ArrayList<>(cidAll);
+        Map<MessageQueue, String> allocateToAllAfterRemoveOne = new TreeMap<MessageQueue, String>();
+        List<String> cidAfterRemoveOne = new ArrayList<String>(cidAll);
         //test allocate remove one cid
         {
             String removeCID = cidAfterRemoveOne.remove(0);
             //System.out.println("removing one cid "+removeCID);
-            List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
+            List<MessageQueue> mqShouldOnlyChanged = new ArrayList<MessageQueue>();
             Iterator<Map.Entry<MessageQueue, String>> it = allocateToAllOrigin.entrySet().iterator();
             while (it.hasNext()) {
                 Map.Entry<MessageQueue, String> entry = it.next();
@@ -151,7 +152,7 @@ public class AllocateMessageQueueConsitentHashTest {
             }
 
             //System.out.println("cidAll:" + cidAfterRemoveOne.toString());
-            List<MessageQueue> allocatedResAllAfterRemove = new ArrayList<>();
+            List<MessageQueue> allocatedResAllAfterRemove = new ArrayList<MessageQueue>();
             for (String cid : cidAfterRemoveOne) {
                 List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterRemoveOne);
                 allocatedResAllAfterRemove.addAll(rs);
@@ -166,16 +167,16 @@ public class AllocateMessageQueueConsitentHashTest {
             verifyAfterRemove(allocateToAllOrigin, allocateToAllAfterRemoveOne, removeCID);
         }
 
-        List<String> cidAfterAdd = new ArrayList<>(cidAfterRemoveOne);
+        List<String> cidAfterAdd = new ArrayList<String>(cidAfterRemoveOne);
         //test allocate add one more cid
         {
             String newCid = CID_PREFIX+"NEW";
             //System.out.println("add one more cid "+newCid);
             cidAfterAdd.add(newCid);
-            List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
+            List<MessageQueue> mqShouldOnlyChanged = new ArrayList<MessageQueue>();
             //System.out.println("cidAll:" + cidAfterAdd.toString());
-            List<MessageQueue> allocatedResAllAfterAdd = new ArrayList<>();
-            Map<MessageQueue, String> allocateToAll3 = new TreeMap<>();
+            List<MessageQueue> allocatedResAllAfterAdd = new ArrayList<MessageQueue>();
+            Map<MessageQueue, String> allocateToAll3 = new TreeMap<MessageQueue, String>();
             for (String cid : cidAfterAdd) {
                 List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterAdd);
                 allocatedResAllAfterAdd.addAll(rs);
@@ -225,7 +226,7 @@ public class AllocateMessageQueueConsitentHashTest {
     }
 
     private List<String> createConsumerIdList(int size) {
-        List<String> consumerIdList = new ArrayList<>(size);
+        List<String> consumerIdList = new ArrayList<String>(size);
         for (int i = 0; i < size; i++) {
             consumerIdList.add(CID_PREFIX + String.valueOf(i));
         }
@@ -233,7 +234,7 @@ public class AllocateMessageQueueConsitentHashTest {
     }
 
     private List<MessageQueue> createMessageQueueList(int size) {
-        List<MessageQueue> messageQueueList = new ArrayList<>(size);
+        List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(size);
         for (int i = 0; i < size; i++) {
             MessageQueue mq = new MessageQueue(topic, "brokerName", i);
             messageQueueList.add(mq);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8d781757/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
index 8606c43..a6fce51 100644
--- a/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
@@ -30,7 +30,7 @@ import java.util.TreeMap;
  * @param <T>
  */
 public class ConsistentHashRouter<T extends Node> {
-    private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<>();
+    private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<Long, VirtualNode<T>>();
     private final HashFunction hashFunction;
 
     public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {
@@ -64,7 +64,7 @@ public class ConsistentHashRouter<T extends Node> {
         if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
         int existingReplicas = getExistingReplicas(pNode);
         for (int i = 0; i < vNodeCount; i++) {
-            VirtualNode<T> vNode = new VirtualNode<>(pNode, i + existingReplicas);
+            VirtualNode<T> vNode = new VirtualNode<T>(pNode, i + existingReplicas);
             ring.put(hashFunction.hash(vNode.getKey()), vNode);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8d781757/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 7b29263..b44211c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -722,7 +722,7 @@ public class CommitLog {
 
         messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
 
-        lockForPutMessage(); //spin...
+        putMessageLock.lock();
         try {
             long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
             this.beginTimeInLock = beginLockTimestamp;
@@ -771,7 +771,7 @@ public class CommitLog {
             eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
             beginTimeInLock = 0;
         } finally {
-            releasePutMessageLock();
+            putMessageLock.unlock();
         }
 
 


[6/8] incubator-rocketmq git commit: [ROCKETMQ-160]SendHeartBeat may not be logged in the expected period closes apache/incubator-rocketmq#86

Posted by do...@apache.org.
[ROCKETMQ-160]SendHeartBeat may not be logged in the expected period closes apache/incubator-rocketmq#86


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/04c8925d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/04c8925d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/04c8925d

Branch: refs/heads/develop
Commit: 04c8925d6da77a41cef746a9c6478a407c4c9edd
Parents: adae162
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 12:38:00 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 12:38:00 2017 +0800

----------------------------------------------------------------------
 .../client/impl/factory/MQClientInstance.java   | 66 ++++++++++----------
 1 file changed, 34 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/04c8925d/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index a8c65b2..1b075ee 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -112,7 +112,7 @@ public class MQClientInstance {
     private final RebalanceService rebalanceService;
     private final DefaultMQProducer defaultMQProducer;
     private final ConsumerStatsManager consumerStatsManager;
-    private final AtomicLong storeTimesTotal = new AtomicLong(0);
+    private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
     private ServiceState serviceState = ServiceState.CREATE_JUST;
     private DatagramSocket datagramSocket;
     private Random random = new Random();
@@ -517,38 +517,40 @@ public class MQClientInstance {
             return;
         }
 
-        long times = this.storeTimesTotal.getAndIncrement();
-        Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<String, HashMap<Long, String>> entry = it.next();
-            String brokerName = entry.getKey();
-            HashMap<Long, String> oneTable = entry.getValue();
-            if (oneTable != null) {
-                for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
-                    Long id = entry1.getKey();
-                    String addr = entry1.getValue();
-                    if (addr != null) {
-                        if (consumerEmpty) {
-                            if (id != MixAll.MASTER_ID)
-                                continue;
-                        }
-
-                        try {
-                            int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
-                            if (!this.brokerVersionTable.containsKey(brokerName)) {
-                                this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
-                            }
-                            this.brokerVersionTable.get(brokerName).put(addr, version);
-                            if (times % 20 == 0) {
-                                log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
-                                log.info(heartbeatData.toString());
+        if (!this.brokerAddrTable.isEmpty()) {
+            long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+            Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
+            while (it.hasNext()) {
+                Entry<String, HashMap<Long, String>> entry = it.next();
+                String brokerName = entry.getKey();
+                HashMap<Long, String> oneTable = entry.getValue();
+                if (oneTable != null) {
+                    for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
+                        Long id = entry1.getKey();
+                        String addr = entry1.getValue();
+                        if (addr != null) {
+                            if (consumerEmpty) {
+                                if (id != MixAll.MASTER_ID)
+                                    continue;
                             }
-                        } catch (Exception e) {
-                            if (this.isBrokerInNameServer(addr)) {
-                                log.error("send heart beat to broker exception", e);
-                            } else {
-                                log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
-                                    id, addr);
+
+                            try {
+                                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
+                                if (!this.brokerVersionTable.containsKey(brokerName)) {
+                                    this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
+                                }
+                                this.brokerVersionTable.get(brokerName).put(addr, version);
+                                if (times % 20 == 0) {
+                                    log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
+                                    log.info(heartbeatData.toString());
+                                }
+                            } catch (Exception e) {
+                                if (this.isBrokerInNameServer(addr)) {
+                                    log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
+                                } else {
+                                    log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
+                                        id, addr);
+                                }
                             }
                         }
                     }


[4/8] incubator-rocketmq git commit: [ROCKETMQ-206] Catch the IOException when call the file2String method.

Posted by do...@apache.org.
[ROCKETMQ-206] Catch the IOException when call the file2String method.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/aced0de7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/aced0de7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/aced0de7

Branch: refs/heads/develop
Commit: aced0de7d8f98a01d9d109dd592a6cb31fd174d9
Parents: ceeef8e
Author: yukon <yu...@apache.org>
Authored: Sat May 27 11:34:44 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Sat May 27 11:34:44 2017 +0800

----------------------------------------------------------------------
 .../client/consumer/store/LocalFileOffsetStore.java   | 14 ++++++++++++--
 .../apache/rocketmq/example/benchmark/Consumer.java   |  3 ++-
 .../org/apache/rocketmq/example/filter/Consumer.java  |  3 ++-
 .../rocketmq/namesrv/kvconfig/KVConfigManager.java    |  7 ++++++-
 4 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aced0de7/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
index 2cde5f8..6c81516 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -180,7 +180,12 @@ public class LocalFileOffsetStore implements OffsetStore {
     }
 
     private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
-        String content = MixAll.file2String(this.storePath);
+        String content = null;
+        try {
+            content = MixAll.file2String(this.storePath);
+        } catch (IOException e) {
+            log.warn("Load local offset store file exception", e);
+        }
         if (null == content || content.length() == 0) {
             return this.readLocalOffsetBak();
         } else {
@@ -198,7 +203,12 @@ public class LocalFileOffsetStore implements OffsetStore {
     }
 
     private OffsetSerializeWrapper readLocalOffsetBak() throws MQClientException {
-        String content = MixAll.file2String(this.storePath + ".bak");
+        String content = null;
+        try {
+            content = MixAll.file2String(this.storePath + ".bak");
+        } catch (IOException e) {
+            log.warn("Load local offset store bak file exception", e);
+        }
         if (content != null && content.length() > 0) {
             OffsetSerializeWrapper offsetSerializeWrapper = null;
             try {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aced0de7/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 3e1b79b..d431d3e 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.example.benchmark;
 
+import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Timer;
@@ -39,7 +40,7 @@ import org.apache.rocketmq.srvutil.ServerUtil;
 
 public class Consumer {
 
-    public static void main(String[] args) throws MQClientException {
+    public static void main(String[] args) throws MQClientException, IOException {
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkConsumer", args, buildCommandlineOptions(options), new PosixParser());
         if (null == commandLine) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aced0de7/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
index d63435b..0be8e1d 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.example.filter;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -28,7 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 
 public class Consumer {
 
-    public static void main(String[] args) throws InterruptedException, MQClientException {
+    public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
 
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aced0de7/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
index 69afcad..be13bd6 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
@@ -43,7 +43,12 @@ public class KVConfigManager {
     }
 
     public void load() {
-        String content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
+        String content = null;
+        try {
+            content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
+        } catch (IOException e) {
+            log.warn("Load KV config table exception", e);
+        }
         if (content != null) {
             KVConfigSerializeWrapper kvConfigSerializeWrapper =
                 KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);