You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2017/05/27 06:10:28 UTC
[1/8] incubator-rocketmq git commit: [ROCKETMQ-98]Fix risk of unable
to release putMessage Lock forever closes apache/incubator-rocketmq#61
Repository: incubator-rocketmq
Updated Branches:
refs/heads/develop 0adad6f00 -> 8d781757d
[ROCKETMQ-98]Fix risk of unable to release putMessage Lock forever closes apache/incubator-rocketmq#61
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/b1fcf1b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/b1fcf1b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/b1fcf1b8
Branch: refs/heads/develop
Commit: b1fcf1b83b659bd03bcebf651d9e88c294a89e07
Parents: 0adad6f
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 11:21:09 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 11:21:09 2017 +0800
----------------------------------------------------------------------
.../apache/rocketmq/common/BrokerConfig.java | 3 ++
.../org/apache/rocketmq/store/CommitLog.java | 40 ++++--------------
.../apache/rocketmq/store/PutMessageLock.java | 25 ++++++++++++
.../rocketmq/store/PutMessageReentrantLock.java | 37 +++++++++++++++++
.../rocketmq/store/PutMessageSpinLock.java | 43 ++++++++++++++++++++
.../store/config/MessageStoreConfig.java | 4 ++
6 files changed, 119 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f0a73bd..5bce013 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -47,6 +47,9 @@ public class BrokerConfig {
private boolean autoCreateSubscriptionGroup = true;
private String messageStorePlugIn = "";
+ /**
+ * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default value is 1.
+ */
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int adminBrokerThreadPoolNums = 16;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 7841feb..7b29263 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -23,8 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -63,12 +61,7 @@ public class CommitLog {
private volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0;
-
- //true: Can lock, false : in lock.
- private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
-
- private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
-
+ private final PutMessageLock putMessageLock;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
@@ -88,6 +81,8 @@ public class CommitLog {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
+ this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
+
}
public boolean load() {
@@ -577,7 +572,7 @@ public class CommitLog {
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
- lockForPutMessage(); //spin...
+ putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
@@ -626,7 +621,7 @@ public class CommitLog {
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
- releasePutMessageLock();
+ putMessageLock.unlock();
}
if (eclipseTimeInLock > 500) {
@@ -861,7 +856,7 @@ public class CommitLog {
}
public boolean appendData(long startOffset, byte[] data) {
- lockForPutMessage(); //spin...
+ putMessageLock.lock();
try {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
if (null == mappedFile) {
@@ -871,7 +866,7 @@ public class CommitLog {
return mappedFile.appendMessage(data);
} finally {
- releasePutMessageLock();
+ putMessageLock.unlock();
}
}
@@ -906,28 +901,7 @@ public class CommitLog {
return diff;
}
- /**
- * Spin util acquired the lock.
- */
- private void lockForPutMessage() {
- if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
- putMessageNormalLock.lock();
- } else {
- boolean flag;
- do {
- flag = this.putMessageSpinLock.compareAndSet(true, false);
- }
- while (!flag);
- }
- }
- private void releasePutMessageLock() {
- if (this.defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()) {
- putMessageNormalLock.unlock();
- } else {
- this.putMessageSpinLock.compareAndSet(false, true);
- }
- }
public static class GroupCommitRequest {
private final long nextOffset;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
new file mode 100644
index 0000000..a03e41a
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageLock.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+/**
+ * Used when trying to put message
+ */
+public interface PutMessageLock {
+ void lock();
+ void unlock();
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
new file mode 100644
index 0000000..9198f1c
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageReentrantLock.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Exclusive lock implementation to put message
+ */
+public class PutMessageReentrantLock implements PutMessageLock {
+ private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
+
+ @Override
+ public void lock() {
+ putMessageNormalLock.lock();
+ }
+
+ @Override
+ public void unlock() {
+ putMessageNormalLock.unlock();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
new file mode 100644
index 0000000..baa809d
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageSpinLock.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Spin lock Implementation to put message, suggest using this witb low race conditions
+ *
+ */
+public class PutMessageSpinLock implements PutMessageLock {
+ //true: Can lock, false : in lock.
+ private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);
+
+ @Override
+ public void lock() {
+ boolean flag;
+ do {
+ flag = this.putMessageSpinLock.compareAndSet(true, false);
+ }
+ while (!flag);
+ }
+
+ @Override
+ public void unlock() {
+ this.putMessageSpinLock.compareAndSet(false, true);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b1fcf1b8/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 29f800c..19ed211 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -52,6 +52,10 @@ public class MessageStoreConfig {
@ImportantField
private int commitIntervalCommitLog = 200;
+ /**
+ * introduced since 4.0.x. Determine whether to use mutex reentrantLock when putting message.<br/>
+ * By default it is set to false indicating using spin lock when putting message.
+ */
private boolean useReentrantLockWhenPutMessage = false;
// Whether schedule flush,default is real-time
[3/8] incubator-rocketmq git commit: [ROCKETMQ-200]-Cluster name is
always missing when fetch ClusterInfo from name server closes
apache/incubator-rocketmq#105
Posted by do...@apache.org.
[ROCKETMQ-200]-Cluster name is always missing when fetch ClusterInfo from name server closes apache/incubator-rocketmq#105
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/c7961407
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/c7961407
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/c7961407
Branch: refs/heads/develop
Commit: c79614071b1941940f934c066bde5711062bdc7e
Parents: 8c8610f
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 11:24:02 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 11:24:02 2017 +0800
----------------------------------------------------------------------
.../rocketmq/common/protocol/route/BrokerData.java | 14 +++++++++++---
.../rocketmq/namesrv/routeinfo/RouteInfoManager.java | 10 ++--------
2 files changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c7961407/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
index 612e5b4..9d868ae 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
@@ -15,9 +15,7 @@
* limitations under the License.
*/
-/**
- * $Id: BrokerData.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
+
package org.apache.rocketmq.common.protocol.route;
import java.util.HashMap;
@@ -29,6 +27,16 @@ public class BrokerData implements Comparable<BrokerData> {
private String brokerName;
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
+ public BrokerData() {
+
+ }
+
+ public BrokerData(String cluster, String brokerName, HashMap<Long, String> brokerAddrs) {
+ this.cluster = cluster;
+ this.brokerName = brokerName;
+ this.brokerAddrs = brokerAddrs;
+ }
+
public String selectBrokerAddr() {
String value = this.brokerAddrs.get(MixAll.MASTER_ID);
if (null == value) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c7961407/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 16b7847..5a953a9 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -125,11 +125,7 @@ public class RouteInfoManager {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
- brokerData = new BrokerData();
- brokerData.setBrokerName(brokerName);
- HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
- brokerData.setBrokerAddrs(brokerAddrs);
-
+ brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
@@ -381,9 +377,7 @@ public class RouteInfoManager {
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null != brokerData) {
- BrokerData brokerDataClone = new BrokerData();
- brokerDataClone.setBrokerName(brokerData.getBrokerName());
- brokerDataClone.setBrokerAddrs((HashMap<Long, String>) brokerData
+ BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
.getBrokerAddrs().clone());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
[7/8] incubator-rocketmq git commit: Merge remote-tracking branch
'wip/ROCKETMQ-206' into develop
Posted by do...@apache.org.
Merge remote-tracking branch 'wip/ROCKETMQ-206' into develop
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/e57f9ac4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/e57f9ac4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/e57f9ac4
Branch: refs/heads/develop
Commit: e57f9ac433bdf4ec640089ccaf580954e93f50dc
Parents: 04c8925 aced0de
Author: dongeforever <zh...@yeah.net>
Authored: Sat May 27 12:39:25 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 12:39:25 2017 +0800
----------------------------------------------------------------------
.../client/consumer/store/LocalFileOffsetStore.java | 14 ++++++++++++--
.../apache/rocketmq/example/benchmark/Consumer.java | 3 ++-
.../org/apache/rocketmq/example/filter/Consumer.java | 3 ++-
.../rocketmq/namesrv/kvconfig/KVConfigManager.java | 7 ++++++-
4 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
[5/8] incubator-rocketmq git commit: [ROCKETMQ-67] Consistent Hash
allocate strategy closes apache/incubator-rocketmq#67
Posted by do...@apache.org.
[ROCKETMQ-67] Consistent Hash allocate strategy closes apache/incubator-rocketmq#67
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/adae1624
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/adae1624
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/adae1624
Branch: refs/heads/develop
Commit: adae1624d05346cd3632c778a656e4055de6bff3
Parents: c796140
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 11:42:03 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 11:42:03 2017 +0800
----------------------------------------------------------------------
.../AllocateMessageQueueConsistentHash.java | 124 ++++++++++
.../AllocateMessageQueueConsitentHashTest.java | 243 +++++++++++++++++++
.../consistenthash/ConsistentHashRouter.java | 140 +++++++++++
.../common/consistenthash/HashFunction.java | 24 ++
.../rocketmq/common/consistenthash/Node.java | 28 +++
.../common/consistenthash/VirtualNode.java | 41 ++++
6 files changed, 600 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/adae1624/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
new file mode 100644
index 0000000..77198b7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
+import org.apache.rocketmq.common.consistenthash.HashFunction;
+import org.apache.rocketmq.common.consistenthash.Node;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+/**
+ * Consistent Hashing queue algorithm
+ */
+public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
+ private final Logger log = ClientLogger.getLog();
+
+ private final int virtualNodeCnt;
+ private final HashFunction customHashFunction;
+
+ public AllocateMessageQueueConsistentHash() {
+ this(10);
+ }
+
+ public AllocateMessageQueueConsistentHash(int virtualNodeCnt) {
+ this(virtualNodeCnt,null);
+ }
+
+ public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {
+ if (virtualNodeCnt < 0) {
+ throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);
+ }
+ this.virtualNodeCnt = virtualNodeCnt;
+ this.customHashFunction = customHashFunction;
+ }
+
+ @Override
+ public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
+ List<String> cidAll) {
+
+ if (currentCID == null || currentCID.length() < 1) {
+ throw new IllegalArgumentException("currentCID is empty");
+ }
+ if (mqAll == null || mqAll.isEmpty()) {
+ throw new IllegalArgumentException("mqAll is null or mqAll empty");
+ }
+ if (cidAll == null || cidAll.isEmpty()) {
+ throw new IllegalArgumentException("cidAll is null or cidAll empty");
+ }
+
+ List<MessageQueue> result = new ArrayList<MessageQueue>();
+ if (!cidAll.contains(currentCID)) {
+ log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
+ consumerGroup,
+ currentCID,
+ cidAll);
+ return result;
+ }
+
+
+ Collection<ClientNode> cidNodes = new ArrayList<>();
+ for (String cid : cidAll) {
+ cidNodes.add(new ClientNode(cid));
+ }
+
+ final ConsistentHashRouter<ClientNode> router; //for building hash ring
+ if (customHashFunction != null) {
+ router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt, customHashFunction);
+ } else {
+ router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt);
+ }
+
+ List<MessageQueue> results = new ArrayList<>();
+ for (MessageQueue mq : mqAll) {
+ ClientNode clientNode = router.routeNode(mq.toString());
+ if (clientNode != null && currentCID.equals(clientNode.getKey())) {
+ results.add(mq);
+ }
+ }
+
+ return results;
+
+ }
+
+ @Override
+ public String getName() {
+ return "CONSISTENT_HASH";
+ }
+
+
+ private static class ClientNode implements Node {
+ private final String clientID;
+
+ public ClientNode(String clientID) {
+ this.clientID = clientID;
+ }
+
+ @Override
+ public String getKey() {
+ return clientID;
+ }
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/adae1624/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
new file mode 100644
index 0000000..fc7ab9f
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class AllocateMessageQueueConsitentHashTest {
+
+ private String topic;
+ private static final String CID_PREFIX = "CID-";
+
+ @Before
+ public void init() {
+ topic = "topic_test";
+ }
+
+
+
+ public void printMessageQueue(List<MessageQueue> messageQueueList, String name) {
+ if (messageQueueList == null || messageQueueList.size() < 1)
+ return;
+ System.out.println(name + ".......................................start");
+ for (MessageQueue messageQueue : messageQueueList) {
+ System.out.println(messageQueue);
+ }
+ System.out.println(name + ".......................................end");
+ }
+
+ @Test
+ public void testCurrentCIDNotExists() {
+ String currentCID = String.valueOf(Integer.MAX_VALUE);
+ List<String> consumerIdList = createConsumerIdList(2);
+ List<MessageQueue> messageQueueList = createMessageQueueList(6);
+ List<MessageQueue> result = new AllocateMessageQueueConsistentHash().allocate("", currentCID, messageQueueList, consumerIdList);
+ printMessageQueue(result, "testCurrentCIDNotExists");
+ Assert.assertEquals(result.size(), 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCurrentCIDIllegalArgument() {
+ List<String> consumerIdList = createConsumerIdList(2);
+ List<MessageQueue> messageQueueList = createMessageQueueList(6);
+ new AllocateMessageQueueConsistentHash().allocate("", "", messageQueueList, consumerIdList);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMessageQueueIllegalArgument() {
+ String currentCID = "0";
+ List<String> consumerIdList = createConsumerIdList(2);
+ new AllocateMessageQueueConsistentHash().allocate("", currentCID, null, consumerIdList);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testConsumerIdIllegalArgument() {
+ String currentCID = "0";
+ List<MessageQueue> messageQueueList = createMessageQueueList(6);
+ new AllocateMessageQueueConsistentHash().allocate("", currentCID, messageQueueList, null);
+ }
+
+ @Test
+ public void testAllocate1() {
+ testAllocate(20,10);
+ }
+
+ @Test
+ public void testAllocate2() {
+ testAllocate(10,20);
+ }
+
+
+ @Test
+ public void testRun100RandomCase(){
+ for(int i=0;i<100;i++){
+ int consumerSize = new Random().nextInt(200)+1;//1-200
+ int queueSize = new Random().nextInt(100)+1;//1-100
+ testAllocate(queueSize,consumerSize);
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {}
+ }
+ }
+
+
+ public void testAllocate(int queueSize, int consumerSize) {
+ AllocateMessageQueueStrategy allocateMessageQueueConsistentHash = new AllocateMessageQueueConsistentHash(3);
+
+ List<MessageQueue> mqAll = createMessageQueueList(queueSize);
+ //System.out.println("mqAll:" + mqAll.toString());
+
+ List<String> cidAll = createConsumerIdList(consumerSize);
+ List<MessageQueue> allocatedResAll = new ArrayList<>();
+
+ Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<>();
+ //test allocate all
+ {
+
+ List<String> cidBegin = new ArrayList<>(cidAll);
+
+ //System.out.println("cidAll:" + cidBegin.toString());
+ for (String cid : cidBegin) {
+ List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidBegin);
+ for (MessageQueue mq : rs) {
+ allocateToAllOrigin.put(mq, cid);
+ }
+ allocatedResAll.addAll(rs);
+ //System.out.println("rs[" + cid + "]:" + rs.toString());
+ }
+
+ Assert.assertTrue(
+ verifyAllocateAll(cidBegin,mqAll, allocatedResAll));
+ }
+
+ Map<MessageQueue, String> allocateToAllAfterRemoveOne = new TreeMap<>();
+ List<String> cidAfterRemoveOne = new ArrayList<>(cidAll);
+ //test allocate remove one cid
+ {
+ String removeCID = cidAfterRemoveOne.remove(0);
+ //System.out.println("removing one cid "+removeCID);
+ List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
+ Iterator<Map.Entry<MessageQueue, String>> it = allocateToAllOrigin.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<MessageQueue, String> entry = it.next();
+ if (entry.getValue().equals(removeCID)) {
+ mqShouldOnlyChanged.add(entry.getKey());
+ }
+ }
+
+ //System.out.println("cidAll:" + cidAfterRemoveOne.toString());
+ List<MessageQueue> allocatedResAllAfterRemove = new ArrayList<>();
+ for (String cid : cidAfterRemoveOne) {
+ List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterRemoveOne);
+ allocatedResAllAfterRemove.addAll(rs);
+ for (MessageQueue mq : rs) {
+ allocateToAllAfterRemoveOne.put(mq, cid);
+ }
+ //System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString());
+ }
+
+ Assert.assertTrue("queueSize"+queueSize+"consumerSize:"+consumerSize+"\nmqAll:"+mqAll+"\nallocatedResAllAfterRemove"+allocatedResAllAfterRemove,
+ verifyAllocateAll(cidAfterRemoveOne, mqAll, allocatedResAllAfterRemove));
+ verifyAfterRemove(allocateToAllOrigin, allocateToAllAfterRemoveOne, removeCID);
+ }
+
+ List<String> cidAfterAdd = new ArrayList<>(cidAfterRemoveOne);
+ //test allocate add one more cid
+ {
+ String newCid = CID_PREFIX+"NEW";
+ //System.out.println("add one more cid "+newCid);
+ cidAfterAdd.add(newCid);
+ List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
+ //System.out.println("cidAll:" + cidAfterAdd.toString());
+ List<MessageQueue> allocatedResAllAfterAdd = new ArrayList<>();
+ Map<MessageQueue, String> allocateToAll3 = new TreeMap<>();
+ for (String cid : cidAfterAdd) {
+ List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterAdd);
+ allocatedResAllAfterAdd.addAll(rs);
+ for (MessageQueue mq : rs) {
+ allocateToAll3.put(mq, cid);
+ if (cid.equals(newCid)){
+ mqShouldOnlyChanged.add(mq);
+ }
+ }
+ //System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString());
+ }
+
+ Assert.assertTrue(
+ verifyAllocateAll(cidAfterAdd,mqAll, allocatedResAllAfterAdd));
+ verifyAfterAdd(allocateToAllAfterRemoveOne, allocateToAll3, newCid);
+ }
+ }
+
+ private boolean verifyAllocateAll(List<String> cidAll,List<MessageQueue> mqAll, List<MessageQueue> allocatedResAll) {
+ if (cidAll.isEmpty()){
+ return allocatedResAll.isEmpty();
+ }
+ return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll);
+ }
+
+ private void verifyAfterRemove(Map<MessageQueue, String> allocateToBefore, Map<MessageQueue, String> allocateAfter, String removeCID) {
+ for (MessageQueue mq : allocateToBefore.keySet()) {
+ String allocateToOrigin = allocateToBefore.get(mq);
+ if (allocateToOrigin.equals(removeCID)) {
+
+ } else {//the rest queue should be the same
+ Assert.assertTrue(allocateAfter.get(mq).equals(allocateToOrigin));//should be the same
+ }
+ }
+ }
+
+ private void verifyAfterAdd(Map<MessageQueue, String> allocateBefore, Map<MessageQueue, String> allocateAfter, String newCID) {
+ for (MessageQueue mq : allocateAfter.keySet()) {
+ String allocateToOrigin = allocateBefore.get(mq);
+ String allocateToAfter = allocateAfter.get(mq);
+ if (allocateToAfter.equals(newCID)) {
+
+ } else {//the rest queue should be the same
+ Assert.assertTrue("it was allocated to "+allocateToOrigin+". Now, it is to "+allocateAfter.get(mq)+" mq:"+mq,allocateAfter.get(mq).equals(allocateToOrigin));//should be the same
+ }
+ }
+ }
+
+ private List<String> createConsumerIdList(int size) {
+ List<String> consumerIdList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ consumerIdList.add(CID_PREFIX + String.valueOf(i));
+ }
+ return consumerIdList;
+ }
+
+ private List<MessageQueue> createMessageQueueList(int size) {
+ List<MessageQueue> messageQueueList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+ messageQueueList.add(mq);
+ }
+ return messageQueueList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/adae1624/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
new file mode 100644
index 0000000..8606c43
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * To hash Node objects to a hash ring with a certain amount of virtual node.
+ * Method routeNode will return a Node instance which the object key should be allocated to according to consistent hash algorithm
+ *
+ * @param <T>
+ */
+public class ConsistentHashRouter<T extends Node> {
+ private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<>();
+ private final HashFunction hashFunction;
+
+ public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {
+ this(pNodes,vNodeCount, new MD5Hash());
+ }
+
+ /**
+ *
+ * @param pNodes collections of physical nodes
+ * @param vNodeCount amounts of virtual nodes
+ * @param hashFunction hash Function to hash Node instances
+ */
+ public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {
+ if (hashFunction == null) {
+ throw new NullPointerException("Hash Function is null");
+ }
+ this.hashFunction = hashFunction;
+ if (pNodes != null) {
+ for (T pNode : pNodes) {
+ addNode(pNode, vNodeCount);
+ }
+ }
+ }
+
+ /**
+ * add physic node to the hash ring with some virtual nodes
+ * @param pNode physical node needs added to hash ring
+ * @param vNodeCount the number of virtual node of the physical node. Value should be greater than or equals to 0
+ */
+ public void addNode(T pNode, int vNodeCount) {
+ if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
+ int existingReplicas = getExistingReplicas(pNode);
+ for (int i = 0; i < vNodeCount; i++) {
+ VirtualNode<T> vNode = new VirtualNode<>(pNode, i + existingReplicas);
+ ring.put(hashFunction.hash(vNode.getKey()), vNode);
+ }
+ }
+
+ /**
+ * remove the physical node from the hash ring
+ * @param pNode
+ */
+ public void removeNode(T pNode) {
+ Iterator<Long> it = ring.keySet().iterator();
+ while (it.hasNext()) {
+ Long key = it.next();
+ VirtualNode<T> virtualNode = ring.get(key);
+ if (virtualNode.isVirtualNodeOf(pNode)) {
+ it.remove();
+ }
+ }
+ }
+
+ /**
+ * with a specified key, route the nearest Node instance in the current hash ring
+ * @param objectKey the object key to find a nearest Node
+ * @return
+ */
+ public T routeNode(String objectKey) {
+ if (ring.isEmpty()) {
+ return null;
+ }
+ Long hashVal = hashFunction.hash(objectKey);
+ SortedMap<Long,VirtualNode<T>> tailMap = ring.tailMap(hashVal);
+ Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
+ return ring.get(nodeHashVal).getPhysicalNode();
+ }
+
+
+ public int getExistingReplicas(T pNode) {
+ int replicas = 0;
+ for (VirtualNode<T> vNode : ring.values()) {
+ if (vNode.isVirtualNodeOf(pNode)) {
+ replicas++;
+ }
+ }
+ return replicas;
+ }
+
+
+ //default hash function
+ private static class MD5Hash implements HashFunction {
+ MessageDigest instance;
+
+ public MD5Hash() {
+ try {
+ instance = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ }
+ }
+
+ @Override
+ public long hash(String key) {
+ instance.reset();
+ instance.update(key.getBytes());
+ byte[] digest = instance.digest();
+
+ long h = 0;
+ for (int i = 0; i < 4; i++) {
+ h <<= 8;
+ h |= ((int) digest[i]) & 0xFF;
+ }
+ return h;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/adae1624/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
new file mode 100644
index 0000000..58fd777
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+/**
+ * Hash String to long value
+ */
+public interface HashFunction {
+ long hash(String key);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/adae1624/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
new file mode 100644
index 0000000..0ece210
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+/**
+ * Represent a node which should be mapped to a hash ring
+ */
+public interface Node {
+ /**
+ *
+ * @return the key which will be used for hash mapping
+ */
+ String getKey();
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/adae1624/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
new file mode 100644
index 0000000..c8b72d9
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+
+public class VirtualNode<T extends Node> implements Node {
+ final T physicalNode;
+ final int replicaIndex;
+
+ public VirtualNode(T physicalNode, int replicaIndex) {
+ this.replicaIndex = replicaIndex;
+ this.physicalNode = physicalNode;
+ }
+
+ @Override
+ public String getKey() {
+ return physicalNode.getKey() + "-" + replicaIndex;
+ }
+
+ public boolean isVirtualNodeOf(T pNode) {
+ return physicalNode.getKey().equals(pNode.getKey());
+ }
+
+ public T getPhysicalNode() {
+ return physicalNode;
+ }
+}
[2/8] incubator-rocketmq git commit: [ROCKETMQ-188]RemotingExecption
is not consistent between invoke async and invoke oneway closes
apache/incubator-rocketmq#98
Posted by do...@apache.org.
[ROCKETMQ-188]RemotingExecption is not consistent between invoke async and invoke oneway closes apache/incubator-rocketmq#98
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/8c8610f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/8c8610f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/8c8610f9
Branch: refs/heads/develop
Commit: 8c8610f9121d19bf7108903e41276d5f6afaa81a
Parents: b1fcf1b
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 11:22:46 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 11:22:46 2017 +0800
----------------------------------------------------------------------
.../remoting/netty/NettyRemotingAbstract.java | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c8610f9/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index cddab3d..15586cb 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -417,14 +417,18 @@ public abstract class NettyRemotingAbstract {
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
- String info =
- String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
- timeoutMillis, //
- this.semaphoreAsync.getQueueLength(), //
- this.semaphoreAsync.availablePermits()//
- );
- PLOG.warn(info);
- throw new RemotingTooMuchRequestException(info);
+ if (timeoutMillis <= 0) {
+ throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
+ } else {
+ String info =
+ String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
+ timeoutMillis, //
+ this.semaphoreAsync.getQueueLength(), //
+ this.semaphoreAsync.availablePermits()//
+ );
+ PLOG.warn(info);
+ throw new RemotingTimeoutException(info);
+ }
}
}
[8/8] incubator-rocketmq git commit: Remove diamond operator for
client module with JDK 1.6
Posted by do...@apache.org.
Remove diamond operator for client module with JDK 1.6
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/8d781757
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/8d781757
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/8d781757
Branch: refs/heads/develop
Commit: 8d781757dfb56ebd95b0494e8e2c87422c22d767
Parents: e57f9ac
Author: dongeforever <zh...@yeah.net>
Authored: Sat May 27 14:09:08 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 14:09:08 2017 +0800
----------------------------------------------------------------------
.../AllocateMessageQueueConsistentHash.java | 8 +++---
.../AllocateMessageQueueConsitentHashTest.java | 27 ++++++++++----------
.../consistenthash/ConsistentHashRouter.java | 4 +--
.../org/apache/rocketmq/store/CommitLog.java | 4 +--
4 files changed, 22 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8d781757/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
index 77198b7..09d940a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
@@ -76,19 +76,19 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueue
}
- Collection<ClientNode> cidNodes = new ArrayList<>();
+ Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
cidNodes.add(new ClientNode(cid));
}
final ConsistentHashRouter<ClientNode> router; //for building hash ring
if (customHashFunction != null) {
- router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt, customHashFunction);
+ router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
- router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt);
+ router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
- List<MessageQueue> results = new ArrayList<>();
+ List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
ClientNode clientNode = router.routeNode(mq.toString());
if (clientNode != null && currentCID.equals(clientNode.getKey())) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8d781757/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
index fc7ab9f..e9e5db7 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Assert;
import org.junit.Before;
@@ -113,13 +114,13 @@ public class AllocateMessageQueueConsitentHashTest {
//System.out.println("mqAll:" + mqAll.toString());
List<String> cidAll = createConsumerIdList(consumerSize);
- List<MessageQueue> allocatedResAll = new ArrayList<>();
+ List<MessageQueue> allocatedResAll = new ArrayList<MessageQueue>();
- Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<>();
+ Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<MessageQueue, String>();
//test allocate all
{
- List<String> cidBegin = new ArrayList<>(cidAll);
+ List<String> cidBegin = new ArrayList<String>(cidAll);
//System.out.println("cidAll:" + cidBegin.toString());
for (String cid : cidBegin) {
@@ -135,13 +136,13 @@ public class AllocateMessageQueueConsitentHashTest {
verifyAllocateAll(cidBegin,mqAll, allocatedResAll));
}
- Map<MessageQueue, String> allocateToAllAfterRemoveOne = new TreeMap<>();
- List<String> cidAfterRemoveOne = new ArrayList<>(cidAll);
+ Map<MessageQueue, String> allocateToAllAfterRemoveOne = new TreeMap<MessageQueue, String>();
+ List<String> cidAfterRemoveOne = new ArrayList<String>(cidAll);
//test allocate remove one cid
{
String removeCID = cidAfterRemoveOne.remove(0);
//System.out.println("removing one cid "+removeCID);
- List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
+ List<MessageQueue> mqShouldOnlyChanged = new ArrayList<MessageQueue>();
Iterator<Map.Entry<MessageQueue, String>> it = allocateToAllOrigin.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<MessageQueue, String> entry = it.next();
@@ -151,7 +152,7 @@ public class AllocateMessageQueueConsitentHashTest {
}
//System.out.println("cidAll:" + cidAfterRemoveOne.toString());
- List<MessageQueue> allocatedResAllAfterRemove = new ArrayList<>();
+ List<MessageQueue> allocatedResAllAfterRemove = new ArrayList<MessageQueue>();
for (String cid : cidAfterRemoveOne) {
List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterRemoveOne);
allocatedResAllAfterRemove.addAll(rs);
@@ -166,16 +167,16 @@ public class AllocateMessageQueueConsitentHashTest {
verifyAfterRemove(allocateToAllOrigin, allocateToAllAfterRemoveOne, removeCID);
}
- List<String> cidAfterAdd = new ArrayList<>(cidAfterRemoveOne);
+ List<String> cidAfterAdd = new ArrayList<String>(cidAfterRemoveOne);
//test allocate add one more cid
{
String newCid = CID_PREFIX+"NEW";
//System.out.println("add one more cid "+newCid);
cidAfterAdd.add(newCid);
- List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
+ List<MessageQueue> mqShouldOnlyChanged = new ArrayList<MessageQueue>();
//System.out.println("cidAll:" + cidAfterAdd.toString());
- List<MessageQueue> allocatedResAllAfterAdd = new ArrayList<>();
- Map<MessageQueue, String> allocateToAll3 = new TreeMap<>();
+ List<MessageQueue> allocatedResAllAfterAdd = new ArrayList<MessageQueue>();
+ Map<MessageQueue, String> allocateToAll3 = new TreeMap<MessageQueue, String>();
for (String cid : cidAfterAdd) {
List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterAdd);
allocatedResAllAfterAdd.addAll(rs);
@@ -225,7 +226,7 @@ public class AllocateMessageQueueConsitentHashTest {
}
private List<String> createConsumerIdList(int size) {
- List<String> consumerIdList = new ArrayList<>(size);
+ List<String> consumerIdList = new ArrayList<String>(size);
for (int i = 0; i < size; i++) {
consumerIdList.add(CID_PREFIX + String.valueOf(i));
}
@@ -233,7 +234,7 @@ public class AllocateMessageQueueConsitentHashTest {
}
private List<MessageQueue> createMessageQueueList(int size) {
- List<MessageQueue> messageQueueList = new ArrayList<>(size);
+ List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(size);
for (int i = 0; i < size; i++) {
MessageQueue mq = new MessageQueue(topic, "brokerName", i);
messageQueueList.add(mq);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8d781757/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
index 8606c43..a6fce51 100644
--- a/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
@@ -30,7 +30,7 @@ import java.util.TreeMap;
* @param <T>
*/
public class ConsistentHashRouter<T extends Node> {
- private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<>();
+ private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<Long, VirtualNode<T>>();
private final HashFunction hashFunction;
public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {
@@ -64,7 +64,7 @@ public class ConsistentHashRouter<T extends Node> {
if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
int existingReplicas = getExistingReplicas(pNode);
for (int i = 0; i < vNodeCount; i++) {
- VirtualNode<T> vNode = new VirtualNode<>(pNode, i + existingReplicas);
+ VirtualNode<T> vNode = new VirtualNode<T>(pNode, i + existingReplicas);
ring.put(hashFunction.hash(vNode.getKey()), vNode);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8d781757/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 7b29263..b44211c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -722,7 +722,7 @@ public class CommitLog {
messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch));
- lockForPutMessage(); //spin...
+ putMessageLock.lock();
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
@@ -771,7 +771,7 @@ public class CommitLog {
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
- releasePutMessageLock();
+ putMessageLock.unlock();
}
[6/8] incubator-rocketmq git commit: [ROCKETMQ-160]SendHeartBeat may
not be logged in the expected period closes apache/incubator-rocketmq#86
Posted by do...@apache.org.
[ROCKETMQ-160]SendHeartBeat may not be logged in the expected period closes apache/incubator-rocketmq#86
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/04c8925d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/04c8925d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/04c8925d
Branch: refs/heads/develop
Commit: 04c8925d6da77a41cef746a9c6478a407c4c9edd
Parents: adae162
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 12:38:00 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Sat May 27 12:38:00 2017 +0800
----------------------------------------------------------------------
.../client/impl/factory/MQClientInstance.java | 66 ++++++++++----------
1 file changed, 34 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/04c8925d/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index a8c65b2..1b075ee 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -112,7 +112,7 @@ public class MQClientInstance {
private final RebalanceService rebalanceService;
private final DefaultMQProducer defaultMQProducer;
private final ConsumerStatsManager consumerStatsManager;
- private final AtomicLong storeTimesTotal = new AtomicLong(0);
+ private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
private ServiceState serviceState = ServiceState.CREATE_JUST;
private DatagramSocket datagramSocket;
private Random random = new Random();
@@ -517,38 +517,40 @@ public class MQClientInstance {
return;
}
- long times = this.storeTimesTotal.getAndIncrement();
- Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, HashMap<Long, String>> entry = it.next();
- String brokerName = entry.getKey();
- HashMap<Long, String> oneTable = entry.getValue();
- if (oneTable != null) {
- for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
- Long id = entry1.getKey();
- String addr = entry1.getValue();
- if (addr != null) {
- if (consumerEmpty) {
- if (id != MixAll.MASTER_ID)
- continue;
- }
-
- try {
- int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
- if (!this.brokerVersionTable.containsKey(brokerName)) {
- this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
- }
- this.brokerVersionTable.get(brokerName).put(addr, version);
- if (times % 20 == 0) {
- log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
- log.info(heartbeatData.toString());
+ if (!this.brokerAddrTable.isEmpty()) {
+ long times = this.sendHeartbeatTimesTotal.getAndIncrement();
+ Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<String, HashMap<Long, String>> entry = it.next();
+ String brokerName = entry.getKey();
+ HashMap<Long, String> oneTable = entry.getValue();
+ if (oneTable != null) {
+ for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
+ Long id = entry1.getKey();
+ String addr = entry1.getValue();
+ if (addr != null) {
+ if (consumerEmpty) {
+ if (id != MixAll.MASTER_ID)
+ continue;
}
- } catch (Exception e) {
- if (this.isBrokerInNameServer(addr)) {
- log.error("send heart beat to broker exception", e);
- } else {
- log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
- id, addr);
+
+ try {
+ int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
+ if (!this.brokerVersionTable.containsKey(brokerName)) {
+ this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
+ }
+ this.brokerVersionTable.get(brokerName).put(addr, version);
+ if (times % 20 == 0) {
+ log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
+ log.info(heartbeatData.toString());
+ }
+ } catch (Exception e) {
+ if (this.isBrokerInNameServer(addr)) {
+ log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
+ } else {
+ log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
+ id, addr);
+ }
}
}
}
[4/8] incubator-rocketmq git commit: [ROCKETMQ-206] Catch the
IOException when call the file2String method.
Posted by do...@apache.org.
[ROCKETMQ-206] Catch the IOException when call the file2String method.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/aced0de7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/aced0de7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/aced0de7
Branch: refs/heads/develop
Commit: aced0de7d8f98a01d9d109dd592a6cb31fd174d9
Parents: ceeef8e
Author: yukon <yu...@apache.org>
Authored: Sat May 27 11:34:44 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Sat May 27 11:34:44 2017 +0800
----------------------------------------------------------------------
.../client/consumer/store/LocalFileOffsetStore.java | 14 ++++++++++++--
.../apache/rocketmq/example/benchmark/Consumer.java | 3 ++-
.../org/apache/rocketmq/example/filter/Consumer.java | 3 ++-
.../rocketmq/namesrv/kvconfig/KVConfigManager.java | 7 ++++++-
4 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aced0de7/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
index 2cde5f8..6c81516 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -180,7 +180,12 @@ public class LocalFileOffsetStore implements OffsetStore {
}
private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
- String content = MixAll.file2String(this.storePath);
+ String content = null;
+ try {
+ content = MixAll.file2String(this.storePath);
+ } catch (IOException e) {
+ log.warn("Load local offset store file exception", e);
+ }
if (null == content || content.length() == 0) {
return this.readLocalOffsetBak();
} else {
@@ -198,7 +203,12 @@ public class LocalFileOffsetStore implements OffsetStore {
}
private OffsetSerializeWrapper readLocalOffsetBak() throws MQClientException {
- String content = MixAll.file2String(this.storePath + ".bak");
+ String content = null;
+ try {
+ content = MixAll.file2String(this.storePath + ".bak");
+ } catch (IOException e) {
+ log.warn("Load local offset store bak file exception", e);
+ }
if (content != null && content.length() > 0) {
OffsetSerializeWrapper offsetSerializeWrapper = null;
try {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aced0de7/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index 3e1b79b..d431d3e 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.example.benchmark;
+import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
@@ -39,7 +40,7 @@ import org.apache.rocketmq.srvutil.ServerUtil;
public class Consumer {
- public static void main(String[] args) throws MQClientException {
+ public static void main(String[] args) throws MQClientException, IOException {
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("benchmarkConsumer", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aced0de7/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
index d63435b..0be8e1d 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.example.filter;
import java.io.File;
+import java.io.IOException;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
@@ -28,7 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
- public static void main(String[] args) throws InterruptedException, MQClientException {
+ public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/aced0de7/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
index 69afcad..be13bd6 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
@@ -43,7 +43,12 @@ public class KVConfigManager {
}
public void load() {
- String content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
+ String content = null;
+ try {
+ content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
+ } catch (IOException e) {
+ log.warn("Load KV config table exception", e);
+ }
if (content != null) {
KVConfigSerializeWrapper kvConfigSerializeWrapper =
KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);