You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2017/06/22 02:30:40 UTC

[01/10] incubator-rocketmq git commit: [maven-release-plugin] prepare for next development iteration

Repository: incubator-rocketmq
Updated Branches:
  refs/heads/master 0c5e53db6 -> 7883094d9


[maven-release-plugin] prepare for next development iteration


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/7883094d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/7883094d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/7883094d

Branch: refs/heads/master
Commit: 7883094d921cab33063446ce57e0a9af276de2e2
Parents: ccb682d
Author: dongeforever <do...@apache.org>
Authored: Thu Jun 8 15:56:33 2017 +0800
Committer: dongeforever <do...@apache.org>
Committed: Thu Jun 22 10:29:41 2017 +0800

----------------------------------------------------------------------
 broker/pom.xml        | 2 +-
 client/pom.xml        | 2 +-
 common/pom.xml        | 2 +-
 distribution/pom.xml  | 2 +-
 example/pom.xml       | 4 ++--
 filter/pom.xml        | 2 +-
 filtersrv/pom.xml     | 2 +-
 logappender/pom.xml   | 2 +-
 namesrv/pom.xml       | 2 +-
 openmessaging/pom.xml | 2 +-
 pom.xml               | 4 ++--
 remoting/pom.xml      | 2 +-
 srvutil/pom.xml       | 2 +-
 store/pom.xml         | 2 +-
 test/pom.xml          | 2 +-
 tools/pom.xml         | 2 +-
 16 files changed, 18 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/broker/pom.xml
----------------------------------------------------------------------
diff --git a/broker/pom.xml b/broker/pom.xml
index 0f4ae31..fd1e691 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index d97ba77..7b73f3f 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index cfad0d3..56b5d44 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 20e9e6e..9839b15 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -20,7 +20,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
     <artifactId>rocketmq-distribution</artifactId>
     <name>rocketmq-distribution ${project.version}</name>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/example/pom.xml
----------------------------------------------------------------------
diff --git a/example/pom.xml b/example/pom.xml
index 260085e..947e5f2 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>
@@ -55,7 +55,7 @@
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-openmessaging</artifactId>
-            <version>4.1.0-incubating</version>
+            <version>4.2.0-incubating-SNAPSHOT</version>
         </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/filter/pom.xml
----------------------------------------------------------------------
diff --git a/filter/pom.xml b/filter/pom.xml
index b08ce2d..f0a18e3 100644
--- a/filter/pom.xml
+++ b/filter/pom.xml
@@ -20,7 +20,7 @@
     <parent>
         <artifactId>rocketmq-all</artifactId>
         <groupId>org.apache.rocketmq</groupId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/filtersrv/pom.xml
----------------------------------------------------------------------
diff --git a/filtersrv/pom.xml b/filtersrv/pom.xml
index f41f3b7..2fc3e07 100644
--- a/filtersrv/pom.xml
+++ b/filtersrv/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/logappender/pom.xml
----------------------------------------------------------------------
diff --git a/logappender/pom.xml b/logappender/pom.xml
index c18ace7..b75f25e 100644
--- a/logappender/pom.xml
+++ b/logappender/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>rocketmq-logappender</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/namesrv/pom.xml
----------------------------------------------------------------------
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index 9939e4f..253135f 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/openmessaging/pom.xml
----------------------------------------------------------------------
diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml
index e857be9..ec59b87 100644
--- a/openmessaging/pom.xml
+++ b/openmessaging/pom.xml
@@ -20,7 +20,7 @@
     <parent>
         <artifactId>rocketmq-all</artifactId>
         <groupId>org.apache.rocketmq</groupId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ea4af1e..90f40d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
     <inceptionYear>2012</inceptionYear>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-all</artifactId>
-    <version>4.1.0-incubating</version>
+    <version>4.2.0-incubating-SNAPSHOT</version>
     <packaging>pom</packaging>
     <name>Apache RocketMQ ${project.version}</name>
     <url>http://rocketmq.incubator.apache.org/</url>
@@ -43,7 +43,7 @@
         <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq.git</connection>
         <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq.git
         </developerConnection>
-        <tag>rocketmq-all-4.1.0-incubating</tag>
+        <tag>HEAD</tag>
     </scm>
 
     <mailingLists>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/remoting/pom.xml
----------------------------------------------------------------------
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 569b917..df207cf 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/srvutil/pom.xml
----------------------------------------------------------------------
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index 53fee10..57b4085 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/store/pom.xml
----------------------------------------------------------------------
diff --git a/store/pom.xml b/store/pom.xml
index 45223fd..e0256b1 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index 30dd2a8..49f3761 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -20,7 +20,7 @@
     <parent>
         <artifactId>rocketmq-all</artifactId>
         <groupId>org.apache.rocketmq</groupId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7883094d/tools/pom.xml
----------------------------------------------------------------------
diff --git a/tools/pom.xml b/tools/pom.xml
index e3da25d..439cb68 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating</version>
+        <version>4.2.0-incubating-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>


[05/10] incubator-rocketmq git commit: [ROCKETMQ-208]incompatibility problem found in enviroment of JDK 1.7 when running client closes apache/incubator-rocketmq#10

Posted by do...@apache.org.
[ROCKETMQ-208]incompatibility problem found in enviroment of JDK 1.7 when running client closes apache/incubator-rocketmq#10


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/24d6eefb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/24d6eefb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/24d6eefb

Branch: refs/heads/master
Commit: 24d6eefba65845b91513188f66d230af021a3cc9
Parents: 609fc94
Author: Jaskey <li...@gmail.com>
Authored: Tue Jun 6 16:06:46 2017 +0800
Committer: dongeforever <do...@apache.org>
Committed: Thu Jun 22 10:29:41 2017 +0800

----------------------------------------------------------------------
 .../broker/client/ConsumerGroupInfo.java        |  9 ++--
 .../rocketmq/broker/client/ConsumerManager.java |  7 +--
 .../broker/client/net/Broker2Client.java        |  6 +--
 .../client/rebalance/RebalanceLockManager.java  |  3 +-
 .../broker/filter/ConsumerFilterManager.java    |  9 ++--
 .../broker/filtersrv/FilterServerManager.java   |  3 +-
 .../longpolling/PullRequestHoldService.java     |  3 +-
 .../broker/offset/ConsumerOffsetManager.java    | 31 +++++++------
 .../broker/processor/AdminBrokerProcessor.java  |  4 +-
 .../subscription/SubscriptionGroupManager.java  |  5 +-
 .../broker/topic/TopicConfigManager.java        |  5 +-
 .../consumer/MQPullConsumerScheduleService.java |  7 +--
 .../consumer/store/LocalFileOffsetStore.java    |  3 +-
 .../consumer/store/OffsetSerializeWrapper.java  |  7 +--
 .../consumer/store/RemoteBrokerOffsetStore.java |  3 +-
 .../rocketmq/client/impl/MQClientManager.java   |  3 +-
 .../consumer/DefaultMQPullConsumerImpl.java     |  4 +-
 .../consumer/DefaultMQPushConsumerImpl.java     |  6 +--
 .../client/impl/consumer/MessageQueueLock.java  |  3 +-
 .../client/impl/consumer/PullAPIWrapper.java    |  5 +-
 .../client/impl/consumer/RebalanceImpl.java     | 13 +++---
 .../client/impl/factory/MQClientInstance.java   | 17 +++----
 .../impl/producer/DefaultMQProducerImpl.java    |  5 +-
 .../protocol/body/ConsumerConnection.java       |  5 +-
 .../body/ConsumerOffsetSerializeWrapper.java    |  9 ++--
 .../protocol/body/SubscriptionGroupWrapper.java |  7 +--
 .../body/TopicConfigSerializeWrapper.java       |  7 +--
 .../common/stats/MomentStatsItemSet.java        |  5 +-
 .../rocketmq/common/stats/StatsItemSet.java     |  3 +-
 .../filtersrv/filter/FilterClassManager.java    |  3 +-
 .../namesrv/routeinfo/RouteInfoManager.java     |  4 +-
 .../remoting/netty/NettyRemotingAbstract.java   |  3 +-
 .../remoting/netty/NettyRemotingClient.java     |  3 +-
 .../store/AllocateMappedFileService.java        |  3 +-
 .../rocketmq/store/DefaultMessageStore.java     | 49 ++++++++++----------
 .../schedule/DelayOffsetSerializeWrapper.java   |  7 +--
 .../store/schedule/ScheduleMessageService.java  |  5 +-
 37 files changed, 153 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
