You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:16 UTC

[20/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
new file mode 100644
index 0000000..070635a
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.QueryResult;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.common.message.*;
+
+import java.util.List;
+
+
+/**
+ * Producer facade: a {@link ClientConfig} carrying producer-side settings whose
+ * {@link MQProducer} operations are forwarded to an internal {@link DefaultMQProducerImpl}.
+ *
+ * @author shijia.wxr
+ */
+public class DefaultMQProducer extends ClientConfig implements MQProducer {
+    /** Implementation object that start/shutdown, the send variants and the admin queries forward to. */
+    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
+    private String producerGroup;
+    /**
+     * Just for testing or demo program
+     */
+    private String createTopicKey = MixAll.DEFAULT_TOPIC;
+    private volatile int defaultTopicQueueNums = 4;
+    // NOTE(review): the unit is not visible in this class; presumably milliseconds -- confirm in DefaultMQProducerImpl.
+    private int sendMsgTimeout = 3000;
+    private int compressMsgBodyOverHowmuch = 1024 * 4;
+    private int retryTimesWhenSendFailed = 2;
+    private int retryTimesWhenSendAsyncFailed = 2;
+
+    private boolean retryAnotherBrokerWhenNotStoreOK = false;
+    private int maxMessageSize = 1024 * 1024 * 4; // 4M
+
+    /**
+     * Creates a producer in {@link MixAll#DEFAULT_PRODUCER_GROUP} without an RPC hook.
+     */
+    public DefaultMQProducer() {
+        this(MixAll.DEFAULT_PRODUCER_GROUP, null);
+    }
+
+    /**
+     * Creates a producer for the given group.
+     *
+     * @param producerGroup group name stored on this instance
+     * @param rpcHook hook handed to the {@link DefaultMQProducerImpl} constructor; may be {@code null}
+     *                (the other constructors pass {@code null})
+     */
+    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
+        this.producerGroup = producerGroup;
+        // The implementation receives a reference to this facade while it is still being constructed.
+        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+    }
+
+    /**
+     * Creates a producer for the given group without an RPC hook.
+     *
+     * @param producerGroup group name stored on this instance
+     */
+    public DefaultMQProducer(final String producerGroup) {
+        this(producerGroup, null);
+    }
+
+    /**
+     * Creates a producer in {@link MixAll#DEFAULT_PRODUCER_GROUP} with the given RPC hook.
+     *
+     * @param rpcHook hook handed to the {@link DefaultMQProducerImpl} constructor
+     */
+    public DefaultMQProducer(RPCHook rpcHook) {
+        this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
+    }
+
+    /** Forwards to {@code DefaultMQProducerImpl.start()}. */
+    @Override
+    public void start() throws MQClientException {
+        this.defaultMQProducerImpl.start();
+    }
+
+    /** Forwards to {@code DefaultMQProducerImpl.shutdown()}. */
+    @Override
+    public void shutdown() {
+        this.defaultMQProducerImpl.shutdown();
+    }
+
+    /** Forwards to {@code DefaultMQProducerImpl.fetchPublishMessageQueues(topic)}. */
+    @Override
+    public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
+        return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic);
+    }
+
+    // ---- send variants without an explicit queue: each one forwards its arguments unchanged ----
+
+    @Override
+    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg);
+    }
+
+    @Override
+    public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, timeout);
+    }
+
+    @Override
+    public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, sendCallback);
+    }
+
+    @Override
+    public void send(Message msg, SendCallback sendCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
+    }
+
+    @Override
+    public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.sendOneway(msg);
+    }
+
+    // ---- send variants addressed to a specific MessageQueue ----
+
+    @Override
+    public SendResult send(Message msg, MessageQueue mq)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, mq);
+    }
+
+    @Override
+    public SendResult send(Message msg, MessageQueue mq, long timeout)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, mq, timeout);
+    }
+
+    @Override
+    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, mq, sendCallback);
+    }
+
+    @Override
+    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
+    }
+
+    @Override
+    public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.sendOneway(msg, mq);
+    }
+
+    // ---- send variants that take a MessageQueueSelector plus its argument ----
+
+    @Override
+    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, selector, arg);
+    }
+
+    @Override
+    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
+    }
+
+    @Override
+    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
+    }
+
+    @Override
+    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
+    }
+
+    @Override
+    public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
+    }
+
+    /**
+     * Not supported by this class: always throws.
+     *
+     * @throws RuntimeException always, with a message pointing to {@code TransactionMQProducer}
+     */
+    @Override
+    public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg)
+            throws MQClientException {
+        throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
+    }
+
+    /** Same as {@link #createTopic(String, String, int, int)} with a {@code topicSysFlag} of {@code 0}. */
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+        createTopic(key, newTopic, queueNum, 0);
+    }
+
+    /** Forwards to {@code DefaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag)}. */
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+        this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
+    }
+
+    // ---- admin queries: each one forwards its arguments unchanged ----
+
+    @Override
+    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        return this.defaultMQProducerImpl.searchOffset(mq, timestamp);
+    }
+
+    @Override
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQProducerImpl.maxOffset(mq);
+    }
+
+    @Override
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQProducerImpl.minOffset(mq);
+    }
+
+    @Override
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+        return this.defaultMQProducerImpl.earliestMsgStoreTime(mq);
+    }
+
+    @Override
+    public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return this.defaultMQProducerImpl.viewMessage(offsetMsgId);
+    }
+
+    @Override
+    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
+            throws MQClientException, InterruptedException {
+        return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end);
+    }
+
+    /**
+     * Looks up a message by id, trying two interpretations of {@code msgId}.
+     * <p>
+     * First {@code msgId} is passed to {@code MessageDecoder.decodeMessageId}; if that succeeds the
+     * message is fetched with {@link #viewMessage(String)}. If decoding or that lookup throws any
+     * {@link Exception}, the method falls back to
+     * {@code DefaultMQProducerImpl.queryMessageByUniqKey(topic, msgId)}.
+     *
+     * @param topic topic used only by the fallback unique-key query
+     * @param msgId id to look up
+     * @return the message returned by whichever lookup completed
+     */
+    @Override
+    public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        try {
+            // The decoded value is not used; the call only matters for whether it throws.
+            MessageDecoder.decodeMessageId(msgId);
+            return this.viewMessage(msgId);
+        } catch (Exception ignored) {
+            // Deliberate best-effort: on any failure above, msgId is retried as a unique key below.
+            // NOTE(review): this also absorbs an InterruptedException raised by the lookup above
+            // without restoring the interrupt flag -- confirm that is intended.
+        }
+        return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId);
+    }
+
+    // ---- plain accessors for the settings held on this instance ----
+
+    public String getProducerGroup() {
+        return producerGroup;
+    }
+
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+
+    public String getCreateTopicKey() {
+        return createTopicKey;
+    }
+
+    public void setCreateTopicKey(String createTopicKey) {
+        this.createTopicKey = createTopicKey;
+    }
+
+    public int getSendMsgTimeout() {
+        return sendMsgTimeout;
+    }
+
+    public void setSendMsgTimeout(int sendMsgTimeout) {
+        this.sendMsgTimeout = sendMsgTimeout;
+    }
+
+    public int getCompressMsgBodyOverHowmuch() {
+        return compressMsgBodyOverHowmuch;
+    }
+
+    public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
+        this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
+    }
+
+    public DefaultMQProducerImpl getDefaultMQProducerImpl() {
+        return defaultMQProducerImpl;
+    }
+
+    public boolean isRetryAnotherBrokerWhenNotStoreOK() {
+        return retryAnotherBrokerWhenNotStoreOK;
+    }
+
+    public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {
+        this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
+    }
+
+    public int getMaxMessageSize() {
+        return maxMessageSize;
+    }
+
+    public void setMaxMessageSize(int maxMessageSize) {
+        this.maxMessageSize = maxMessageSize;
+    }
+
+    public int getDefaultTopicQueueNums() {
+        return defaultTopicQueueNums;
+    }
+
+    public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
+        this.defaultTopicQueueNums = defaultTopicQueueNums;
+    }
+
+    public int getRetryTimesWhenSendFailed() {
+        return retryTimesWhenSendFailed;
+    }
+
+    public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
+        this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
+    }
+
+    /** Alias for the inherited {@code isVipChannelEnabled()}. */
+    public boolean isSendMessageWithVIPChannel() {
+        return isVipChannelEnabled();
+    }
+
+    /** Alias for the inherited {@code setVipChannelEnabled(boolean)}. */
+    public void setSendMessageWithVIPChannel(final boolean sendMessageWithVIPChannel) {
+        this.setVipChannelEnabled(sendMessageWithVIPChannel);
+    }
+
+    // ---- settings that are stored on the implementation object rather than on this facade ----
+
+    public long[] getNotAvailableDuration() {
+        return this.defaultMQProducerImpl.getNotAvailableDuration();
+    }
+
+    public void setNotAvailableDuration(final long[] notAvailableDuration) {
+        this.defaultMQProducerImpl.setNotAvailableDuration(notAvailableDuration);
+    }
+
+    public long[] getLatencyMax() {
+        return this.defaultMQProducerImpl.getLatencyMax();
+    }
+
+    public void setLatencyMax(final long[] latencyMax) {
+        this.defaultMQProducerImpl.setLatencyMax(latencyMax);
+    }
+
+    public boolean isSendLatencyFaultEnable() {
+        return this.defaultMQProducerImpl.isSendLatencyFaultEnable();
+    }
+
+    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
+        this.defaultMQProducerImpl.setSendLatencyFaultEnable(sendLatencyFaultEnable);
+    }
+
+    public int getRetryTimesWhenSendAsyncFailed() {
+        return retryTimesWhenSendAsyncFailed;
+    }
+
+    public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) {
+        this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
new file mode 100644
index 0000000..5e8178a
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.common.message.Message;
+
+
+/**
+ * Single-method contract: given a message and a caller-supplied argument, produce a
+ * {@link LocalTransactionState}.
+ *
+ * @author shijia.wxr
+ */
+public interface LocalTransactionExecuter {
+    /**
+     * @param msg the message handed to the implementation
+     * @param arg opaque argument supplied by the caller
+     * @return the state chosen by the implementation
+     */
+    LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
new file mode 100644
index 0000000..ce5b0d9
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionState.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+/**
+ * The three states a {@link LocalTransactionExecuter} or {@link TransactionCheckListener} can report.
+ * <p>
+ * Constant names and their order are part of the public API; in particular the spelling
+ * {@code UNKNOW} must be kept as is.
+ *
+ * @author shijia.wxr
+ */
+public enum LocalTransactionState {
+    COMMIT_MESSAGE,
+    ROLLBACK_MESSAGE,
+    UNKNOW
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
new file mode 100644
index 0000000..0ea4a33
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.client.MQAdmin;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+import java.util.List;
+
+
+/**
+ * Producer contract layered on top of {@link MQAdmin}: lifecycle methods, a queue lookup, and
+ * three families of {@code send}/{@code sendOneway} overloads (no queue, explicit
+ * {@link MessageQueue}, and {@link MessageQueueSelector} with an argument).
+ * <p>
+ * Within each family the overloads differ only in whether they return a {@link SendResult} or
+ * take a {@link SendCallback}, and in whether a {@code timeout} is passed.
+ *
+ * @author shijia.wxr
+ */
+public interface MQProducer extends MQAdmin {
+
+    // ---- lifecycle ----
+
+    void start() throws MQClientException;
+
+    void shutdown();
+
+    /**
+     * @param topic topic to look up
+     * @return the message queues reported for {@code topic}
+     */
+    List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException;
+
+    // ---- send without an explicit queue ----
+
+    SendResult send(Message msg)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    SendResult send(Message msg, long timeout)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    void send(Message msg, SendCallback sendCallback)
+            throws MQClientException, RemotingException, InterruptedException;
+
+    void send(Message msg, SendCallback sendCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException;
+
+    void sendOneway(Message msg)
+            throws MQClientException, RemotingException, InterruptedException;
+
+    // ---- send to a given MessageQueue ----
+
+    SendResult send(Message msg, MessageQueue mq)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    SendResult send(Message msg, MessageQueue mq, long timeout)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    void send(Message msg, MessageQueue mq, SendCallback sendCallback)
+            throws MQClientException, RemotingException, InterruptedException;
+
+    void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException;
+
+    void sendOneway(Message msg, MessageQueue mq)
+            throws MQClientException, RemotingException, InterruptedException;
+
+    // ---- send with a MessageQueueSelector and its argument ----
+
+    SendResult send(Message msg, MessageQueueSelector selector, Object arg)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
+            throws MQClientException, RemotingException, InterruptedException;
+
+    void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException;
+
+    void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
+            throws MQClientException, RemotingException, InterruptedException;
+
+    // ---- transactional send ----
+
+    /**
+     * @param msg message to send
+     * @param tranExecuter executer supplied together with the message
+     * @param arg opaque argument supplied by the caller
+     * @return the transactional send result
+     */
+    TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg)
+            throws MQClientException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java b/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
new file mode 100644
index 0000000..c7a9124
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MessageQueueSelector.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * Strategy that picks one {@link MessageQueue} out of a list for a given message.
+ *
+ * @author shijia.wxr
+ */
+public interface MessageQueueSelector {
+    /**
+     * @param mqs candidate queues
+     * @param msg the message a queue is being chosen for
+     * @param arg opaque argument supplied by the caller
+     * @return the chosen queue
+     */
+    MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
new file mode 100644
index 0000000..7b0e00e
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendCallback.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+/**
+ * Two-method callback: one method receives a {@link SendResult}, the other a {@link Throwable}.
+ *
+ * @author shijia.wxr
+ */
+public interface SendCallback {
+    /**
+     * @param sendResult the result being reported
+     */
+    void onSuccess(SendResult sendResult);
+
+    /**
+     * @param e the failure being reported
+     */
+    void onException(Throwable e);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java
new file mode 100644
index 0000000..02ed6b5
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+
+/**
+ * Mutable value object describing the outcome of a send: status, message ids, target queue,
+ * queue offset, transaction id, region id and a trace flag. Also offers static JSON helpers
+ * backed by fastjson.
+ *
+ * @author shijia.wxr
+ */
+public class SendResult {
+    private SendStatus sendStatus;
+    private String msgId;
+    private MessageQueue messageQueue;
+    private long queueOffset;
+    private String transactionId;
+    private String offsetMsgId;
+    private String regionId;
+    /** Defaults to {@code true}; neither parameterized constructor changes it. */
+    private boolean traceOn = true;
+
+    /** Creates an instance with all fields at their defaults. */
+    public SendResult() {
+    }
+
+    /**
+     * Creates a result without transaction id or region id (both stay {@code null}).
+     * Note the parameter order differs from the seven-argument constructor.
+     */
+    public SendResult(SendStatus sendStatus, String msgId, String offsetMsgId, MessageQueue messageQueue, long queueOffset) {
+        this(sendStatus, msgId, messageQueue, queueOffset, null, offsetMsgId, null);
+    }
+
+    /** Creates a result with every field except {@code traceOn} given explicitly. */
+    public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, final long queueOffset, final String transactionId, final String offsetMsgId, final String regionId) {
+        this.sendStatus = sendStatus;
+        this.msgId = msgId;
+        this.messageQueue = messageQueue;
+        this.queueOffset = queueOffset;
+        this.transactionId = transactionId;
+        this.offsetMsgId = offsetMsgId;
+        this.regionId = regionId;
+    }
+
+    public boolean isTraceOn() {
+        return this.traceOn;
+    }
+
+    public void setTraceOn(final boolean traceOn) {
+        this.traceOn = traceOn;
+    }
+
+    public String getRegionId() {
+        return this.regionId;
+    }
+
+    public void setRegionId(final String regionId) {
+        this.regionId = regionId;
+    }
+
+    /**
+     * Serializes {@code obj} with {@code JSON.toJSONString}. The parameter is typed
+     * {@link Object}, so any value is accepted, not only a {@code SendResult}.
+     */
+    public static String encoderSendResultToJson(final Object obj) {
+        final String json = JSON.toJSONString(obj);
+        return json;
+    }
+
+    /** Parses {@code json} into a {@code SendResult} with {@code JSON.parseObject}. */
+    public static SendResult decoderSendResultFromJson(String json) {
+        final SendResult parsed = JSON.parseObject(json, SendResult.class);
+        return parsed;
+    }
+
+    public String getMsgId() {
+        return this.msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public SendStatus getSendStatus() {
+        return this.sendStatus;
+    }
+
+    public void setSendStatus(SendStatus sendStatus) {
+        this.sendStatus = sendStatus;
+    }
+
+    public MessageQueue getMessageQueue() {
+        return this.messageQueue;
+    }
+
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+    public long getQueueOffset() {
+        return this.queueOffset;
+    }
+
+    public void setQueueOffset(long queueOffset) {
+        this.queueOffset = queueOffset;
+    }
+
+    public String getTransactionId() {
+        return this.transactionId;
+    }
+
+    public void setTransactionId(String transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    public String getOffsetMsgId() {
+        return this.offsetMsgId;
+    }
+
+    public void setOffsetMsgId(String offsetMsgId) {
+        this.offsetMsgId = offsetMsgId;
+    }
+
+    /**
+     * Renders status, both message ids, the queue and the queue offset; transaction id,
+     * region id and the trace flag are not part of the text.
+     */
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("SendResult [sendStatus=");
+        text.append(sendStatus);
+        text.append(", msgId=").append(msgId);
+        text.append(", offsetMsgId=").append(offsetMsgId);
+        text.append(", messageQueue=").append(messageQueue);
+        text.append(", queueOffset=").append(queueOffset);
+        text.append("]");
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java
new file mode 100644
index 0000000..038bc99
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+/**
+ * Status values carried by a {@code SendResult}.
+ * <p>
+ * Constant names and their order are part of the public API and are kept unchanged.
+ *
+ * @author shijia.wxr
+ */
+public enum SendStatus {
+    SEND_OK,
+    FLUSH_DISK_TIMEOUT,
+    FLUSH_SLAVE_TIMEOUT,
+    SLAVE_NOT_AVAILABLE
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java
new file mode 100644
index 0000000..9a11d50
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+
+/**
+ * @author shijia.wxr
+ */
+public interface TransactionCheckListener {
+    LocalTransactionState checkLocalTransactionState(final MessageExt msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
new file mode 100644
index 0000000..eaca6ec
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.RPCHook;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TransactionMQProducer extends DefaultMQProducer {
+    private TransactionCheckListener transactionCheckListener;
+    private int checkThreadPoolMinSize = 1;
+    private int checkThreadPoolMaxSize = 1;
+    private int checkRequestHoldMax = 2000;
+
+
+    public TransactionMQProducer() {
+    }
+
+
+    public TransactionMQProducer(final String producerGroup) {
+        super(producerGroup);
+    }
+
+    public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
+        super(producerGroup, rpcHook);
+    }
+
+    @Override
+    public void start() throws MQClientException {
+        this.defaultMQProducerImpl.initTransactionEnv();
+        super.start();
+    }
+
+
+    @Override
+    public void shutdown() {
+        super.shutdown();
+        this.defaultMQProducerImpl.destroyTransactionEnv();
+    }
+
+
+    // Fails fast, before anything is sent, when no TransactionCheckListener has been set.
+    @Override
+    public TransactionSendResult sendMessageInTransaction(final Message msg,
+                                                          final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
+        if (null == this.transactionCheckListener) {
+            throw new MQClientException("transactionCheckListener is null, call setTransactionCheckListener first", null);
+        }
+        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
+    }
+
+
+    public TransactionCheckListener getTransactionCheckListener() {
+        return transactionCheckListener;
+    }
+
+
+    public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
+        this.transactionCheckListener = transactionCheckListener;
+    }
+
+
+    public int getCheckThreadPoolMinSize() {
+        return checkThreadPoolMinSize;
+    }
+
+
+    public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
+        this.checkThreadPoolMinSize = checkThreadPoolMinSize;
+    }
+
+
+    public int getCheckThreadPoolMaxSize() {
+        return checkThreadPoolMaxSize;
+    }
+
+
+    public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
+        this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;
+    }
+
+
+    public int getCheckRequestHoldMax() {
+        return checkRequestHoldMax;
+    }
+
+
+    public void setCheckRequestHoldMax(int checkRequestHoldMax) {
+        this.checkRequestHoldMax = checkRequestHoldMax;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java
new file mode 100644
index 0000000..478c39d
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.producer;
+
+/**
+ * @author shijia.wxr
+ */
+public class TransactionSendResult extends SendResult {
+    private LocalTransactionState localTransactionState;
+
+
+    public TransactionSendResult() {
+    }
+
+
+    public LocalTransactionState getLocalTransactionState() {
+        return localTransactionState;
+    }
+
+
+    public void setLocalTransactionState(LocalTransactionState localTransactionState) {
+        this.localTransactionState = localTransactionState;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
new file mode 100644
index 0000000..0f6ce48
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.producer.selector;
+
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SelectMessageQueueByHash implements MessageQueueSelector {
+    /**
+     * Picks the queue at index {@code |arg.hashCode() % mqs.size()|}, so equal {@code arg} values
+     * map to the same position for a given list size. {@code msg} is not consulted.
+     */
+    @Override
+    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+        // Reduce before taking the absolute value: Math.abs(Integer.MIN_VALUE) is still negative,
+        // which made the old abs-then-modulo order produce a negative index for that hash code.
+        int index = Math.abs(arg.hashCode() % mqs.size());
+        return mqs.get(index);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
new file mode 100644
index 0000000..1902de5
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.producer.selector;
+
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
+    private Set<String> consumeridcs;
+
+    // NOTE(review): placeholder -- always returns null and never reads consumeridcs, mqs, msg or arg.
+    @Override
+    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+        return null;
+    }
+
+
+    public Set<String> getConsumeridcs() {
+        return consumeridcs;
+    }
+
+
+    public void setConsumeridcs(Set<String> consumeridcs) {
+        this.consumeridcs = consumeridcs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
new file mode 100644
index 0000000..b39b777
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.producer.selector;
+
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SelectMessageQueueByRandoom implements MessageQueueSelector {
+    private final Random random = new Random(System.currentTimeMillis());
+
+    /**
+     * Returns a pseudo-randomly chosen element of {@code mqs}; {@code msg} and {@code arg} are
+     * ignored.
+     */
+    @Override
+    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+        // Reduce before taking the absolute value: Math.abs(Integer.MIN_VALUE) is still negative,
+        // so the previous abs-then-modulo order could yield a negative index.
+        int index = Math.abs(random.nextInt() % mqs.size());
+        return mqs.get(index);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java b/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java
new file mode 100644
index 0000000..3234ada
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.client.stat;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
+import org.apache.rocketmq.common.stats.StatsItemSet;
+import org.apache.rocketmq.common.stats.StatsSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+
+public class ConsumerStatsManager {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
+
+    private static final String TOPIC_AND_GROUP_CONSUME_OK_TPS = "CONSUME_OK_TPS";
+    private static final String TOPIC_AND_GROUP_CONSUME_FAILED_TPS = "CONSUME_FAILED_TPS";
+    private static final String TOPIC_AND_GROUP_CONSUME_RT = "CONSUME_RT";
+    private static final String TOPIC_AND_GROUP_PULL_TPS = "PULL_TPS";
+    private static final String TOPIC_AND_GROUP_PULL_RT = "PULL_RT";
+
+    private final StatsItemSet topicAndGroupConsumeOKTPS;
+    private final StatsItemSet topicAndGroupConsumeRT;
+    private final StatsItemSet topicAndGroupConsumeFailedTPS;
+    private final StatsItemSet topicAndGroupPullTPS;
+    private final StatsItemSet topicAndGroupPullRT;
+
+
+    public ConsumerStatsManager(final ScheduledExecutorService scheduledExecutorService) {
+        this.topicAndGroupConsumeOKTPS =
+                new StatsItemSet(TOPIC_AND_GROUP_CONSUME_OK_TPS, scheduledExecutorService, log);
+
+        this.topicAndGroupConsumeRT =
+                new StatsItemSet(TOPIC_AND_GROUP_CONSUME_RT, scheduledExecutorService, log);
+
+        this.topicAndGroupConsumeFailedTPS =
+                new StatsItemSet(TOPIC_AND_GROUP_CONSUME_FAILED_TPS, scheduledExecutorService, log);
+
+        this.topicAndGroupPullTPS = new StatsItemSet(TOPIC_AND_GROUP_PULL_TPS, scheduledExecutorService, log);
+
+        this.topicAndGroupPullRT = new StatsItemSet(TOPIC_AND_GROUP_PULL_RT, scheduledExecutorService, log);
+    }
+
+
+    public void start() {
+    }
+
+
+    public void shutdown() {
+    }
+
+
+    public void incPullRT(final String group, final String topic, final long rt) {
+        this.topicAndGroupPullRT.addValue(topic + "@" + group, (int) rt, 1);
+    }
+
+
+    public void incPullTPS(final String group, final String topic, final long msgs) {
+        this.topicAndGroupPullTPS.addValue(topic + "@" + group, (int) msgs, 1);
+    }
+
+
+    public void incConsumeRT(final String group, final String topic, final long rt) {
+        this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int) rt, 1);
+    }
+
+
+    public void incConsumeOKTPS(final String group, final String topic, final long msgs) {
+        this.topicAndGroupConsumeOKTPS.addValue(topic + "@" + group, (int) msgs, 1);
+    }
+
+
+    public void incConsumeFailedTPS(final String group, final String topic, final long msgs) {
+        this.topicAndGroupConsumeFailedTPS.addValue(topic + "@" + group, (int) msgs, 1);
+    }
+
+    public ConsumeStatus consumeStatus(final String group, final String topic) {
+        ConsumeStatus cs = new ConsumeStatus();
+        {
+            StatsSnapshot ss = this.getPullRT(group, topic);
+            if (ss != null) {
+                cs.setPullRT(ss.getAvgpt());
+            }
+        }
+
+        {
+            StatsSnapshot ss = this.getPullTPS(group, topic);
+            if (ss != null) {
+                cs.setPullTPS(ss.getTps());
+            }
+        }
+
+        {
+            StatsSnapshot ss = this.getConsumeRT(group, topic);
+            if (ss != null) {
+                cs.setConsumeRT(ss.getAvgpt());
+            }
+        }
+
+        {
+            StatsSnapshot ss = this.getConsumeOKTPS(group, topic);
+            if (ss != null) {
+                cs.setConsumeOKTPS(ss.getTps());
+            }
+        }
+
+        {
+            StatsSnapshot ss = this.getConsumeFailedTPS(group, topic);
+            if (ss != null) {
+                cs.setConsumeFailedTPS(ss.getTps());
+            }
+        }
+
+        {
+            StatsSnapshot ss = this.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group);
+            if (ss != null) {
+                cs.setConsumeFailedMsgs(ss.getSum());
+            }
+        }
+
+        return cs;
+    }
+
+    private StatsSnapshot getPullRT(final String group, final String topic) {
+        return this.topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group);
+    }
+
+    private StatsSnapshot getPullTPS(final String group, final String topic) {
+        return this.topicAndGroupPullTPS.getStatsDataInMinute(topic + "@" + group);
+    }
+
+    // Minute window first, hour window when it is missing or sums to 0 (callers null-check the result).
+    private StatsSnapshot getConsumeRT(final String group, final String topic) {
+        StatsSnapshot statsData = this.topicAndGroupConsumeRT.getStatsDataInMinute(topic + "@" + group);
+        if (null == statsData || 0 == statsData.getSum()) {
+            statsData = this.topicAndGroupConsumeRT.getStatsDataInHour(topic + "@" + group);
+        }
+        return statsData;
+    }
+
+    private StatsSnapshot getConsumeOKTPS(final String group, final String topic) {
+        return this.topicAndGroupConsumeOKTPS.getStatsDataInMinute(topic + "@" + group);
+    }
+
+    private StatsSnapshot getConsumeFailedTPS(final String group, final String topic) {
+        return this.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java b/client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java
deleted file mode 100644
index 2a10ec4..0000000
--- a/client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.client;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class ValidatorsTest {
-
-    @Test
-    public void topicValidatorTest() throws MQClientException {
-        Validators.checkTopic("Hello");
-        Validators.checkTopic("%RETRY%Hello");
-        Validators.checkTopic("_%RETRY%Hello");
-        Validators.checkTopic("-%RETRY%Hello");
-        Validators.checkTopic("223-%RETRY%Hello");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java b/client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
deleted file mode 100644
index 5ef75ed..0000000
--- a/client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-/*
- * @author yubao.fyb@taoboa.com
- * @version $id$
- */
-package com.alibaba.rocketmq.client.consumer.loadbalance;
-
-import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
-import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * @author yubao.fyb@alibaba-inc.com created on 2013-07-03 16:24
- */
-public class AllocateMessageQueueAveragelyTest {
-    private AllocateMessageQueueStrategy allocateMessageQueueAveragely;
-    private String currentCID;
-    private String topic;
-    private List<MessageQueue> messageQueueList;
-    private List<String> consumerIdList;
-
-    @Before
-    public void init() {
-        allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
-        topic = "topic_test";
-    }
-
-    @Test
-    public void testConsumer1() {
-        currentCID = "0";
-        createConsumerIdList(1);
-        createMessageQueueList(5);
-        List<MessageQueue> result =
-                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
-        printMessageQueue(result, "testConsumer1");
-        Assert.assertEquals(result.size(), 5);
-        Assert.assertEquals(result.containsAll(getMessageQueueList()), true);
-    }
-
-    public void createConsumerIdList(int size) {
-        consumerIdList = new ArrayList<String>(size);
-        for (int i = 0; i < size; i++) {
-            consumerIdList.add(String.valueOf(i));
-        }
-    }
-
-    public void createMessageQueueList(int size) {
-        messageQueueList = new ArrayList<MessageQueue>(size);
-        for (int i = 0; i < size; i++) {
-            MessageQueue mq = new MessageQueue(topic, "brokerName", i);
-            messageQueueList.add(mq);
-        }
-    }
-
-    public void printMessageQueue(List<MessageQueue> messageQueueList, String name) {
-        if (messageQueueList == null || messageQueueList.size() < 1)
-            return;
-        System.out.println(name + ".......................................start");
-        for (MessageQueue messageQueue : messageQueueList) {
-            System.out.println(messageQueue);
-        }
-        System.out.println(name + ".......................................end");
-    }
-
-    public List<MessageQueue> getMessageQueueList() {
-        return messageQueueList;
-    }
-
-    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
-        this.messageQueueList = messageQueueList;
-    }
-
-    @Test
-    public void testConsumer2() {
-        currentCID = "1";
-        createConsumerIdList(2);
-        createMessageQueueList(5);
-        List<MessageQueue> result =
-                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
-        printMessageQueue(result, "testConsumer2");
-        Assert.assertEquals(result.size(), 3);
-        Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true);
-
-    }
-
-    @Test
-    public void testConsumer3CurrentCID0() {
-        currentCID = "0";
-        createConsumerIdList(3);
-        createMessageQueueList(5);
-        List<MessageQueue> result =
-                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
-        printMessageQueue(result, "testConsumer3CurrentCID0");
-        Assert.assertEquals(result.size(), 1);
-        Assert.assertEquals(result.containsAll(getMessageQueueList().subList(0, 1)), true);
-    }
-
-    @Test
-    public void testConsumer3CurrentCID1() {
-        currentCID = "1";
-        createConsumerIdList(3);
-        createMessageQueueList(5);
-        List<MessageQueue> result =
-                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
-        printMessageQueue(result, "testConsumer3CurrentCID1");
-        Assert.assertEquals(result.size(), 1);
-        Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true);
-    }
-
-    @Test
-    public void testConsumer3CurrentCID2() {
-        currentCID = "2";
-        createConsumerIdList(3);
-        createMessageQueueList(5);
-        List<MessageQueue> result =
-                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
-        printMessageQueue(result, "testConsumer3CurrentCID2");
-        Assert.assertEquals(result.size(), 3);
-        Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true);
-    }
-
-    @Test
-    public void testConsumer4() {
-        currentCID = "1";
-        createConsumerIdList(4);
-        createMessageQueueList(5);
-        List<MessageQueue> result =
-                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
-        printMessageQueue(result, "testConsumer4");
-        Assert.assertEquals(result.size(), 1);
-        Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true);
-    }
-
-    @Test
-    public void testConsumer5() {
-        currentCID = "1";
-        createConsumerIdList(5);
-        createMessageQueueList(5);
-        List<MessageQueue> result =
-                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
-        printMessageQueue(result, "testConsumer5");
-        Assert.assertEquals(result.size(), 1);
-        Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true);
-    }
-
-    @Test
-    public void testConsumer6() {
-        currentCID = "1";
-        createConsumerIdList(2);
-        createMessageQueueList(6);
-        List<MessageQueue> result =
-                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
-        printMessageQueue(result, "testConsumer");
-        Assert.assertEquals(result.size(), 3);
-        Assert.assertEquals(result.containsAll(getMessageQueueList().subList(3, 6)), true);
-    }
-
-    @Test
-    public void testCurrentCIDNotExists() {
-        currentCID = String.valueOf(Integer.MAX_VALUE);
-        createConsumerIdList(2);
-        createMessageQueueList(6);
-        List<MessageQueue> result =
-                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
-        printMessageQueue(result, "testCurrentCIDNotExists");
-        Assert.assertEquals(result.size(), 0);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testCurrentCIDIllegalArgument() {
-        createConsumerIdList(2);
-        createMessageQueueList(6);
-        allocateMessageQueueAveragely.allocate("", "", getMessageQueueList(), getConsumerIdList());
-    }
-
-    public List<String> getConsumerIdList() {
-        return consumerIdList;
-    }
-
-    public void setConsumerIdList(List<String> consumerIdList) {
-        this.consumerIdList = consumerIdList;
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testMessageQueueIllegalArgument() {
-        currentCID = "0";
-        createConsumerIdList(2);
-        allocateMessageQueueAveragely.allocate("", currentCID, null, getConsumerIdList());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testConsumerIdIllegalArgument() {
-        currentCID = "0";
-        createMessageQueueList(6);
-        allocateMessageQueueAveragely.allocate("", currentCID, getMessageQueueList(), null);
-    }
-
-    @Test
-    public void testAllocate() {
-        AllocateMessageQueueAveragely allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
-        String topic = "topic_test";
-        String currentCID = "CID";
-        int queueSize = 19;
-        int consumerSize = 10;
-        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
-        for (int i = 0; i < queueSize; i++) {
-            MessageQueue mq = new MessageQueue(topic, "brokerName", i);
-            mqAll.add(mq);
-        }
-
-        List<String> cidAll = new ArrayList<String>();
-        for (int j = 0; j < consumerSize; j++) {
-            cidAll.add("CID" + j);
-        }
-        System.out.println(mqAll.toString());
-        System.out.println(cidAll.toString());
-        for (int i = 0; i < consumerSize; i++) {
-            List<MessageQueue> rs = allocateMessageQueueAveragely.allocate("", currentCID + i, mqAll, cidAll);
-            System.out.println("rs[" + currentCID + i + "]:" + rs.toString());
-        }
-    }
-
-
-    @Test
-    public void testAllocateByCircle() {
-        AllocateMessageQueueAveragelyByCircle circle = new AllocateMessageQueueAveragelyByCircle();
-        String topic = "topic_test";
-        String currentCID = "CID";
-        int consumerSize = 3;
-        int queueSize = 13;
-        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
-        for (int i = 0; i < queueSize; i++) {
-            MessageQueue mq = new MessageQueue(topic, "brokerName", i);
-            mqAll.add(mq);
-        }
-
-        List<String> cidAll = new ArrayList<String>();
-        for (int j = 0; j < consumerSize; j++) {
-            cidAll.add("CID" + j);
-        }
-        System.out.println(mqAll.toString());
-        System.out.println(cidAll.toString());
-        for (int i = 0; i < consumerSize; i++) {
-            List<MessageQueue> rs = circle.allocate("", currentCID + i, mqAll, cidAll);
-            System.out.println("rs[" + currentCID + i + "]:" + rs.toString());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
new file mode 100644
index 0000000..a3daba5
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.client;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.junit.Test;
+
+
+/**
+ * Checks that {@code Validators.checkTopic} accepts a handful of sample topic names.
+ */
+public class ValidatorsTest {
+
+    /**
+     * Passes each sample topic to {@code Validators.checkTopic}, in order. There are no
+     * explicit assertions: the test succeeds only if none of the calls throws.
+     *
+     * @throws MQClientException if a {@code checkTopic} call raises it, which fails the test
+     */
+    @Test
+    public void topicValidatorTest() throws MQClientException {
+        final String[] sampleTopics = {
+            "Hello",
+            "%RETRY%Hello",
+            "_%RETRY%Hello",
+            "-%RETRY%Hello",
+            "223-%RETRY%Hello"
+        };
+        for (String sampleTopic : sampleTopics) {
+            Validators.checkTopic(sampleTopic);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
new file mode 100644
index 0000000..7b568c5
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/*
+ * @author yubao.fyb@taoboa.com
+ * @version $id$
+ */
+package org.apache.rocketmq.client.consumer.loadbalance;
+
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Unit tests for {@link AllocateMessageQueueAveragely}, plus a print-only run of
+ * {@link AllocateMessageQueueAveragelyByCircle}.
+ *
+ * @author yubao.fyb@alibaba-inc.com created on 2013-07-03 16:24
+ */
+public class AllocateMessageQueueAveragelyTest {
+    /** Strategy under test; re-created by {@link #init()} before each test. */
+    private AllocateMessageQueueStrategy allocateMessageQueueAveragely;
+    /** Consumer id handed to {@code allocate} as the current consumer. */
+    private String currentCID;
+    /** Topic used for the queues in the fixtures. */
+    private String topic;
+    /** Queue fixture built by {@link #createMessageQueueList(int)}. */
+    private List<MessageQueue> messageQueueList;
+    /** Consumer id fixture built by {@link #createConsumerIdList(int)}. */
+    private List<String> consumerIdList;
+
+    /** Creates a fresh strategy and sets the topic before each test. */
+    @Before
+    public void init() {
+        allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
+        topic = "topic_test";
+    }
+
+    /** One consumer, five queues: expects all five queues. */
+    @Test
+    public void testConsumer1() {
+        assertAllocated(allocateFor("0", 1, 5, "testConsumer1"), 0, 5);
+    }
+
+    /** Replaces the consumer id fixture with the ids "0" .. "size - 1". */
+    public void createConsumerIdList(int size) {
+        consumerIdList = newConsumerIds("", size);
+    }
+
+    /** Replaces the queue fixture with {@code size} queues for {@code topic}. */
+    public void createMessageQueueList(int size) {
+        messageQueueList = newQueues(topic, size);
+    }
+
+    /**
+     * Prints the queues to standard output between a start and an end marker line; prints
+     * nothing when the list is {@code null} or empty.
+     *
+     * @param messageQueueList queues to print, may be {@code null}
+     * @param name label prefixed to both marker lines
+     */
+    public void printMessageQueue(List<MessageQueue> messageQueueList, String name) {
+        if (messageQueueList == null || messageQueueList.isEmpty()) {
+            return;
+        }
+        System.out.println(name + ".......................................start");
+        for (MessageQueue messageQueue : messageQueueList) {
+            System.out.println(messageQueue);
+        }
+        System.out.println(name + ".......................................end");
+    }
+
+    public List<MessageQueue> getMessageQueueList() {
+        return messageQueueList;
+    }
+
+    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
+        this.messageQueueList = messageQueueList;
+    }
+
+    /** Two consumers, five queues, consumer "1": expects the queues at indexes 2..4. */
+    @Test
+    public void testConsumer2() {
+        assertAllocated(allocateFor("1", 2, 5, "testConsumer2"), 2, 5);
+    }
+
+    /** Three consumers, five queues, consumer "0": expects the queue at index 0. */
+    @Test
+    public void testConsumer3CurrentCID0() {
+        assertAllocated(allocateFor("0", 3, 5, "testConsumer3CurrentCID0"), 0, 1);
+    }
+
+    /** Three consumers, five queues, consumer "1": expects the queue at index 1. */
+    @Test
+    public void testConsumer3CurrentCID1() {
+        assertAllocated(allocateFor("1", 3, 5, "testConsumer3CurrentCID1"), 1, 2);
+    }
+
+    /** Three consumers, five queues, consumer "2": expects the queues at indexes 2..4. */
+    @Test
+    public void testConsumer3CurrentCID2() {
+        assertAllocated(allocateFor("2", 3, 5, "testConsumer3CurrentCID2"), 2, 5);
+    }
+
+    /** Four consumers, five queues, consumer "1": expects the queue at index 1. */
+    @Test
+    public void testConsumer4() {
+        assertAllocated(allocateFor("1", 4, 5, "testConsumer4"), 1, 2);
+    }
+
+    /** Five consumers, five queues, consumer "1": expects the queue at index 1. */
+    @Test
+    public void testConsumer5() {
+        assertAllocated(allocateFor("1", 5, 5, "testConsumer5"), 1, 2);
+    }
+
+    /** Two consumers, six queues, consumer "1": expects the queues at indexes 3..5. */
+    @Test
+    public void testConsumer6() {
+        assertAllocated(allocateFor("1", 2, 6, "testConsumer6"), 3, 6);
+    }
+
+    /** A consumer id that is absent from the id list: expects an empty allocation. */
+    @Test
+    public void testCurrentCIDNotExists() {
+        List<MessageQueue> result =
+                allocateFor(String.valueOf(Integer.MAX_VALUE), 2, 6, "testCurrentCIDNotExists");
+        Assert.assertEquals(0, result.size());
+    }
+
+    /** An empty current consumer id: expects {@link IllegalArgumentException}. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testCurrentCIDIllegalArgument() {
+        createConsumerIdList(2);
+        createMessageQueueList(6);
+        allocateMessageQueueAveragely.allocate("", "", getMessageQueueList(), getConsumerIdList());
+    }
+
+    public List<String> getConsumerIdList() {
+        return consumerIdList;
+    }
+
+    public void setConsumerIdList(List<String> consumerIdList) {
+        this.consumerIdList = consumerIdList;
+    }
+
+    /** A {@code null} queue list: expects {@link IllegalArgumentException}. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testMessageQueueIllegalArgument() {
+        currentCID = "0";
+        createConsumerIdList(2);
+        allocateMessageQueueAveragely.allocate("", currentCID, null, getConsumerIdList());
+    }
+
+    /** A {@code null} consumer id list: expects {@link IllegalArgumentException}. */
+    @Test(expected = IllegalArgumentException.class)
+    public void testConsumerIdIllegalArgument() {
+        currentCID = "0";
+        createMessageQueueList(6);
+        allocateMessageQueueAveragely.allocate("", currentCID, getMessageQueueList(), null);
+    }
+
+    /**
+     * Print-only run: 19 queues, 10 consumers ("CID0" .. "CID9"); prints every consumer's
+     * allocation and makes no assertions.
+     */
+    @Test
+    public void testAllocate() {
+        String cidPrefix = "CID";
+        int consumerSize = 10;
+        List<MessageQueue> mqAll = newQueues(topic, 19);
+        List<String> cidAll = newConsumerIds(cidPrefix, consumerSize);
+        System.out.println(mqAll.toString());
+        System.out.println(cidAll.toString());
+        for (int i = 0; i < consumerSize; i++) {
+            List<MessageQueue> rs = allocateMessageQueueAveragely.allocate("", cidPrefix + i, mqAll, cidAll);
+            System.out.println("rs[" + cidPrefix + i + "]:" + rs.toString());
+        }
+    }
+
+    /**
+     * Print-only run of {@link AllocateMessageQueueAveragelyByCircle}: 13 queues, 3 consumers
+     * ("CID0" .. "CID2"); prints every consumer's allocation and makes no assertions.
+     */
+    @Test
+    public void testAllocateByCircle() {
+        AllocateMessageQueueAveragelyByCircle circle = new AllocateMessageQueueAveragelyByCircle();
+        String cidPrefix = "CID";
+        int consumerSize = 3;
+        List<MessageQueue> mqAll = newQueues(topic, 13);
+        List<String> cidAll = newConsumerIds(cidPrefix, consumerSize);
+        System.out.println(mqAll.toString());
+        System.out.println(cidAll.toString());
+        for (int i = 0; i < consumerSize; i++) {
+            List<MessageQueue> rs = circle.allocate("", cidPrefix + i, mqAll, cidAll);
+            System.out.println("rs[" + cidPrefix + i + "]:" + rs.toString());
+        }
+    }
+
+    /**
+     * Rebuilds both fixtures, runs the strategy for {@code cid} and prints the result.
+     *
+     * @param cid consumer id to allocate for; also stored in {@code currentCID}
+     * @param consumerCount number of consumer ids to create
+     * @param queueCount number of queues to create
+     * @param label name passed to {@link #printMessageQueue(List, String)}
+     * @return the list returned by the strategy
+     */
+    private List<MessageQueue> allocateFor(String cid, int consumerCount, int queueCount, String label) {
+        currentCID = cid;
+        createConsumerIdList(consumerCount);
+        createMessageQueueList(queueCount);
+        List<MessageQueue> result =
+                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, label);
+        return result;
+    }
+
+    /**
+     * Asserts that {@code result} holds exactly {@code toIndex - fromIndex} queues and contains
+     * every fixture queue in the index range {@code [fromIndex, toIndex)}.
+     */
+    private void assertAllocated(List<MessageQueue> result, int fromIndex, int toIndex) {
+        // JUnit's signature is assertEquals(expected, actual); with the arguments in that
+        // order a failure message labels the two values correctly.
+        Assert.assertEquals(toIndex - fromIndex, result.size());
+        Assert.assertTrue(result.containsAll(messageQueueList.subList(fromIndex, toIndex)));
+    }
+
+    /** Builds {@code size} queues from the topic, "brokerName" and the indexes 0 .. size - 1. */
+    private static List<MessageQueue> newQueues(String topicName, int size) {
+        List<MessageQueue> queues = new ArrayList<MessageQueue>(size);
+        for (int i = 0; i < size; i++) {
+            queues.add(new MessageQueue(topicName, "brokerName", i));
+        }
+        return queues;
+    }
+
+    /** Returns the ids {@code prefix + 0} .. {@code prefix + (size - 1)}. */
+    private static List<String> newConsumerIds(String prefix, int size) {
+        List<String> ids = new ArrayList<String>(size);
+        for (int i = 0; i < size; i++) {
+            ids.add(prefix + i);
+        }
+        return ids;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 72cc2b0..ec95a76 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -18,7 +18,7 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <groupId>com.alibaba.rocketmq</groupId>
+        <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
         <version>4.0.0-SNAPSHOT</version>
     </parent>