index 6ce542a..91b6c81 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -34,9 +35,9 @@ import org.slf4j.LoggerFactory;
 public class ConsumerGroupInfo {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final String groupName;
-    private final ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable =
+    private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
         new ConcurrentHashMap<String, SubscriptionData>();
-    private final ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
+    private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
         new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
     private volatile ConsumeType consumeType;
     private volatile MessageModel messageModel;
@@ -63,11 +64,11 @@ public class ConsumerGroupInfo {
         return null;
     }
 
-    public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() {
+    public ConcurrentMap<String, SubscriptionData> getSubscriptionTable() {
         return subscriptionTable;
     }
 
-    public ConcurrentHashMap<Channel, ClientChannelInfo> getChannelInfoTable() {
+    public ConcurrentMap<Channel, ClientChannelInfo> getChannelInfoTable() {
         return channelInfoTable;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index a5ddec8..4a262e5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -35,7 +36,7 @@ import org.slf4j.LoggerFactory;
 public class ConsumerManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
-    private final ConcurrentHashMap<String/* Group */, ConsumerGroupInfo> consumerTable =
+    private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
         new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
     private final ConsumerIdsChangeListener consumerIdsChangeListener;
 
@@ -145,7 +146,7 @@ public class ConsumerManager {
             Entry<String, ConsumerGroupInfo> next = it.next();
             String group = next.getKey();
             ConsumerGroupInfo consumerGroupInfo = next.getValue();
-            ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
+            ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
                 consumerGroupInfo.getChannelInfoTable();
 
             Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
@@ -176,7 +177,7 @@ public class ConsumerManager {
         Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
         while (it.hasNext()) {
             Entry<String, ConsumerGroupInfo> entry = it.next();
-            ConcurrentHashMap<String, SubscriptionData> subscriptionTable =
+            ConcurrentMap<String, SubscriptionData> subscriptionTable =
                 entry.getValue().getSubscriptionTable();
             if (subscriptionTable.containsKey(topic)) {
                 groups.add(entry.getKey());

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
index 863da62..65b444e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
@@ -189,7 +189,7 @@ public class Broker2Client {
             this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
 
         if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
-            ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
+            ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
                 consumerGroupInfo.getChannelInfoTable();
             for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
                 int version = entry.getValue().getVersion();
@@ -252,7 +252,7 @@ public class Broker2Client {
 
         Map<String, Map<MessageQueue, Long>> consumerStatusTable =
             new HashMap<String, Map<MessageQueue, Long>>();
-        ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
+        ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =
             this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
         if (null == channelInfoTable || channelInfoTable.isEmpty()) {
             result.setCode(ResponseCode.SYSTEM_ERROR);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
index 98aceb6..ed5a875 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.client.rebalance;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -31,7 +32,7 @@ public class RebalanceLockManager {
     private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
         "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
     private final Lock lock = new ReentrantLock();
-    private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
+    private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
         new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
 
     public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
index 7f790af..f50db86 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ConsumerFilterManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.broker.filter;
 
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -45,7 +46,7 @@ public class ConsumerFilterManager extends ConfigManager {
 
     private static final long MS_24_HOUR = 24 * 3600 * 1000;
 
-    private ConcurrentHashMap<String/*Topic*/, FilterDataMapByTopic>
+    private ConcurrentMap<String/*Topic*/, FilterDataMapByTopic>
         filterDataByTopic = new ConcurrentHashMap<String/*consumer group*/, FilterDataMapByTopic>(256);
 
     private transient BrokerController brokerController;
@@ -316,7 +317,7 @@ public class ConsumerFilterManager extends ConfigManager {
         }
     }
 
-    public ConcurrentHashMap<String, FilterDataMapByTopic> getFilterDataByTopic() {
+    public ConcurrentMap<String, FilterDataMapByTopic> getFilterDataByTopic() {
         return filterDataByTopic;
     }
 
@@ -326,7 +327,7 @@ public class ConsumerFilterManager extends ConfigManager {
 
     public static class FilterDataMapByTopic {
 
-        private ConcurrentHashMap<String/*consumer group*/, ConsumerFilterData>
+        private ConcurrentMap<String/*consumer group*/, ConsumerFilterData>
             groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>();
 
         private String topic;
@@ -452,7 +453,7 @@ public class ConsumerFilterManager extends ConfigManager {
             return this.groupFilterData.get(consumerGroup);
         }
 
-        public final ConcurrentHashMap<String, ConsumerFilterData> getGroupFilterData() {
+        public final ConcurrentMap<String, ConsumerFilterData> getGroupFilterData() {
             return this.groupFilterData;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
index b935bc8..52cb919 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -38,7 +39,7 @@ public class FilterServerManager {
 
     public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private final ConcurrentHashMap<Channel, FilterServerInfo> filterServerTable =
+    private final ConcurrentMap<Channel, FilterServerInfo> filterServerTable =
         new ConcurrentHashMap<Channel, FilterServerInfo>(16);
     private final BrokerController brokerController;
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index 71f56a4..b1bd86f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -20,6 +20,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.SystemClock;
@@ -33,7 +34,7 @@ public class PullRequestHoldService extends ServiceThread {
     private static final String TOPIC_QUEUEID_SEPARATOR = "@";
     private final BrokerController brokerController;
     private final SystemClock systemClock = new SystemClock();
-    private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
+    private ConcurrentMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
         new ConcurrentHashMap<String, ManyPullRequest>(1024);
 
     public PullRequestHoldService(final BrokerController brokerController) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 769c4ad..57565a6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -36,8 +37,8 @@ public class ConsumerOffsetManager extends ConfigManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final String TOPIC_GROUP_SEPARATOR = "@";
 
-    private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
-        new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
+    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
+        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
 
     private transient BrokerController brokerController;
 
@@ -49,9 +50,9 @@ public class ConsumerOffsetManager extends ConfigManager {
     }
 
     public void scanUnsubscribedTopic() {
-        Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
+        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
         while (it.hasNext()) {
-            Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
+            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
             String topicAtGroup = next.getKey();
             String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
             if (arrays.length == 2) {
@@ -67,7 +68,7 @@ public class ConsumerOffsetManager extends ConfigManager {
         }
     }
 
-    private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap<Integer, Long> table) {
+    private boolean offsetBehindMuchThanData(final String topic, ConcurrentMap<Integer, Long> table) {
         Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();
         boolean result = !table.isEmpty();
 
@@ -84,9 +85,9 @@ public class ConsumerOffsetManager extends ConfigManager {
     public Set<String> whichTopicByConsumer(final String group) {
         Set<String> topics = new HashSet<String>();
 
-        Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
+        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
         while (it.hasNext()) {
-            Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
+            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
             String topicAtGroup = next.getKey();
             String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
             if (arrays.length == 2) {
@@ -102,9 +103,9 @@ public class ConsumerOffsetManager extends ConfigManager {
     public Set<String> whichGroupByTopic(final String topic) {
         Set<String> groups = new HashSet<String>();
 
-        Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
+        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
         while (it.hasNext()) {
-            Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
+            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
             String topicAtGroup = next.getKey();
             String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
             if (arrays.length == 2) {
@@ -124,7 +125,7 @@ public class ConsumerOffsetManager extends ConfigManager {
     }
 
     private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
-        ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
+        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
         if (null == map) {
             map = new ConcurrentHashMap<Integer, Long>(32);
             map.put(queueId, offset);
@@ -140,7 +141,7 @@ public class ConsumerOffsetManager extends ConfigManager {
     public long queryOffset(final String group, final String topic, final int queueId) {
         // topic@group
         String key = topic + TOPIC_GROUP_SEPARATOR + group;
-        ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
+        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
         if (null != map) {
             Long offset = map.get(queueId);
             if (offset != null)
@@ -173,11 +174,11 @@ public class ConsumerOffsetManager extends ConfigManager {
         return RemotingSerializable.toJson(this, prettyFormat);
     }
 
-    public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() {
+    public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() {
         return offsetTable;
     }
 
-    public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) {
+    public void setOffsetTable(ConcurrentHashMap<String, ConcurrentMap<Integer, Long>> offsetTable) {
         this.offsetTable = offsetTable;
     }
 
@@ -196,7 +197,7 @@ public class ConsumerOffsetManager extends ConfigManager {
             }
         }
 
-        for (Map.Entry<String, ConcurrentHashMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
+        for (Map.Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
             String topicGroup = offSetEntry.getKey();
             String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
             if (topic.equals(topicGroupArr[0])) {
@@ -224,7 +225,7 @@ public class ConsumerOffsetManager extends ConfigManager {
     }
 
     public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
-        ConcurrentHashMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
+        ConcurrentMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
         if (offsets != null) {
             this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets));
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index f59d295..71fdda9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -29,7 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
@@ -1084,7 +1084,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         GetConsumeStatsInBrokerHeader requestHeader =
             (GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class);
         boolean isOrder = requestHeader.isOrder();
-        ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroups =
+        ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroups =
             brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable();
 
         List<Map<String/* subscriptionGroupName */, List<ConsumeStats>>> brokerConsumeStatsList =

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index bdf2a01..bd4a26e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.subscription;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -33,7 +34,7 @@ import org.slf4j.LoggerFactory;
 public class SubscriptionGroupManager extends ConfigManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
-    private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
+    private final ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
         new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
     private final DataVersion dataVersion = new DataVersion();
     private transient BrokerController brokerController;
@@ -169,7 +170,7 @@ public class SubscriptionGroupManager extends ConfigManager {
         }
     }
 
-    public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
+    public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
         return subscriptionGroupTable;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 93a631a..3bcafc0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -44,7 +45,7 @@ public class TopicConfigManager extends ConfigManager {
     private static final long LOCK_TIMEOUT_MILLIS = 3000;
     private transient final Lock lockTopicConfigTable = new ReentrantLock();
 
-    private final ConcurrentHashMap<String, TopicConfig> topicConfigTable =
+    private final ConcurrentMap<String, TopicConfig> topicConfigTable =
         new ConcurrentHashMap<String, TopicConfig>(1024);
     private final DataVersion dataVersion = new DataVersion();
     private final Set<String> systemTopicList = new HashSet<String>();
@@ -416,7 +417,7 @@ public class TopicConfigManager extends ConfigManager {
         return dataVersion;
     }
 
-    public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() {
+    public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
         return topicConfigTable;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
index 6bae85a..e0b546d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
@@ -20,6 +20,7 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -35,11 +36,11 @@ import org.slf4j.Logger;
 public class MQPullConsumerScheduleService {
     private final Logger log = ClientLogger.getLog();
     private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl();
-    private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable =
+    private final ConcurrentMap<MessageQueue, PullTaskImpl> taskTable =
         new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
     private DefaultMQPullConsumer defaultMQPullConsumer;
     private int pullThreadNums = 20;
-    private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable =
+    private ConcurrentMap<String /* topic */, PullTaskCallback> callbackTable =
         new ConcurrentHashMap<String, PullTaskCallback>();
     private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
 
@@ -100,7 +101,7 @@ public class MQPullConsumerScheduleService {
         }
     }
 
-    public ConcurrentHashMap<String, PullTaskCallback> getCallbackTable() {
+    public ConcurrentMap<String, PullTaskCallback> getCallbackTable() {
         return callbackTable;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
index 6c81516..d4b19b2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -45,7 +46,7 @@ public class LocalFileOffsetStore implements OffsetStore {
     private final MQClientInstance mQClientFactory;
     private final String groupName;
     private final String storePath;
-    private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
+    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
         new ConcurrentHashMap<MessageQueue, AtomicLong>();
 
     public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
index 32bcc9f..7dfd97a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/OffsetSerializeWrapper.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.client.consumer.store;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
@@ -25,14 +26,14 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
  * Wrapper class for offset serialization
  */
 public class OffsetSerializeWrapper extends RemotingSerializable {
-    private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
+    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
         new ConcurrentHashMap<MessageQueue, AtomicLong>();
 
-    public ConcurrentHashMap<MessageQueue, AtomicLong> getOffsetTable() {
+    public ConcurrentMap<MessageQueue, AtomicLong> getOffsetTable() {
         return offsetTable;
     }
 
-    public void setOffsetTable(ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable) {
+    public void setOffsetTable(ConcurrentMap<MessageQueue, AtomicLong> offsetTable) {
         this.offsetTable = offsetTable;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 60ad101..5bd5749 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -21,6 +21,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -42,7 +43,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
     private final static Logger log = ClientLogger.getLog();
     private final MQClientInstance mQClientFactory;
     private final String groupName;
-    private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
+    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
         new ConcurrentHashMap<MessageQueue, AtomicLong>();
 
     public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
index f596b83..25877d7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.client.impl;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
@@ -28,7 +29,7 @@ public class MQClientManager {
     private final static Logger log = ClientLogger.getLog();
     private static MQClientManager instance = new MQClientManager();
     private AtomicInteger factoryIndexGenerator = new AtomicInteger();
-    private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable =
+    private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
         new ConcurrentHashMap<String, MQClientInstance>();
 
     private MQClientManager() {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 7d43b37..35ee16f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -22,7 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -115,7 +115,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
             throw new IllegalArgumentException("topic is null");
         }
 
-        ConcurrentHashMap<MessageQueue, ProcessQueue> mqTable = this.rebalanceImpl.getProcessQueueTable();
+        ConcurrentMap<MessageQueue, ProcessQueue> mqTable = this.rebalanceImpl.getProcessQueueTable();
         Set<MessageQueue> mqResult = new HashSet<MessageQueue>();
         for (MessageQueue mq : mqTable.keySet()) {
             if (mq.getTopic().equals(topic)) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 8767964..9bf34be 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -26,7 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -805,7 +805,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
     }
 
-    public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() {
+    public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
         return this.rebalanceImpl.getSubscriptionInner();
     }
 
@@ -1060,7 +1060,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
 
     private long computeAccumulationTotal() {
         long msgAccTotal = 0;
-        ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
+        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = this.rebalanceImpl.getProcessQueueTable();
         Iterator<Entry<MessageQueue, ProcessQueue>> it = processQueueTable.entrySet().iterator();
         while (it.hasNext()) {
             Entry<MessageQueue, ProcessQueue> next = it.next();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
index c25e41b..a02f1b6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
@@ -17,13 +17,14 @@
 package org.apache.rocketmq.client.impl.consumer;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.message.MessageQueue;
 
 /**
  * Message lock,strictly ensure the single queue only one thread at a time consuming
  */
 public class MessageQueueLock {
-    private ConcurrentHashMap<MessageQueue, Object> mqLockTable =
+    private ConcurrentMap<MessageQueue, Object> mqLockTable =
         new ConcurrentHashMap<MessageQueue, Object>();
 
     public Object fetchLockObject(final MessageQueue mq) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index 304a44a..bbdf27d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -53,7 +54,7 @@ public class PullAPIWrapper {
     private final MQClientInstance mQClientFactory;
     private final String consumerGroup;
     private final boolean unitMode;
-    private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
+    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
         new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
     private volatile boolean connectBrokerByUser = false;
     private volatile long defaultBrokerId = MixAll.MASTER_ID;
@@ -247,7 +248,7 @@ public class PullAPIWrapper {
 
     private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
         throws MQClientException {
-        ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
+        ConcurrentMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
         if (topicRouteTable != null) {
             TopicRouteData topicRouteData = topicRouteTable.get(topic);
             List<String> list = topicRouteData.getFilterServerTable().get(brokerAddr);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 6b12221..634e0f0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
@@ -44,10 +45,10 @@ import org.slf4j.Logger;
  */
 public abstract class RebalanceImpl {
     protected static final Logger log = ClientLogger.getLog();
-    protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
-    protected final ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
+    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
+    protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
         new ConcurrentHashMap<String, Set<MessageQueue>>();
-    protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner =
+    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
         new ConcurrentHashMap<String, SubscriptionData>();
     protected String consumerGroup;
     protected MessageModel messageModel;
@@ -232,7 +233,7 @@ public abstract class RebalanceImpl {
         this.truncateMessageQueueNotMyTopic();
     }
 
-    public ConcurrentHashMap<String, SubscriptionData> getSubscriptionInner() {
+    public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
         return subscriptionInner;
     }
 
@@ -421,11 +422,11 @@ public abstract class RebalanceImpl {
         }
     }
 
-    public ConcurrentHashMap<MessageQueue, ProcessQueue> getProcessQueueTable() {
+    public ConcurrentMap<MessageQueue, ProcessQueue> getProcessQueueTable() {
         return processQueueTable;
     }
 
-    public ConcurrentHashMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
+    public ConcurrentMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
         return topicSubscribeInfoTable;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 1b075ee..f146be9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -88,18 +89,18 @@ public class MQClientInstance {
     private final int instanceIndex;
     private final String clientId;
     private final long bootTimestamp = System.currentTimeMillis();
-    private final ConcurrentHashMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
-    private final ConcurrentHashMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
-    private final ConcurrentHashMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
+    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
+    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
+    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
     private final NettyClientConfig nettyClientConfig;
     private final MQClientAPIImpl mQClientAPIImpl;
     private final MQAdminImpl mQAdminImpl;
-    private final ConcurrentHashMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
+    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
     private final Lock lockNamesrv = new ReentrantLock();
     private final Lock lockHeartbeat = new ReentrantLock();
-    private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
+    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
         new ConcurrentHashMap<String, HashMap<Long, String>>();
-    private final ConcurrentHashMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
+    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
         new ConcurrentHashMap<String, HashMap<String, Integer>>();
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
         @Override
@@ -1088,7 +1089,7 @@ public class MQClientInstance {
             }
             consumer.suspend();
 
-            ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
+            ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
             for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
                 MessageQueue mq = entry.getKey();
                 if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
@@ -1166,7 +1167,7 @@ public class MQClientInstance {
         return defaultMQProducer;
     }
 
-    public ConcurrentHashMap<String, TopicRouteData> getTopicRouteTable() {
+    public ConcurrentMap<String, TopicRouteData> getTopicRouteTable() {
         return topicRouteTable;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index d828875..12f8a36 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -26,6 +26,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -84,7 +85,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     private final Logger log = ClientLogger.getLog();
     private final Random random = new Random();
     private final DefaultMQProducer defaultMQProducer;
-    private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
+    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
         new ConcurrentHashMap<String, TopicPublishInfo>();
     private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
     private final RPCHook rpcHook;
@@ -1057,7 +1058,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
     }
 
-    public ConcurrentHashMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
+    public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
         return topicPublishInfoTable;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java
index 7478dd2..3a0356c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerConnection.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.common.protocol.body;
 
 import java.util.HashSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -27,7 +28,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class ConsumerConnection extends RemotingSerializable {
     private HashSet<Connection> connectionSet = new HashSet<Connection>();
-    private ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable =
+    private ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
         new ConcurrentHashMap<String, SubscriptionData>();
     private ConsumeType consumeType;
     private MessageModel messageModel;
@@ -52,7 +53,7 @@ public class ConsumerConnection extends RemotingSerializable {
         this.connectionSet = connectionSet;
     }
 
-    public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() {
+    public ConcurrentMap<String, SubscriptionData> getSubscriptionTable() {
         return subscriptionTable;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
index 02bf811..5b08d78 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerOffsetSerializeWrapper.java
@@ -18,17 +18,18 @@
 package org.apache.rocketmq.common.protocol.body;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class ConsumerOffsetSerializeWrapper extends RemotingSerializable {
-    private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
-        new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
+    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
+        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
 
-    public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() {
+    public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() {
         return offsetTable;
     }
 
-    public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) {
+    public void setOffsetTable(ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable) {
         this.offsetTable = offsetTable;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
index 92c15eb..e05f759 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/SubscriptionGroupWrapper.java
@@ -18,21 +18,22 @@
 package org.apache.rocketmq.common.protocol.body;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class SubscriptionGroupWrapper extends RemotingSerializable {
-    private ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
+    private ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
         new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
     private DataVersion dataVersion = new DataVersion();
 
-    public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
+    public ConcurrentMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
         return subscriptionGroupTable;
     }
 
     public void setSubscriptionGroupTable(
-        ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable) {
+        ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupTable) {
         this.subscriptionGroupTable = subscriptionGroupTable;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
index c471d1a..ce12302 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/TopicConfigSerializeWrapper.java
@@ -18,20 +18,21 @@
 package org.apache.rocketmq.common.protocol.body;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.DataVersion;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class TopicConfigSerializeWrapper extends RemotingSerializable {
-    private ConcurrentHashMap<String, TopicConfig> topicConfigTable =
+    private ConcurrentMap<String, TopicConfig> topicConfigTable =
         new ConcurrentHashMap<String, TopicConfig>();
     private DataVersion dataVersion = new DataVersion();
 
-    public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() {
+    public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {
         return topicConfigTable;
     }
 
-    public void setTopicConfigTable(ConcurrentHashMap<String, TopicConfig> topicConfigTable) {
+    public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {
         this.topicConfigTable = topicConfigTable;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
index 5498d34..57dfc38 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
@@ -20,13 +20,14 @@ package org.apache.rocketmq.common.stats;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.UtilAll;
 import org.slf4j.Logger;
 
 public class MomentStatsItemSet {
-    private final ConcurrentHashMap<String/* key */, MomentStatsItem> statsItemTable =
+    private final ConcurrentMap<String/* key */, MomentStatsItem> statsItemTable =
         new ConcurrentHashMap<String, MomentStatsItem>(128);
     private final String statsName;
     private final ScheduledExecutorService scheduledExecutorService;
@@ -39,7 +40,7 @@ public class MomentStatsItemSet {
         this.init();
     }
 
-    public ConcurrentHashMap<String, MomentStatsItem> getStatsItemTable() {
+    public ConcurrentMap<String, MomentStatsItem> getStatsItemTable() {
         return statsItemTable;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
index 8633d68..17dbf0d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
@@ -20,13 +20,14 @@ package org.apache.rocketmq.common.stats;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.UtilAll;
 import org.slf4j.Logger;
 
 public class StatsItemSet {
-    private final ConcurrentHashMap<String/* key */, StatsItem> statsItemTable =
+    private final ConcurrentMap<String/* key */, StatsItem> statsItemTable =
         new ConcurrentHashMap<String, StatsItem>(128);
 
     private final String statsName;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
----------------------------------------------------------------------
diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
index 2c31538..490c582 100644
--- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
+++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java
@@ -20,6 +20,7 @@ package org.apache.rocketmq.filtersrv.filter;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -40,7 +41,7 @@ public class FilterClassManager {
 
     private final ScheduledExecutorService scheduledExecutorService = Executors
         .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FSGetClassScheduledThread"));
-    private ConcurrentHashMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable =
+    private ConcurrentMap<String/* topic@consumerGroup */, FilterClassInfo> filterClassTable =
         new ConcurrentHashMap<String, FilterClassInfo>(128);
     private FilterClassFetchMethod filterClassFetchMethod;
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 5a953a9..7479fcc 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.rocketmq.common.DataVersion;
@@ -135,7 +135,7 @@ public class RouteInfoManager {
                     && MixAll.MASTER_ID == brokerId) {
                     if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
                         || registerFirst) {
-                        ConcurrentHashMap<String, TopicConfig> tcTable =
+                        ConcurrentMap<String, TopicConfig> tcTable =
                             topicConfigWrapper.getTopicConfigTable();
                         if (tcTable != null) {
                             for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 15586cb..0ba714a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -67,7 +68,7 @@ public abstract class NettyRemotingAbstract {
     /**
      * This map caches all on-going requests.
      */
-    protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
+    protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
         new ConcurrentHashMap<Integer, ResponseFuture>(256);
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 52ca47e..1c3da9a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -41,6 +41,7 @@ import java.util.Random;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
@@ -73,7 +74,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     private final Bootstrap bootstrap = new Bootstrap();
     private final EventLoopGroup eventLoopGroupWorker;
     private final Lock lockChannelTables = new ReentrantLock();
-    private final ConcurrentHashMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
+    private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
 
     private final Timer timer = new Timer("ClientHouseKeepingService", true);
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index 0993a5f..abb8385 100644
--- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -20,6 +20,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ServiceLoader;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
 public class AllocateMappedFileService extends ServiceThread {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
     private static int waitTimeOut = 1000 * 5;
-    private ConcurrentHashMap<String, AllocateRequest> requestTable =
+    private ConcurrentMap<String, AllocateRequest> requestTable =
         new ConcurrentHashMap<String, AllocateRequest>();
     private PriorityBlockingQueue<AllocateRequest> requestQueue =
         new PriorityBlockingQueue<AllocateRequest>();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 931edc7..4549f1e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -64,7 +65,7 @@ public class DefaultMessageStore implements MessageStore {
     // CommitLog
     private final CommitLog commitLog;
 
-    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
+    private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
 
     private final FlushConsumeQueueService flushConsumeQueueService;
 
@@ -140,9 +141,9 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     public void truncateDirtyLogicFiles(long phyOffset) {
-        ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
+        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
 
-        for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
+        for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
             for (ConsumeQueue logic : maps.values()) {
                 logic.truncateDirtyLogicFiles(phyOffset);
             }
@@ -267,7 +268,7 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     public void destroyLogics() {
-        for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
+        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
             for (ConsumeQueue logic : maps.values()) {
                 logic.destroy();
             }
@@ -885,13 +886,13 @@ public class DefaultMessageStore implements MessageStore {
 
     @Override
     public int cleanUnusedTopic(Set<String> topics) {
-        Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
+        Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
         while (it.hasNext()) {
-            Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> next = it.next();
+            Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
             String topic = next.getKey();
 
             if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
-                ConcurrentHashMap<Integer, ConsumeQueue> queueTable = next.getValue();
+                ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
                 for (ConsumeQueue cq : queueTable.values()) {
                     cq.destroy();
                     log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", //
@@ -913,12 +914,12 @@ public class DefaultMessageStore implements MessageStore {
     public void cleanExpiredConsumerQueue() {
         long minCommitLogOffset = this.commitLog.getMinOffset();
 
-        Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
+        Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
         while (it.hasNext()) {
-            Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> next = it.next();
+            Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
             String topic = next.getKey();
             if (!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
-                ConcurrentHashMap<Integer, ConsumeQueue> queueTable = next.getValue();
+                ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
                 Iterator<Entry<Integer, ConsumeQueue>> itQT = queueTable.entrySet().iterator();
                 while (itQT.hasNext()) {
                     Entry<Integer, ConsumeQueue> nextQT = itQT.next();
@@ -1061,10 +1062,10 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     public ConsumeQueue findConsumeQueue(String topic, int queueId) {
-        ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
+        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
         if (null == map) {
-            ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
-            ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
+            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
+            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
             if (oldMap != null) {
                 map = oldMap;
             } else {
@@ -1205,9 +1206,9 @@ public class DefaultMessageStore implements MessageStore {
     private void checkSelf() {
         this.commitLog.checkSelf();
 
-        Iterator<Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
+        Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
         while (it.hasNext()) {
-            Entry<String, ConcurrentHashMap<Integer, ConsumeQueue>> next = it.next();
+            Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
             Iterator<Entry<Integer, ConsumeQueue>> itNext = next.getValue().entrySet().iterator();
             while (itNext.hasNext()) {
                 Entry<Integer, ConsumeQueue> cq = itNext.next();
@@ -1280,7 +1281,7 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) {
-        ConcurrentHashMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic);
+        ConcurrentMap<Integer/* queueId */, ConsumeQueue> map = this.consumeQueueTable.get(topic);
         if (null == map) {
             map = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>();
             map.put(queueId, consumeQueue);
@@ -1291,7 +1292,7 @@ public class DefaultMessageStore implements MessageStore {
     }
 
     private void recoverConsumeQueue() {
-        for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
+        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
             for (ConsumeQueue logic : maps.values()) {
                 logic.recover();
             }
@@ -1301,7 +1302,7 @@ public class DefaultMessageStore implements MessageStore {
     private void recoverTopicQueueTable() {
         HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
         long minPhyOffset = this.commitLog.getMinOffset();
-        for (ConcurrentHashMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
+        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
             for (ConsumeQueue logic : maps.values()) {
                 String key = logic.getTopic() + "-" + logic.getQueueId();
                 table.put(key, logic.getMaxOffsetInQueue());
@@ -1324,7 +1325,7 @@ public class DefaultMessageStore implements MessageStore {
         return runningFlags;
     }
 
-    public ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> getConsumeQueueTable() {
+    public ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> getConsumeQueueTable() {
         return consumeQueueTable;
     }
 
@@ -1375,7 +1376,7 @@ public class DefaultMessageStore implements MessageStore {
 
     @Override
     public ConsumeQueue getConsumeQueue(String topic, int queueId) {
-        ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
+        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
         if (map == null) {
             return null;
         }
@@ -1594,9 +1595,9 @@ public class DefaultMessageStore implements MessageStore {
             if (minOffset > this.lastPhysicalMinOffset) {
                 this.lastPhysicalMinOffset = minOffset;
 
-                ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
+                ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
 
-                for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
+                for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
                     for (ConsumeQueue logic : maps.values()) {
                         int deleteCount = logic.deleteExpiredFile(minOffset);
 
@@ -1639,9 +1640,9 @@ public class DefaultMessageStore implements MessageStore {
                 logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
             }
 
-            ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
+            ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
 
-            for (ConcurrentHashMap<Integer, ConsumeQueue> maps : tables.values()) {
+            for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
                 for (ConsumeQueue cq : maps.values()) {
                     boolean result = false;
                     for (int i = 0; i < retryTimes && !result; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
index efb6aa8..7021992 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/DelayOffsetSerializeWrapper.java
@@ -17,17 +17,18 @@
 package org.apache.rocketmq.store.schedule;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class DelayOffsetSerializeWrapper extends RemotingSerializable {
-    private ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable =
+    private ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
         new ConcurrentHashMap<Integer, Long>(32);
 
-    public ConcurrentHashMap<Integer, Long> getOffsetTable() {
+    public ConcurrentMap<Integer, Long> getOffsetTable() {
         return offsetTable;
     }
 
-    public void setOffsetTable(ConcurrentHashMap<Integer, Long> offsetTable) {
+    public void setOffsetTable(ConcurrentMap<Integer, Long> offsetTable) {
         this.offsetTable = offsetTable;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/24d6eefb/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index 501876e..172954d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.ConfigManager;
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -49,10 +50,10 @@ public class ScheduleMessageService extends ConfigManager {
     private static final long DELAY_FOR_A_WHILE = 100L;
     private static final long DELAY_FOR_A_PERIOD = 10000L;
 
-    private final ConcurrentHashMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
+    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
         new ConcurrentHashMap<Integer, Long>(32);
 
-    private final ConcurrentHashMap<Integer /* level */, Long/* offset */> offsetTable =
+    private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
         new ConcurrentHashMap<Integer, Long>(32);
 
     private final Timer timer = new Timer("ScheduleMessageTimerThread", true);


[02/10] incubator-rocketmq git commit: Add license for OpenMessaging

Posted by do...@apache.org.
Add license for OpenMessaging


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/3a9c342a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/3a9c342a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/3a9c342a

Branch: refs/heads/master
Commit: 3a9c342a2c1d31fe9a1cddf8cf6b670694aee1d7
Parents: cd744a5
Author: dongeforever <do...@apache.org>
Authored: Thu Jun 8 15:47:40 2017 +0800
Committer: dongeforever <do...@apache.org>
Committed: Thu Jun 22 10:29:41 2017 +0800

----------------------------------------------------------------------
 distribution/LICENSE-BIN | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/3a9c342a/distribution/LICENSE-BIN
----------------------------------------------------------------------
diff --git a/distribution/LICENSE-BIN b/distribution/LICENSE-BIN
index 32d0208..3726172 100644
--- a/distribution/LICENSE-BIN
+++ b/distribution/LICENSE-BIN
@@ -314,4 +314,21 @@ The source code of guava can be found at https://github.com/google/guava.
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
+------
+This product has a bundle OpenMessaging, which is available under the ASL2 License.
+The source code of OpenMessaging can be found at https://github.com/openmessaging/openmessaging.
+
+ Copyright (C) 2017 The OpenMessaging authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
 


[04/10] incubator-rocketmq git commit: Include guava copyright announcement

Posted by do...@apache.org.
Include guava copyright announcement


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/609fc941
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/609fc941
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/609fc941

Branch: refs/heads/master
Commit: 609fc941d44b9bdbb4519ed613bd59705fdf3b97
Parents: 0c5e53d
Author: dongeforever <zh...@yeah.net>
Authored: Tue Jun 6 15:13:43 2017 +0800
Committer: dongeforever <do...@apache.org>
Committed: Thu Jun 22 10:29:41 2017 +0800

----------------------------------------------------------------------
 distribution/LICENSE-BIN | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/609fc941/distribution/LICENSE-BIN
----------------------------------------------------------------------
diff --git a/distribution/LICENSE-BIN b/distribution/LICENSE-BIN
index 9796b3c..32d0208 100644
--- a/distribution/LICENSE-BIN
+++ b/distribution/LICENSE-BIN
@@ -296,4 +296,22 @@ The source code of jna can be found at https://github.com/java-native-access/jna
 
  A copy is also included in the downloadable source code package
  containing JNA, in file "AL2.0", under the same directory
- as this file.
\ No newline at end of file
+ as this file.
+------
+This product has a bundle guava, which is available under the ASL2 License.
+The source code of guava can be found at https://github.com/google/guava.
+
+ Copyright (C) 2007 The Guava authors 
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+


[10/10] incubator-rocketmq git commit: [ROCKETMQ-218] Polish README.md, closes apache/incubator-rocketmq#113

Posted by do...@apache.org.
[ROCKETMQ-218] Polish README.md, closes apache/incubator-rocketmq#113


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/cd744a5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/cd744a5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/cd744a5d

Branch: refs/heads/master
Commit: cd744a5dd2f53f226bf491201f18413ea2f33b41
Parents: f4be3bb
Author: zhoudiqiu <zh...@wustl.edu>
Authored: Thu Jun 8 11:29:33 2017 +0800
Committer: dongeforever <do...@apache.org>
Committed: Thu Jun 22 10:29:41 2017 +0800

----------------------------------------------------------------------
 README.md | 32 +++++++++++++++++++-------------
 1 file changed, 19 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/cd744a5d/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 7a9abb1..ab0a865 100644
--- a/README.md
+++ b/README.md
@@ -3,18 +3,25 @@
 [![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.org/apache/rocketmqreleases)
 [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
 
-**[Apache RocketMQ](https://rocketmq.incubator.apache.org) is a low latency, reliable, scalable, easy to use message oriented middleware born from alibaba massive messaging business.**
+**[Apache RocketMQ](https://rocketmq.incubator.apache.org) is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.**
+
+It offers a variety of features:
+
+* Pub/Sub messaging model
+* Scheduled message delivery
+* Message retroactivity by time or offset
+* Log hub for streaming
+* Big data integration
+* Reliable FIFO and strict ordered messaging in the same queue
+* Efficient pull&push consumption model
+* Million-level message accumulation capacity in a single queue
+* Multiple messaging protocols like JMS and OpenMessaging
+* Flexible distributed scale-out deployment architecture
+* Lightning-fast batch message exchange system
+* Various message filter mechanics such as SQL and Tag
+* Docker images for isolated testing and cloud isolated clusters
+* Feature-rich administrative dashboard for configuration, metrics and monitoring
 
-It offers a variety of features as follows:
-
-* Pub/Sub and P2P messaging model
-* Reliable FIFO and strict sequential messaging in the same queue
-* Long pull queue model,also support push consumption style
-* Million message accumulation ability in single queue
-* Over a variety of messaging protocols.such as JMS,MQTT etc.
-* Distributed high available deploy architecture, meets at least once message delivery semantics
-* Docker images for isolated testing and cloud Isolated clusters
-* Feature-rich administrative dashboard for configuration,metrics and monitoring
 
 ----------
 
@@ -29,13 +36,12 @@ It offers a variety of features as follows:
 ----------
 
 ## Apache RocketMQ Community
-* [RocketMQ Community Incubator Projects](https://github.com/rocketmq)
 * [RocketMQ Community Projects](https://github.com/apache/incubator-rocketmq-externals)
 
 ----------
 
 ## Contributing
-We are always very happy to have contributions, whether for trivial cleanups,big new features or other material rewards. more details see [here](CONTRIBUTING.md) 
+We always welcome new contributions, whether for trivial cleanups, big new features or other material rewards. more details see [here](CONTRIBUTING.md) 
  
 ----------
 ## License


[06/10] incubator-rocketmq git commit: [ROCKETMQ-220] Add IT test for Filter By SQL 92, closes apache/incubator-rocketmq#114

Posted by do...@apache.org.
[ROCKETMQ-220] Add IT test for Filter By SQL 92, closes
apache/incubator-rocketmq#114


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/605103ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/605103ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/605103ef

Branch: refs/heads/master
Commit: 605103efd70cac652a0a65a447428c41459ffaa1
Parents: 982770b
Author: dongeforever <zh...@yeah.net>
Authored: Thu Jun 8 11:12:26 2017 +0800
Committer: dongeforever <do...@apache.org>
Committed: Thu Jun 22 10:29:41 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/broker/BrokerController.java       |  2 +-
 .../rocketmq/example/filter/SqlConsumer.java    |  1 -
 .../test/client/rmq/RMQSqlConsumer.java         | 42 +++++++++++
 .../rocketmq/test/factory/ConsumerFactory.java  | 12 ++++
 .../test/listener/AbstractListener.java         | 22 ++++++
 .../rocketmq/test/base/IntegrationTestBase.java |  1 +
 .../client/consumer/filter/SqlFilterIT.java     | 74 ++++++++++++++++++++
 7 files changed, 152 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index bacd25c..1416ebf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -421,7 +421,7 @@ public class BrokerController {
 
         this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);
 
         /**
          * ConsumerManageProcessor

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
index 9a3b813..52c2474 100644
--- a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
@@ -31,7 +31,6 @@ public class SqlConsumer {
 
     public static void main(String[] args) {
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
-
         try {
             consumer.subscribe("TopicTest",
                 MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
new file mode 100644
index 0000000..cb0210f
--- /dev/null
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.rmq;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.test.listener.AbstractListener;
+
+public class RMQSqlConsumer extends RMQNormalConsumer {
+    private static Logger logger = Logger.getLogger(RMQSqlConsumer.class);
+    private MessageSelector selector;
+    public RMQSqlConsumer(String nsAddr, String topic, MessageSelector selector,
+        String consumerGroup, AbstractListener listener) {
+        super(nsAddr, topic, "*", consumerGroup, listener);
+        this.selector = selector;
+    }
+
+    @Override
+    public void create() {
+        super.create();
+        try {
+            consumer.subscribe(topic, selector);
+        } catch (Exception e) {
+            logger.error("Subscribe Sql Errored", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
index b5b3fdd..7dd747f 100644
--- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
@@ -17,8 +17,10 @@
 
 package org.apache.rocketmq.test.factory;
 
+import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
 import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
 import org.apache.rocketmq.test.listener.AbstractListener;
 
 public class ConsumerFactory {
@@ -42,4 +44,14 @@ public class ConsumerFactory {
         consumer.start();
         return consumer;
     }
+
+    public static RMQSqlConsumer getRMQSqlConsumer(String nsAddr, String consumerGroup,
+        String topic, MessageSelector selector,
+        AbstractListener listner) {
+        RMQSqlConsumer consumer = new RMQSqlConsumer(nsAddr, topic, selector,
+            consumerGroup, listner);
+        consumer.create();
+        consumer.start();
+        return consumer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
index 974434a..14da397 100644
--- a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
+++ b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
@@ -95,6 +95,28 @@ public class AbstractListener extends MQCollector implements MessageListener {
         return sendMsgs;
     }
 
+    public long waitForMessageConsume(int size,
+        int timeoutMills) {
+
+        long curTime = System.currentTimeMillis();
+        while (true) {
+            if (msgBodys.getDataSize() >= size) {
+                break;
+            }
+            if (System.currentTimeMillis() - curTime >= timeoutMills) {
+                logger.error(String.format("timeout but  [%s]  not recv all send messages!",
+                    listnerName));
+                break;
+            } else {
+                logger.info(String.format("[%s] still [%s] msg not recv!", listnerName,
+                    size - msgBodys.getDataSize()));
+                TestUtil.waitForMonment(500);
+            }
+        }
+
+        return msgBodys.getDataSize();
+    }
+
     public void waitForMessageConsume(Map<Object, Object> sendMsgIndex, int timeoutMills) {
         Collection<Object> notRecvMsgs = waitForMessageConsume(sendMsgIndex.keySet(), timeoutMills);
         for (Object object : notRecvMsgs) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
index 9805eba..07af7aa 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
@@ -127,6 +127,7 @@ public class IntegrationTestBase {
         brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
         brokerConfig.setBrokerIP1("127.0.0.1");
         brokerConfig.setNamesrvAddr(nsAddr);
+        brokerConfig.setEnablePropertyFilter(true);
         storeConfig.setStorePathRootDir(baseDir);
         storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
         storeConfig.setHaListenPort(8000 + random.nextInt(1000));

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/605103ef/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
new file mode 100644
index 0000000..7eef2ab
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.filter;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
+import org.apache.rocketmq.test.client.consumer.broadcast.normal.NormalMsgTwoSameGroupConsumerIT;
+import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
+import org.apache.rocketmq.test.factory.ConsumerFactory;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner;
+import org.apache.rocketmq.test.util.VerifyUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static com.google.common.truth.Truth.assertThat;
+
+public class SqlFilterIT extends BaseConf {
+    private static Logger logger = Logger.getLogger(SqlFilterIT.class);
+    private RMQNormalProducer producer = null;
+    private String topic = null;
+
+    @Before
+    public void setUp() {
+        topic = initTopic();
+        logger.info(String.format("use topic: %s;", topic));
+        producer = getProducer(nsAddr, topic);
+    }
+
+    @After
+    public void tearDown() {
+        super.shutDown();
+    }
+
+    @Test
+    public void testFilterConsumer() throws Exception {
+        int msgSize = 16;
+
+        String group = initConsumerGroup();
+        MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))");
+        RMQSqlConsumer consumer = ConsumerFactory.getRMQSqlConsumer(nsAddr, group, topic, selector, new RMQNormalListner(group + "_1"));
+        Thread.sleep(3000);
+        producer.send("TagA", msgSize);
+        producer.send("TagB", msgSize);
+        producer.send("TagC", msgSize);
+        Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size());
+        consumer.getListner().waitForMessageConsume(msgSize * 2, consumeTime);
+        assertThat(producer.getAllMsgBody())
+            .containsAllIn(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
+            consumer.getListner().getAllMsgBody()));
+
+        assertThat(consumer.getListner().getAllMsgBody().size()).isEqualTo(msgSize * 2);
+    }
+}


[09/10] incubator-rocketmq git commit: [maven-release-plugin] prepare release rocketmq-all-4.1.0-incubating

Posted by do...@apache.org.
[maven-release-plugin] prepare release rocketmq-all-4.1.0-incubating


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/ccb682d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/ccb682d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/ccb682d5

Branch: refs/heads/master
Commit: ccb682d57ab3dbe153a3889b7dbd06f9432a26bf
Parents: 3a9c342
Author: dongeforever <do...@apache.org>
Authored: Thu Jun 8 15:56:08 2017 +0800
Committer: dongeforever <do...@apache.org>
Committed: Thu Jun 22 10:29:41 2017 +0800

----------------------------------------------------------------------
 broker/pom.xml        | 2 +-
 client/pom.xml        | 2 +-
 common/pom.xml        | 2 +-
 distribution/pom.xml  | 6 ++----
 example/pom.xml       | 4 ++--
 filter/pom.xml        | 6 ++----
 filtersrv/pom.xml     | 2 +-
 logappender/pom.xml   | 5 ++---
 namesrv/pom.xml       | 2 +-
 openmessaging/pom.xml | 6 ++----
 pom.xml               | 4 ++--
 remoting/pom.xml      | 2 +-
 srvutil/pom.xml       | 2 +-
 store/pom.xml         | 2 +-
 test/pom.xml          | 2 +-
 tools/pom.xml         | 2 +-
 16 files changed, 22 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/broker/pom.xml
----------------------------------------------------------------------
diff --git a/broker/pom.xml b/broker/pom.xml
index 0f8ad0a..0f4ae31 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 24d0a54..d97ba77 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 835ffa6..cfad0d3 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 41f36dc..20e9e6e 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -15,14 +15,12 @@
   See the License for the specific language governing permissions and
   limitations under the License.
   -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
     <artifactId>rocketmq-distribution</artifactId>
     <name>rocketmq-distribution ${project.version}</name>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/example/pom.xml
----------------------------------------------------------------------
diff --git a/example/pom.xml b/example/pom.xml
index 840fa36..260085e 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>
@@ -55,7 +55,7 @@
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-openmessaging</artifactId>
-            <version>4.1.0-incubating-SNAPSHOT</version>
+            <version>4.1.0-incubating</version>
         </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/filter/pom.xml
----------------------------------------------------------------------
diff --git a/filter/pom.xml b/filter/pom.xml
index 7978f05..b08ce2d 100644
--- a/filter/pom.xml
+++ b/filter/pom.xml
@@ -16,13 +16,11 @@
   ~ limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>rocketmq-all</artifactId>
         <groupId>org.apache.rocketmq</groupId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/filtersrv/pom.xml
----------------------------------------------------------------------
diff --git a/filtersrv/pom.xml b/filtersrv/pom.xml
index 3963875..f41f3b7 100644
--- a/filtersrv/pom.xml
+++ b/filtersrv/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/logappender/pom.xml
----------------------------------------------------------------------
diff --git a/logappender/pom.xml b/logappender/pom.xml
index 5974c75..c18ace7 100644
--- a/logappender/pom.xml
+++ b/logappender/pom.xml
@@ -15,12 +15,11 @@
   See the License for the specific language governing permissions and
   limitations under the License.
   -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
     <artifactId>rocketmq-logappender</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/namesrv/pom.xml
----------------------------------------------------------------------
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index f7d0935..9939e4f 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/openmessaging/pom.xml
----------------------------------------------------------------------
diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml
index e853642..e857be9 100644
--- a/openmessaging/pom.xml
+++ b/openmessaging/pom.xml
@@ -16,13 +16,11 @@
   limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>rocketmq-all</artifactId>
         <groupId>org.apache.rocketmq</groupId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 851a6cb..ea4af1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
     <inceptionYear>2012</inceptionYear>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-all</artifactId>
-    <version>4.1.0-incubating-SNAPSHOT</version>
+    <version>4.1.0-incubating</version>
     <packaging>pom</packaging>
     <name>Apache RocketMQ ${project.version}</name>
     <url>http://rocketmq.incubator.apache.org/</url>
@@ -43,7 +43,7 @@
         <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq.git</connection>
         <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq.git
         </developerConnection>
-        <tag>HEAD</tag>
+        <tag>rocketmq-all-4.1.0-incubating</tag>
     </scm>
 
     <mailingLists>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/remoting/pom.xml
----------------------------------------------------------------------
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 1552341..569b917 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/srvutil/pom.xml
----------------------------------------------------------------------
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index 6dc0377..53fee10 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/store/pom.xml
----------------------------------------------------------------------
diff --git a/store/pom.xml b/store/pom.xml
index b1b6e72..45223fd 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/test/pom.xml
----------------------------------------------------------------------
diff --git a/test/pom.xml b/test/pom.xml
index adb51fd..30dd2a8 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -20,7 +20,7 @@
     <parent>
         <artifactId>rocketmq-all</artifactId>
         <groupId>org.apache.rocketmq</groupId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ccb682d5/tools/pom.xml
----------------------------------------------------------------------
diff --git a/tools/pom.xml b/tools/pom.xml
index d10194c..e3da25d 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
-        <version>4.1.0-incubating-SNAPSHOT</version>
+        <version>4.1.0-incubating</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>


[08/10] incubator-rocketmq git commit: [ROCKETMQ-219] Add batch example, closes apache/incubator-rocketmq#112

Posted by do...@apache.org.
[ROCKETMQ-219] Add batch example, closes apache/incubator-rocketmq#112


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/f4be3bb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/f4be3bb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/f4be3bb9

Branch: refs/heads/master
Commit: f4be3bb929c99b9066759c9f02dc6d24045088cb
Parents: 605103e
Author: dongeforever <zh...@yeah.net>
Authored: Thu Jun 8 11:13:24 2017 +0800
Committer: dongeforever <do...@apache.org>
Committed: Thu Jun 22 10:29:41 2017 +0800

----------------------------------------------------------------------
 .../example/batch/SimpleBatchProducer.java      | 42 +++++++++
 .../example/batch/SplitBatchProducer.java       | 97 ++++++++++++++++++++
 2 files changed, 139 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f4be3bb9/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java
new file mode 100644
index 0000000..a8609e7
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.example.batch;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+public class SimpleBatchProducer {
+
+
+    public static void main(String[] args) throws  Exception {
+        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
+        producer.start();
+
+        //If you just send messages of no more than 1MiB at a time, it is easy to use batch
+        //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
+        String topic = "BatchTest";
+        List<Message> messages = new ArrayList<>();
+        messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
+        messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
+        messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
+
+        producer.send(messages);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f4be3bb9/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java
new file mode 100644
index 0000000..8809a11
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.example.batch;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+public class SplitBatchProducer {
+
+    public static void main(String[] args) throws  Exception {
+
+        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
+        producer.start();
+
+        //large batch
+        String topic = "BatchTest";
+        List<Message> messages = new ArrayList<>(100 * 1000);
+        for (int i = 0; i < 100 * 1000; i++) {
+            messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
+        }
+
+        //split the large batch into small ones:
+        ListSplitter splitter = new ListSplitter(messages);
+        while (splitter.hasNext()) {
+            List<Message>  listItem = splitter.next();
+            producer.send(listItem);
+        }
+    }
+
+}
+
+
+class ListSplitter implements Iterator<List<Message>> {
+    private int sizeLimit = 1000 * 1000;
+    private final List<Message> messages;
+    private int currIndex;
+    public ListSplitter(List<Message> messages) {
+        this.messages = messages;
+    }
+    @Override public boolean hasNext() {
+        return currIndex < messages.size();
+    }
+    @Override public List<Message> next() {
+        int nextIndex = currIndex;
+        int totalSize = 0;
+        for (; nextIndex < messages.size(); nextIndex++) {
+            Message message = messages.get(nextIndex);
+            int tmpSize = message.getTopic().length() + message.getBody().length;
+            Map<String, String> properties = message.getProperties();
+            for (Map.Entry<String, String> entry : properties.entrySet()) {
+                tmpSize += entry.getKey().length() + entry.getValue().length();
+            }
+            tmpSize = tmpSize + 20; //for log overhead
+            if (tmpSize > sizeLimit) {
+                //it is unexpected that single message exceeds the sizeLimit
+                //here just let it go, otherwise it will block the splitting process
+                if (nextIndex - currIndex == 0) {
+                    //if the next sublist has no element, add this one and then break, otherwise just break
+                    nextIndex++;
+                }
+                break;
+            }
+            if (tmpSize + totalSize > sizeLimit) {
+                break;
+            } else {
+                totalSize += tmpSize;
+            }
+
+        }
+        List<Message> subList = messages.subList(currIndex, nextIndex);
+        currIndex = nextIndex;
+        return subList;
+    }
+
+    @Override public void remove() {
+        throw new UnsupportedOperationException("Not allowed to remove");
+    }
+}


[07/10] incubator-rocketmq git commit: Remove develops from pom

Posted by do...@apache.org.
Remove develops from pom


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6af3f788
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6af3f788
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6af3f788

Branch: refs/heads/master
Commit: 6af3f7883f02ebcf318746e75ed3cda2610e2a6c
Parents: 24d6eef
Author: dongeforever <zh...@yeah.net>
Authored: Tue Jun 6 16:07:41 2017 +0800
Committer: dongeforever <do...@apache.org>
Committed: Thu Jun 22 10:29:41 2017 +0800

----------------------------------------------------------------------
 pom.xml | 71 +++++-------------------------------------------------------
 1 file changed, 5 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6af3f788/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 75dbf5b..851a6cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,8 +43,8 @@
         <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq.git</connection>
         <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-rocketmq.git
         </developerConnection>
-      <tag>HEAD</tag>
-  </scm>
+        <tag>HEAD</tag>
+    </scm>
 
     <mailingLists>
         <mailingList>
@@ -69,71 +69,10 @@
 
     <developers>
         <developer>
-            <id>vintagewang</id>
-            <name>vintagewang</name>
-            <roles>
-                <role>architect</role>
-                <role>committer</role>
-            </roles>
-            <email>vintagewang@apache.org</email>
-            <timezone>+8</timezone>
+            <id>Apache RocketMQ</id>
+            <name>Apache RocketMQ of ASF</name>
+            <url>https://rocketmq.apache.org/</url>
         </developer>
-        <developer>
-            <id>vongosling@apache.org</id>
-            <name>vongosling@apache.org</name>
-            <roles>
-                <role>architect</role>
-                <role>committer</role>
-            </roles>
-            <email>vongosling@apache.org</email>
-            <timezone>+8</timezone>
-        </developer>
-        <developer>
-            <id>yukon</id>
-            <name>Xinyu Zhou</name>
-            <email>yukon@@apache.org</email>
-            <roles>
-                <role>committer</role>
-            </roles>
-            <timezone>+8</timezone>
-        </developer>
-        <developer>
-            <id>stevenschew</id>
-            <name>Wei Zhou</name>
-            <email>stevenschew@@apache.org</email>
-            <roles>
-                <role>committer</role>
-            </roles>
-            <timezone>+8</timezone>
-        </developer>
-        <developer>
-            <id>lollipop</id>
-            <name>Jixiang Jin</name>
-            <email>lollipop@apache.org</email>
-            <roles>
-                <role>committer</role>
-            </roles>
-            <timezone>+8</timezone>
-        </developer>
-        <developer>
-            <id>lizhanhui</id>
-            <name>Zhanhui Li</name>
-            <email>lizhanhui@apache.org</email>
-            <roles>
-                <role>committer</role>
-            </roles>
-            <timezone>+8</timezone>
-        </developer>
-        <developer>
-            <id>dongeforever</id>
-            <name>dongeforever</name>
-            <email>dongeforever@apache.org</email>
-            <roles>
-                <role>committer</role>
-            </roles>
-            <timezone>+8</timezone>
-        </developer>
-
     </developers>
 
     <licenses>


[03/10] incubator-rocketmq git commit: Release zip too

Posted by do...@apache.org.
Release zip too


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/982770b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/982770b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/982770b1

Branch: refs/heads/master
Commit: 982770b1b2c49f37508c21122844a3acbc8cd548
Parents: 6af3f78
Author: dongeforever <do...@apache.org>
Authored: Tue Jun 6 17:02:15 2017 +0800
Committer: dongeforever <do...@apache.org>
Committed: Thu Jun 22 10:29:41 2017 +0800

----------------------------------------------------------------------
 distribution/release.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/982770b1/distribution/release.xml
----------------------------------------------------------------------
diff --git a/distribution/release.xml b/distribution/release.xml
index c67d23e..d87ad5d 100644
--- a/distribution/release.xml
+++ b/distribution/release.xml
@@ -21,6 +21,7 @@
     <formats>
         <format>dir</format>
         <format>tar.gz</format>
+        <format>zip</format>
     </formats>
     <fileSets>
         <fileSet>