You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:43 UTC

[26/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java
new file mode 100644
index 0000000..6f861d3
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.rocketmq.client.ClientConfig;
+import com.alibaba.rocketmq.client.QueryResult;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.message.*;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DefaultMQProducer extends ClientConfig implements MQProducer {
+    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
+    private String producerGroup;
+    /**
+     * Just for testing or demo program
+     */
+    private String createTopicKey = MixAll.DEFAULT_TOPIC;
+    private volatile int defaultTopicQueueNums = 4;
+    private int sendMsgTimeout = 3000;
+    private int compressMsgBodyOverHowmuch = 1024 * 4;
+    private int retryTimesWhenSendFailed = 2;
+    private int retryTimesWhenSendAsyncFailed = 2;
+
+    private boolean retryAnotherBrokerWhenNotStoreOK = false;
+    private int maxMessageSize = 1024 * 1024 * 4; // 4M
+    public DefaultMQProducer() {
+        this(MixAll.DEFAULT_PRODUCER_GROUP, null);
+    }
+
+
+    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
+        this.producerGroup = producerGroup;
+        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+    }
+
+
+    public DefaultMQProducer(final String producerGroup) {
+        this(producerGroup, null);
+    }
+
+
+    public DefaultMQProducer(RPCHook rpcHook) {
+        this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
+    }
+
+
+    @Override
+    public void start() throws MQClientException {
+        this.defaultMQProducerImpl.start();
+    }
+
+    @Override
+    public void shutdown() {
+        this.defaultMQProducerImpl.shutdown();
+    }
+
+
+    @Override
+    public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
+        return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic);
+    }
+
+
+    @Override
+    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg);
+    }
+
+
+    @Override
+    public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, timeout);
+    }
+
+
+    @Override
+    public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, sendCallback);
+    }
+
+
+    @Override
+    public void send(Message msg, SendCallback sendCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
+    }
+
+
+    @Override
+    public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.sendOneway(msg);
+    }
+
+
+    @Override
+    public SendResult send(Message msg, MessageQueue mq)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, mq);
+    }
+
+
+    @Override
+    public SendResult send(Message msg, MessageQueue mq, long timeout)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, mq, timeout);
+    }
+
+
+    @Override
+    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, mq, sendCallback);
+    }
+
+
+    @Override
+    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
+    }
+
+
+    @Override
+    public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.sendOneway(msg, mq);
+    }
+
+
+    @Override
+    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, selector, arg);
+    }
+
+
+    @Override
+    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
+    }
+
+
+    @Override
+    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
+    }
+
+
+    @Override
+    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
+    }
+
+
+    @Override
+    public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
+            throws MQClientException, RemotingException, InterruptedException {
+        this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
+    }
+
+
+    @Override
+    public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg)
+            throws MQClientException { // never returns normally: transactional sends are not supported by this class
+        throw new UnsupportedOperationException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
+    }
+
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
+        createTopic(key, newTopic, queueNum, 0);
+    }
+
+
+    @Override
+    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
+        this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
+    }
+
+
+    @Override
+    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
+        return this.defaultMQProducerImpl.searchOffset(mq, timestamp);
+    }
+
+
+    @Override
+    public long maxOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQProducerImpl.maxOffset(mq);
+    }
+
+
+    @Override
+    public long minOffset(MessageQueue mq) throws MQClientException {
+        return this.defaultMQProducerImpl.minOffset(mq);
+    }
+
+
+    @Override
+    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
+        return this.defaultMQProducerImpl.earliestMsgStoreTime(mq);
+    }
+
+
+    @Override
+    public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        return this.defaultMQProducerImpl.viewMessage(offsetMsgId);
+    }
+
+
+    @Override
+    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
+            throws MQClientException, InterruptedException {
+        return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end);
+    }
+
+
+    @Override
+    public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        try {
+            MessageDecoder.decodeMessageId(msgId); // result unused; presumably throws for ids that are not offset message ids - TODO confirm
+            return this.viewMessage(msgId);
+        } catch (Exception ignored) { // deliberate best effort: any failure above falls back to the unique-key query below
+        } // NOTE(review): the catch also swallows InterruptedException without restoring the interrupt flag
+        return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId);
+    }
+
+    public String getProducerGroup() {
+        return producerGroup;
+    }
+
+
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+
+
+    public String getCreateTopicKey() {
+        return createTopicKey;
+    }
+
+
+    public void setCreateTopicKey(String createTopicKey) {
+        this.createTopicKey = createTopicKey;
+    }
+
+
+    public int getSendMsgTimeout() {
+        return sendMsgTimeout;
+    }
+
+
+    public void setSendMsgTimeout(int sendMsgTimeout) {
+        this.sendMsgTimeout = sendMsgTimeout;
+    }
+
+
+    public int getCompressMsgBodyOverHowmuch() {
+        return compressMsgBodyOverHowmuch;
+    }
+
+
+    public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
+        this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
+    }
+
+
+    public DefaultMQProducerImpl getDefaultMQProducerImpl() {
+        return defaultMQProducerImpl;
+    }
+
+
+    public boolean isRetryAnotherBrokerWhenNotStoreOK() {
+        return retryAnotherBrokerWhenNotStoreOK;
+    }
+
+
+    public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {
+        this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
+    }
+
+
+    public int getMaxMessageSize() {
+        return maxMessageSize;
+    }
+
+
+    public void setMaxMessageSize(int maxMessageSize) {
+        this.maxMessageSize = maxMessageSize;
+    }
+
+
+    public int getDefaultTopicQueueNums() {
+        return defaultTopicQueueNums;
+    }
+
+
+    public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
+        this.defaultTopicQueueNums = defaultTopicQueueNums;
+    }
+
+
+    public int getRetryTimesWhenSendFailed() {
+        return retryTimesWhenSendFailed;
+    }
+
+
+    public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
+        this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
+    }
+
+
+    public boolean isSendMessageWithVIPChannel() {
+        return isVipChannelEnabled();
+    }
+
+
+    public void setSendMessageWithVIPChannel(final boolean sendMessageWithVIPChannel) {
+        this.setVipChannelEnabled(sendMessageWithVIPChannel);
+    }
+
+
+    public long[] getNotAvailableDuration() {
+        return this.defaultMQProducerImpl.getNotAvailableDuration();
+    }
+
+    public void setNotAvailableDuration(final long[] notAvailableDuration) {
+        this.defaultMQProducerImpl.setNotAvailableDuration(notAvailableDuration);
+    }
+
+    public long[] getLatencyMax() {
+        return this.defaultMQProducerImpl.getLatencyMax();
+    }
+
+    public void setLatencyMax(final long[] latencyMax) {
+        this.defaultMQProducerImpl.setLatencyMax(latencyMax);
+    }
+
+    public boolean isSendLatencyFaultEnable() {
+        return this.defaultMQProducerImpl.isSendLatencyFaultEnable();
+    }
+
+    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
+        this.defaultMQProducerImpl.setSendLatencyFaultEnable(sendLatencyFaultEnable);
+    }
+
+    public int getRetryTimesWhenSendAsyncFailed() {
+        return retryTimesWhenSendAsyncFailed;
+    }
+
+    public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) {
+        this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java
new file mode 100644
index 0000000..af3723a
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.rocketmq.common.message.Message;
+
+
+/** Callback with a single method that takes a message plus an opaque argument and returns a {@link LocalTransactionState}.
+ * @author shijia.wxr
+ */
+public interface LocalTransactionExecuter {
+    LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java
new file mode 100644
index 0000000..ee2a93a
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+/** States returned by {@link LocalTransactionExecuter} and {@link TransactionCheckListener}. NOTE(review): {@code UNKNOW} is the constant's actual spelling.
+ * @author shijia.wxr
+ */
+public enum LocalTransactionState {
+    COMMIT_MESSAGE,
+    ROLLBACK_MESSAGE,
+    UNKNOW,
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java
new file mode 100644
index 0000000..e21bc00
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.rocketmq.client.MQAdmin;
+import com.alibaba.rocketmq.client.exception.MQBrokerException;
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.remoting.exception.RemotingException;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public interface MQProducer extends MQAdmin {
+    void start() throws MQClientException;
+
+    void shutdown();
+
+
+    List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
+
+
+    SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
+            InterruptedException;
+
+
+    SendResult send(final Message msg, final long timeout) throws MQClientException,
+            RemotingException, MQBrokerException, InterruptedException;
+
+
+    void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
+            RemotingException, InterruptedException;
+
+
+    void send(final Message msg, final SendCallback sendCallback, final long timeout)
+            throws MQClientException, RemotingException, InterruptedException;
+
+
+    void sendOneway(final Message msg) throws MQClientException, RemotingException,
+            InterruptedException;
+
+
+    SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
+            RemotingException, MQBrokerException, InterruptedException;
+
+
+    SendResult send(final Message msg, final MessageQueue mq, final long timeout)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+
+    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
+            throws MQClientException, RemotingException, InterruptedException;
+
+
+    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
+            throws MQClientException, RemotingException, InterruptedException;
+
+
+    void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
+            RemotingException, InterruptedException;
+
+
+    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
+            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+
+    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,
+                    final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+            InterruptedException;
+
+
+    void send(final Message msg, final MessageQueueSelector selector, final Object arg,
+              final SendCallback sendCallback) throws MQClientException, RemotingException,
+            InterruptedException;
+
+
+    void send(final Message msg, final MessageQueueSelector selector, final Object arg,
+              final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
+            InterruptedException;
+
+
+    void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
+            throws MQClientException, RemotingException, InterruptedException;
+
+
+    TransactionSendResult sendMessageInTransaction(final Message msg,
+                                                   final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java
new file mode 100644
index 0000000..924c145
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/** Selects a {@link MessageQueue} given candidate queues, a message and an opaque argument (presumably the {@code arg} handed to {@code send} - TODO confirm).
+ * @author shijia.wxr
+ */
+public interface MessageQueueSelector {
+    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java
new file mode 100644
index 0000000..35d1a72
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+/** Callback pair: {@code onSuccess} receives a {@link SendResult}, {@code onException} receives a {@link Throwable}.
+ * @author shijia.wxr
+ */
+public interface SendCallback {
+    void onSuccess(final SendResult sendResult);
+
+
+    void onException(final Throwable e);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java
new file mode 100644
index 0000000..183accf
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SendResult {
+    private SendStatus sendStatus;
+    private String msgId;
+    private MessageQueue messageQueue;
+    private long queueOffset;
+    private String transactionId;
+    private String offsetMsgId;
+    private String regionId;
+    private boolean traceOn = true;
+
+    public SendResult() {
+    }
+
+    public SendResult(SendStatus sendStatus, String msgId, String offsetMsgId, MessageQueue messageQueue, long queueOffset) {
+        this.sendStatus = sendStatus;
+        this.msgId = msgId;
+        this.offsetMsgId = offsetMsgId;
+        this.messageQueue = messageQueue;
+        this.queueOffset = queueOffset;
+    }
+
+    public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, final long queueOffset, final String transactionId, final String offsetMsgId, final String regionId) {
+        this.sendStatus = sendStatus;
+        this.msgId = msgId;
+        this.messageQueue = messageQueue;
+        this.queueOffset = queueOffset;
+        this.transactionId = transactionId;
+        this.offsetMsgId = offsetMsgId;
+        this.regionId = regionId;
+    }
+
+    public boolean isTraceOn() {
+        return traceOn;
+    }
+
+    public void setTraceOn(final boolean traceOn) {
+        this.traceOn = traceOn;
+    }
+
+    public String getRegionId() {
+        return regionId;
+    }
+
+    public void setRegionId(final String regionId) {
+        this.regionId = regionId;
+    }
+
+    public static String encoderSendResultToJson(final Object obj) {
+        return JSON.toJSONString(obj);
+    }
+
+    public static SendResult decoderSendResultFromJson(String json) {
+        return JSON.parseObject(json, SendResult.class);
+    }
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+
+    public SendStatus getSendStatus() {
+        return sendStatus;
+    }
+
+
+    public void setSendStatus(SendStatus sendStatus) {
+        this.sendStatus = sendStatus;
+    }
+
+
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+
+    public void setMessageQueue(MessageQueue messageQueue) {
+        this.messageQueue = messageQueue;
+    }
+
+
+    public long getQueueOffset() {
+        return queueOffset;
+    }
+
+
+    public void setQueueOffset(long queueOffset) {
+        this.queueOffset = queueOffset;
+    }
+
+
+    public String getTransactionId() {
+        return transactionId;
+    }
+
+
+    public void setTransactionId(String transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    public String getOffsetMsgId() {
+        return offsetMsgId;
+    }
+
+    public void setOffsetMsgId(String offsetMsgId) {
+        this.offsetMsgId = offsetMsgId;
+    }
+
+    @Override
+    public String toString() {
+        return "SendResult [sendStatus=" + sendStatus + ", msgId=" + msgId + ", offsetMsgId=" + offsetMsgId + ", messageQueue=" + messageQueue
+                + ", queueOffset=" + queueOffset + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java
new file mode 100644
index 0000000..3bc572f
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+/** Status values carried in the {@code sendStatus} field of {@link SendResult}.
+ * @author shijia.wxr
+ */
+public enum SendStatus {
+    SEND_OK,
+    FLUSH_DISK_TIMEOUT,
+    FLUSH_SLAVE_TIMEOUT,
+    SLAVE_NOT_AVAILABLE,
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java
new file mode 100644
index 0000000..8440537
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.rocketmq.common.message.MessageExt;
+
+
+/**
+ * Callback that reports the local transaction state associated with a message.
+ * <p>
+ * NOTE(review): presumably invoked when the state of a previously sent transactional
+ * message has to be re-checked; the caller is not visible in this file - confirm.
+ *
+ * @author shijia.wxr
+ */
+public interface TransactionCheckListener {
+    /**
+     * Determines the local transaction state for the given message.
+     *
+     * @param msg the message whose local transaction state is requested
+     * @return the {@link LocalTransactionState} for {@code msg}
+     */
+    LocalTransactionState checkLocalTransactionState(final MessageExt msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java
new file mode 100644
index 0000000..08dd4ab
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+import com.alibaba.rocketmq.client.exception.MQClientException;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.remoting.RPCHook;
+
+
+/**
+ * {@link DefaultMQProducer} variant that adds transactional sending.
+ * <p>
+ * It wraps the inherited lifecycle with calls to the producer implementation's
+ * transaction environment setup/teardown and refuses transactional sends until a
+ * {@link TransactionCheckListener} has been configured.
+ *
+ * @author shijia.wxr
+ */
+public class TransactionMQProducer extends DefaultMQProducer {
+    // Must be non-null before sendMessageInTransaction is called.
+    private TransactionCheckListener transactionCheckListener;
+    // NOTE(review): the three settings below are only stored and exposed through
+    // accessors in this class; presumably they size the thread pool / request queue
+    // used for transaction checks - confirm against the code that reads them.
+    private int checkThreadPoolMinSize = 1;
+    private int checkThreadPoolMaxSize = 1;
+    private int checkRequestHoldMax = 2000;
+
+
+    /**
+     * Creates a producer using the superclass defaults.
+     */
+    public TransactionMQProducer() {
+    }
+
+
+    /**
+     * @param producerGroup producer group name, passed to the superclass
+     */
+    public TransactionMQProducer(final String producerGroup) {
+        super(producerGroup);
+    }
+
+    /**
+     * @param producerGroup producer group name, passed to the superclass
+     * @param rpcHook       RPC hook, passed to the superclass
+     */
+    public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
+        super(producerGroup, rpcHook);
+    }
+
+    /**
+     * Initializes the transaction environment of the underlying producer
+     * implementation, then starts the producer.
+     *
+     * @throws MQClientException propagated from the superclass {@code start()}
+     */
+    @Override
+    public void start() throws MQClientException {
+        // The transaction environment is set up before the regular start sequence runs.
+        this.defaultMQProducerImpl.initTransactionEnv();
+        super.start();
+    }
+
+
+    /**
+     * Shuts the producer down, then destroys the transaction environment of the
+     * underlying producer implementation.
+     */
+    @Override
+    public void shutdown() {
+        // Teardown mirrors start(): regular shutdown first, transaction environment last.
+        super.shutdown();
+        this.defaultMQProducerImpl.destroyTransactionEnv();
+    }
+
+
+    /**
+     * Sends a message in a transaction by delegating to the producer implementation.
+     *
+     * @param msg          the message to send
+     * @param tranExecuter executer handed to the producer implementation
+     * @param arg          caller-supplied argument handed to the producer implementation
+     * @return the result returned by the producer implementation
+     * @throws MQClientException if no {@link TransactionCheckListener} has been set, or
+     *                           as propagated from the producer implementation
+     */
+    @Override
+    public TransactionSendResult sendMessageInTransaction(final Message msg,
+                                                          final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
+        // Fail fast: a transactional send is rejected while the check listener is unset.
+        if (null == this.transactionCheckListener) {
+            throw new MQClientException("localTransactionBranchCheckListener is null", null);
+        }
+
+        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
+    }
+
+
+    /**
+     * @return the configured check listener, or {@code null} if none was set
+     */
+    public TransactionCheckListener getTransactionCheckListener() {
+        return transactionCheckListener;
+    }
+
+
+    /**
+     * @param transactionCheckListener listener required by {@link #sendMessageInTransaction}
+     */
+    public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
+        this.transactionCheckListener = transactionCheckListener;
+    }
+
+
+    /**
+     * @return the stored check thread pool minimum size (default 1)
+     */
+    public int getCheckThreadPoolMinSize() {
+        return checkThreadPoolMinSize;
+    }
+
+
+    /**
+     * @param checkThreadPoolMinSize value to store; not validated here
+     */
+    public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
+        this.checkThreadPoolMinSize = checkThreadPoolMinSize;
+    }
+
+
+    /**
+     * @return the stored check thread pool maximum size (default 1)
+     */
+    public int getCheckThreadPoolMaxSize() {
+        return checkThreadPoolMaxSize;
+    }
+
+
+    /**
+     * @param checkThreadPoolMaxSize value to store; not validated here
+     */
+    public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
+        this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;
+    }
+
+
+    /**
+     * @return the stored check request hold maximum (default 2000)
+     */
+    public int getCheckRequestHoldMax() {
+        return checkRequestHoldMax;
+    }
+
+
+    /**
+     * @param checkRequestHoldMax value to store; not validated here
+     */
+    public void setCheckRequestHoldMax(int checkRequestHoldMax) {
+        this.checkRequestHoldMax = checkRequestHoldMax;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java
new file mode 100644
index 0000000..e7dcd0e
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer;
+
+/**
+ * {@link SendResult} that additionally carries a {@link LocalTransactionState}.
+ *
+ * @author shijia.wxr
+ */
+public class TransactionSendResult extends SendResult {
+    // Unset (null) until setLocalTransactionState is called.
+    private LocalTransactionState localTransactionState;
+
+
+    /**
+     * Creates a result with no local transaction state set.
+     */
+    public TransactionSendResult() {
+    }
+
+
+    /**
+     * @return the stored local transaction state, or {@code null} if none was set
+     */
+    public LocalTransactionState getLocalTransactionState() {
+        return localTransactionState;
+    }
+
+
+    /**
+     * @param localTransactionState the local transaction state to store
+     */
+    public void setLocalTransactionState(LocalTransactionState localTransactionState) {
+        this.localTransactionState = localTransactionState;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
new file mode 100644
index 0000000..648356b
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer.selector;
+
+import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * {@link MessageQueueSelector} that picks a queue from the hash code of the
+ * caller-supplied argument, so arguments with equal hash codes map to the same
+ * index for a given queue list size.
+ *
+ * @author shijia.wxr
+ */
+public class SelectMessageQueueByHash implements MessageQueueSelector {
+
+    /**
+     * Selects the queue at index {@code |arg.hashCode() % mqs.size()|}.
+     *
+     * @param mqs candidate queues; must be non-empty (an empty list fails with
+     *            {@link ArithmeticException} from the modulo)
+     * @param msg the message being sent; not used by this selector
+     * @param arg object whose {@code hashCode()} drives the selection; must not be null
+     * @return the queue at the computed index
+     */
+    @Override
+    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+        // Reduce modulo the list size BEFORE taking the absolute value:
+        // Math.abs(Integer.MIN_VALUE) is still negative, so abs-then-modulo could
+        // produce a negative index. After the modulo the magnitude is below
+        // mqs.size(), so abs() is always safe. For every other hash code the
+        // resulting index is the same as with the previous ordering.
+        int value = arg.hashCode() % mqs.size();
+        if (value < 0) {
+            value = Math.abs(value);
+        }
+
+        return mqs.get(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
new file mode 100644
index 0000000..a213391
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer.selector;
+
+import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * {@link MessageQueueSelector} holding a set of consumer IDC names.
+ * <p>
+ * NOTE(review): {@link #select} is currently an unimplemented stub that always
+ * returns {@code null} and never reads {@code consumeridcs}; callers that use this
+ * selector will receive no queue. The intended machine-room selection logic is not
+ * present in this file.
+ *
+ * @author shijia.wxr
+ */
+public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
+    // Stored and exposed through the accessors below; not consulted by select().
+    private Set<String> consumeridcs;
+
+
+    /**
+     * Stub implementation: ignores all arguments.
+     *
+     * @param mqs candidate queues (unused)
+     * @param msg the message being sent (unused)
+     * @param arg caller-supplied argument (unused)
+     * @return always {@code null}
+     */
+    @Override
+    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+        return null;
+    }
+
+
+    /**
+     * @return the stored set of consumer IDC names, or {@code null} if none was set
+     */
+    public Set<String> getConsumeridcs() {
+        return consumeridcs;
+    }
+
+
+    /**
+     * @param consumeridcs the set of consumer IDC names to store (kept by reference)
+     */
+    public void setConsumeridcs(Set<String> consumeridcs) {
+        this.consumeridcs = consumeridcs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
new file mode 100644
index 0000000..3f381e4
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.client.producer.selector;
+
+import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
+import com.alibaba.rocketmq.common.message.Message;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * {@link MessageQueueSelector} that picks a queue at a pseudo-random index.
+ *
+ * @author shijia.wxr
+ */
+public class SelectMessageQueueByRandoom implements MessageQueueSelector {
+    // Seeded once per selector instance from the wall clock.
+    private Random random = new Random(System.currentTimeMillis());
+
+
+    /**
+     * Selects a queue at a uniformly distributed index in {@code [0, mqs.size())}.
+     *
+     * @param mqs candidate queues; must be non-empty ({@link Random#nextInt(int)}
+     *            throws {@link IllegalArgumentException} for a non-positive bound)
+     * @param msg the message being sent; not used by this selector
+     * @param arg caller-supplied argument; not used by this selector
+     * @return the queue at the chosen index
+     */
+    @Override
+    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+        // Use the bounded overload instead of abs(nextInt()) % size:
+        // Math.abs(Integer.MIN_VALUE) is still negative, which could yield a
+        // negative index, and the bounded form also avoids modulo bias.
+        int value = random.nextInt(mqs.size());
+        return mqs.get(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java
new file mode 100644
index 0000000..e07e233
--- /dev/null
+++ b/rocketmq-client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client.stat;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus;
+import com.alibaba.rocketmq.common.stats.StatsItemSet;
+import com.alibaba.rocketmq.common.stats.StatsSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+
+/**
+ * Collects consumer-side statistics (pull/consume throughput and response time),
+ * keyed by topic and consumer group, in one {@link StatsItemSet} per metric.
+ */
+public class ConsumerStatsManager {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
+
+    private static final String TOPIC_AND_GROUP_CONSUME_OK_TPS = "CONSUME_OK_TPS";
+    private static final String TOPIC_AND_GROUP_CONSUME_FAILED_TPS = "CONSUME_FAILED_TPS";
+    private static final String TOPIC_AND_GROUP_CONSUME_RT = "CONSUME_RT";
+    private static final String TOPIC_AND_GROUP_PULL_TPS = "PULL_TPS";
+    private static final String TOPIC_AND_GROUP_PULL_RT = "PULL_RT";
+
+    private final StatsItemSet topicAndGroupConsumeOKTPS;
+    private final StatsItemSet topicAndGroupConsumeRT;
+    private final StatsItemSet topicAndGroupConsumeFailedTPS;
+    private final StatsItemSet topicAndGroupPullTPS;
+    private final StatsItemSet topicAndGroupPullRT;
+
+
+    /**
+     * Creates one stats set per metric, all sharing the given scheduler and the
+     * client logger.
+     *
+     * @param scheduledExecutorService scheduler handed to every {@link StatsItemSet}
+     */
+    public ConsumerStatsManager(final ScheduledExecutorService scheduledExecutorService) {
+        this.topicAndGroupConsumeOKTPS =
+                new StatsItemSet(TOPIC_AND_GROUP_CONSUME_OK_TPS, scheduledExecutorService, log);
+
+        this.topicAndGroupConsumeRT =
+                new StatsItemSet(TOPIC_AND_GROUP_CONSUME_RT, scheduledExecutorService, log);
+
+        this.topicAndGroupConsumeFailedTPS =
+                new StatsItemSet(TOPIC_AND_GROUP_CONSUME_FAILED_TPS, scheduledExecutorService, log);
+
+        this.topicAndGroupPullTPS = new StatsItemSet(TOPIC_AND_GROUP_PULL_TPS, scheduledExecutorService, log);
+
+        this.topicAndGroupPullRT = new StatsItemSet(TOPIC_AND_GROUP_PULL_RT, scheduledExecutorService, log);
+    }
+
+
+    /**
+     * Currently a no-op.
+     */
+    public void start() {
+    }
+
+
+    /**
+     * Currently a no-op.
+     */
+    public void shutdown() {
+    }
+
+
+    /**
+     * Records one pull response time sample.
+     *
+     * @param group consumer group
+     * @param topic topic name
+     * @param rt    response time sample; narrowed to {@code int} before recording
+     */
+    public void incPullRT(final String group, final String topic, final long rt) {
+        this.topicAndGroupPullRT.addValue(buildStatsKey(group, topic), (int) rt, 1);
+    }
+
+
+    /**
+     * Records pulled messages.
+     *
+     * @param group consumer group
+     * @param topic topic name
+     * @param msgs  message count; narrowed to {@code int} before recording
+     */
+    public void incPullTPS(final String group, final String topic, final long msgs) {
+        this.topicAndGroupPullTPS.addValue(buildStatsKey(group, topic), (int) msgs, 1);
+    }
+
+
+    /**
+     * Records one consume response time sample.
+     *
+     * @param group consumer group
+     * @param topic topic name
+     * @param rt    response time sample; narrowed to {@code int} before recording
+     */
+    public void incConsumeRT(final String group, final String topic, final long rt) {
+        this.topicAndGroupConsumeRT.addValue(buildStatsKey(group, topic), (int) rt, 1);
+    }
+
+
+    /**
+     * Records successfully consumed messages.
+     *
+     * @param group consumer group
+     * @param topic topic name
+     * @param msgs  message count; narrowed to {@code int} before recording
+     */
+    public void incConsumeOKTPS(final String group, final String topic, final long msgs) {
+        this.topicAndGroupConsumeOKTPS.addValue(buildStatsKey(group, topic), (int) msgs, 1);
+    }
+
+
+    /**
+     * Records messages whose consumption failed.
+     *
+     * @param group consumer group
+     * @param topic topic name
+     * @param msgs  message count; narrowed to {@code int} before recording
+     */
+    public void incConsumeFailedTPS(final String group, final String topic, final long msgs) {
+        this.topicAndGroupConsumeFailedTPS.addValue(buildStatsKey(group, topic), (int) msgs, 1);
+    }
+
+    /**
+     * Builds a {@link ConsumeStatus} for one group/topic pair from the current
+     * snapshots. A field is left at its default whenever the corresponding
+     * snapshot is {@code null}.
+     *
+     * @param group consumer group
+     * @param topic topic name
+     * @return a newly created status object, never {@code null}
+     */
+    public ConsumeStatus consumeStatus(final String group, final String topic) {
+        ConsumeStatus cs = new ConsumeStatus();
+
+        StatsSnapshot pullRT = this.getPullRT(group, topic);
+        if (pullRT != null) {
+            cs.setPullRT(pullRT.getAvgpt());
+        }
+
+        StatsSnapshot pullTPS = this.getPullTPS(group, topic);
+        if (pullTPS != null) {
+            cs.setPullTPS(pullTPS.getTps());
+        }
+
+        StatsSnapshot consumeRT = this.getConsumeRT(group, topic);
+        if (consumeRT != null) {
+            cs.setConsumeRT(consumeRT.getAvgpt());
+        }
+
+        StatsSnapshot consumeOKTPS = this.getConsumeOKTPS(group, topic);
+        if (consumeOKTPS != null) {
+            cs.setConsumeOKTPS(consumeOKTPS.getTps());
+        }
+
+        StatsSnapshot consumeFailedTPS = this.getConsumeFailedTPS(group, topic);
+        if (consumeFailedTPS != null) {
+            cs.setConsumeFailedTPS(consumeFailedTPS.getTps());
+        }
+
+        // The failed-message total comes from the hourly snapshot, unlike the
+        // per-minute snapshots used for the rates above.
+        StatsSnapshot consumeFailedInHour =
+                this.topicAndGroupConsumeFailedTPS.getStatsDataInHour(buildStatsKey(group, topic));
+        if (consumeFailedInHour != null) {
+            cs.setConsumeFailedMsgs(consumeFailedInHour.getSum());
+        }
+
+        return cs;
+    }
+
+    /**
+     * Builds the key under which every metric of a group/topic pair is stored.
+     */
+    private static String buildStatsKey(final String group, final String topic) {
+        return topic + "@" + group;
+    }
+
+    private StatsSnapshot getPullRT(final String group, final String topic) {
+        return this.topicAndGroupPullRT.getStatsDataInMinute(buildStatsKey(group, topic));
+    }
+
+    private StatsSnapshot getPullTPS(final String group, final String topic) {
+        return this.topicAndGroupPullTPS.getStatsDataInMinute(buildStatsKey(group, topic));
+    }
+
+    /**
+     * Returns the per-minute consume RT snapshot, falling back to the hourly one
+     * when the per-minute snapshot is missing or has a zero sum.
+     */
+    private StatsSnapshot getConsumeRT(final String group, final String topic) {
+        final String statsKey = buildStatsKey(group, topic);
+        StatsSnapshot statsData = this.topicAndGroupConsumeRT.getStatsDataInMinute(statsKey);
+        // Null guard keeps this consistent with consumeStatus(), which treats every
+        // snapshot as possibly null.
+        if (statsData == null || 0 == statsData.getSum()) {
+            statsData = this.topicAndGroupConsumeRT.getStatsDataInHour(statsKey);
+        }
+
+        return statsData;
+    }
+
+    private StatsSnapshot getConsumeOKTPS(final String group, final String topic) {
+        return this.topicAndGroupConsumeOKTPS.getStatsDataInMinute(buildStatsKey(group, topic));
+    }
+
+    private StatsSnapshot getConsumeFailedTPS(final String group, final String topic) {
+        return this.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(buildStatsKey(group, topic));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/resources/log4j_rocketmq_client.xml
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/resources/log4j_rocketmq_client.xml b/rocketmq-client/src/main/resources/log4j_rocketmq_client.xml
new file mode 100644
index 0000000..bf4b885
--- /dev/null
+++ b/rocketmq-client/src/main/resources/log4j_rocketmq_client.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <!-- Console appender: level, logger name (last two components), message. -->
+    <appender name="STDOUT-APPENDER" class="org.apache.log4j.ConsoleAppender">
+        <param name="encoding" value="UTF-8"/>
+        <param name="target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%-5p %c{2} , %m%n"/>
+        </layout>
+    </appender>
+
+    <!-- Size-based rolling file appender shared by all RocketMQ client loggers. -->
+    <appender name="RocketmqClientAppender" class="org.apache.log4j.RollingFileAppender">
+        <param name="file" value="${client.logRoot}/rocketmq_client.log"/>
+        <param name="append" value="true"/>
+        <param name="encoding" value="UTF-8"/>
+        <param name="maxFileSize" value="1073741824"/>
+        <param name="maxBackupIndex" value="${client.logFileMaxIndex}"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <!-- %d = timestamp, %c{1} = last component of the logger name. -->
+            <param name="ConversionPattern" value="%d{yyy-MM-dd HH\:mm\:ss,SSS} %p %c{1}(%L) - %m%n"/>
+        </layout>
+    </appender>
+
+    <logger name="RocketmqClient" additivity="false">
+        <level value="${client.logLevel}"/>
+        <appender-ref ref="RocketmqClientAppender"/>
+    </logger>
+
+    <logger name="RocketmqCommon" additivity="false">
+        <level value="${client.logLevel}"/>
+        <appender-ref ref="RocketmqClientAppender"/>
+    </logger>
+
+    <logger name="RocketmqRemoting" additivity="false">
+        <level value="${client.logLevel}"/>
+        <appender-ref ref="RocketmqClientAppender"/>
+    </logger>
+
+</log4j:configuration>
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/main/resources/logback_rocketmq_client.xml
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/main/resources/logback_rocketmq_client.xml b/rocketmq-client/src/main/resources/logback_rocketmq_client.xml
new file mode 100644
index 0000000..a845ee4
--- /dev/null
+++ b/rocketmq-client/src/main/resources/logback_rocketmq_client.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<configuration>
+    <!-- Size-based rolling file appender shared by all RocketMQ client loggers. -->
+    <appender name="RocketmqClientAppender"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${client.logRoot}/rocketmq_client.log</file>
+        <append>true</append>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <!-- FixedWindowRollingPolicy requires the %i window-index token. -->
+            <fileNamePattern>${client.logRoot}/otherdays/rocketmq_client.%i.log
+            </fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>${client.logFileMaxIndex}</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy
+                class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>100MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <!-- %d = timestamp; the value after the comma is the time zone. -->
+            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <logger name="RocketmqCommon" additivity="false">
+        <level value="${client.logLevel}"/>
+        <appender-ref ref="RocketmqClientAppender"/>
+    </logger>
+
+
+    <logger name="RocketmqRemoting" additivity="false">
+        <level value="${client.logLevel}"/>
+        <appender-ref ref="RocketmqClientAppender"/>
+    </logger>
+
+
+    <logger name="RocketmqClient" additivity="false">
+        <level value="${client.logLevel}"/>
+        <appender-ref ref="RocketmqClientAppender"/>
+    </logger>
+
+
+</configuration>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java
new file mode 100644
index 0000000..6dadafb
--- /dev/null
+++ b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/ValidatorsTest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.client;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Tests for {@code Validators}.
+ */
+public class ValidatorsTest {
+
+    /**
+     * Checks that {@code Validators.checkTopic} accepts each of the sample topic
+     * names without throwing.
+     *
+     * @throws Exception if any topic is rejected; the exception is left to propagate
+     *                   so the test runner reports the real cause and stack trace
+     *                   instead of a bare {@code assertTrue(false)} failure
+     */
+    @Test
+    public void topicValidatorTest() throws Exception {
+        Validators.checkTopic("Hello");
+        Validators.checkTopic("%RETRY%Hello");
+        Validators.checkTopic("_%RETRY%Hello");
+        Validators.checkTopic("-%RETRY%Hello");
+        Validators.checkTopic("223-%RETRY%Hello");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
new file mode 100644
index 0000000..5ef75ed
--- /dev/null
+++ b/rocketmq-client/src/test/java/com/alibaba/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/*
+ * @author yubao.fyb@taobao.com
+ * @version $id$
+ */
+package com.alibaba.rocketmq.client.consumer.loadbalance;
+
+import com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import com.alibaba.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author yubao.fyb@alibaba-inc.com created on 2013-07-03 16:24
+ */
+public class AllocateMessageQueueAveragelyTest {
+    private AllocateMessageQueueStrategy allocateMessageQueueAveragely;
+    private String currentCID;
+    private String topic;
+    private List<MessageQueue> messageQueueList;
+    private List<String> consumerIdList;
+
+    @Before
+    public void init() {
+        allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
+        topic = "topic_test";
+    }
+
+    /**
+     * One consumer ("0") and five queues: the allocation for that consumer is expected to
+     * contain all five queues.
+     */
+    @Test
+    public void testConsumer1() {
+        currentCID = "0";
+        createConsumerIdList(1);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer1");
+        // Expected value goes first so a failure message reports expected/actual correctly.
+        Assert.assertEquals(5, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList()));
+    }
+
+    public void createConsumerIdList(int size) {
+        consumerIdList = new ArrayList<String>(size);
+        for (int i = 0; i < size; i++) {
+            consumerIdList.add(String.valueOf(i));
+        }
+    }
+
+    public void createMessageQueueList(int size) {
+        messageQueueList = new ArrayList<MessageQueue>(size);
+        for (int i = 0; i < size; i++) {
+            MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+            messageQueueList.add(mq);
+        }
+    }
+
+    /**
+     * Prints each queue of {@code messageQueueList} to standard output, framed by a start and
+     * an end marker line that carry {@code name}. Prints nothing for a null or empty list.
+     *
+     * @param messageQueueList queues to print; may be null
+     * @param name             label used in the start/end marker lines
+     */
+    public void printMessageQueue(List<MessageQueue> messageQueueList, String name) {
+        if (messageQueueList != null && !messageQueueList.isEmpty()) {
+            final String separator = ".......................................";
+            System.out.println(name + separator + "start");
+            for (MessageQueue queue : messageQueueList) {
+                System.out.println(queue);
+            }
+            System.out.println(name + separator + "end");
+        }
+    }
+
+    public List<MessageQueue> getMessageQueueList() {
+        return messageQueueList;
+    }
+
+    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
+        this.messageQueueList = messageQueueList;
+    }
+
+    /**
+     * Two consumers and five queues, allocating for consumer "1": the result is expected to
+     * hold three queues, covering {@code messageQueueList.subList(2, 5)}.
+     */
+    @Test
+    public void testConsumer2() {
+        currentCID = "1";
+        createConsumerIdList(2);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer2");
+        // Expected value goes first so a failure message reports expected/actual correctly.
+        Assert.assertEquals(3, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(2, 5)));
+    }
+
+    /**
+     * Three consumers and five queues, allocating for consumer "0": the result is expected to
+     * hold one queue, covering {@code messageQueueList.subList(0, 1)}.
+     */
+    @Test
+    public void testConsumer3CurrentCID0() {
+        currentCID = "0";
+        createConsumerIdList(3);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer3CurrentCID0");
+        // Expected value goes first so a failure message reports expected/actual correctly.
+        Assert.assertEquals(1, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(0, 1)));
+    }
+
+    /**
+     * Three consumers and five queues, allocating for consumer "1": the result is expected to
+     * hold one queue, covering {@code messageQueueList.subList(1, 2)}.
+     */
+    @Test
+    public void testConsumer3CurrentCID1() {
+        currentCID = "1";
+        createConsumerIdList(3);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer3CurrentCID1");
+        // Expected value goes first so a failure message reports expected/actual correctly.
+        Assert.assertEquals(1, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(1, 2)));
+    }
+
+    /**
+     * Three consumers and five queues, allocating for consumer "2": the result is expected to
+     * hold three queues, covering {@code messageQueueList.subList(2, 5)}.
+     */
+    @Test
+    public void testConsumer3CurrentCID2() {
+        currentCID = "2";
+        createConsumerIdList(3);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer3CurrentCID2");
+        // Expected value goes first so a failure message reports expected/actual correctly.
+        Assert.assertEquals(3, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(2, 5)));
+    }
+
+    /**
+     * Four consumers and five queues, allocating for consumer "1": the result is expected to
+     * hold one queue, covering {@code messageQueueList.subList(1, 2)}.
+     */
+    @Test
+    public void testConsumer4() {
+        currentCID = "1";
+        createConsumerIdList(4);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer4");
+        // Expected value goes first so a failure message reports expected/actual correctly.
+        Assert.assertEquals(1, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(1, 2)));
+    }
+
+    /**
+     * Five consumers and five queues, allocating for consumer "1": the result is expected to
+     * hold one queue, covering {@code messageQueueList.subList(1, 2)}.
+     */
+    @Test
+    public void testConsumer5() {
+        currentCID = "1";
+        createConsumerIdList(5);
+        createMessageQueueList(5);
+        List<MessageQueue> result =
+                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testConsumer5");
+        // Expected value goes first so a failure message reports expected/actual correctly.
+        Assert.assertEquals(1, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(1, 2)));
+    }
+
+    /**
+     * Two consumers and six queues, allocating for consumer "1": the result is expected to
+     * hold three queues, covering {@code messageQueueList.subList(3, 6)}.
+     */
+    @Test
+    public void testConsumer6() {
+        currentCID = "1";
+        createConsumerIdList(2);
+        createMessageQueueList(6);
+        List<MessageQueue> result =
+                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        // Label matches the method name, as in the sibling tests, so console output is attributable.
+        printMessageQueue(result, "testConsumer6");
+        // Expected value goes first so a failure message reports expected/actual correctly.
+        Assert.assertEquals(3, result.size());
+        Assert.assertTrue(result.containsAll(getMessageQueueList().subList(3, 6)));
+    }
+
+    /**
+     * Allocates for a consumer id ({@code Integer.MAX_VALUE} as a string) that is not in the
+     * two-element consumer id list; the result is expected to be empty.
+     */
+    @Test
+    public void testCurrentCIDNotExists() {
+        currentCID = String.valueOf(Integer.MAX_VALUE);
+        createConsumerIdList(2);
+        createMessageQueueList(6);
+        List<MessageQueue> result =
+                allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testCurrentCIDNotExists");
+        // Expected value goes first so a failure message reports expected/actual correctly.
+        Assert.assertEquals(0, result.size());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCurrentCIDIllegalArgument() {
+        createConsumerIdList(2);
+        createMessageQueueList(6);
+        allocateMessageQueueAveragely.allocate("", "", getMessageQueueList(), getConsumerIdList());
+    }
+
+    public List<String> getConsumerIdList() {
+        return consumerIdList;
+    }
+
+    public void setConsumerIdList(List<String> consumerIdList) {
+        this.consumerIdList = consumerIdList;
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMessageQueueIllegalArgument() {
+        currentCID = "0";
+        createConsumerIdList(2);
+        allocateMessageQueueAveragely.allocate("", currentCID, null, getConsumerIdList());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConsumerIdIllegalArgument() {
+        currentCID = "0";
+        createMessageQueueList(6);
+        allocateMessageQueueAveragely.allocate("", currentCID, getMessageQueueList(), null);
+    }
+
+    @Test
+    public void testAllocate() {
+        AllocateMessageQueueAveragely allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
+        String topic = "topic_test";
+        String currentCID = "CID";
+        int queueSize = 19;
+        int consumerSize = 10;
+        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
+        for (int i = 0; i < queueSize; i++) {
+            MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+            mqAll.add(mq);
+        }
+
+        List<String> cidAll = new ArrayList<String>();
+        for (int j = 0; j < consumerSize; j++) {
+            cidAll.add("CID" + j);
+        }
+        System.out.println(mqAll.toString());
+        System.out.println(cidAll.toString());
+        for (int i = 0; i < consumerSize; i++) {
+            List<MessageQueue> rs = allocateMessageQueueAveragely.allocate("", currentCID + i, mqAll, cidAll);
+            System.out.println("rs[" + currentCID + i + "]:" + rs.toString());
+        }
+    }
+
+
+    @Test
+    public void testAllocateByCircle() {
+        AllocateMessageQueueAveragelyByCircle circle = new AllocateMessageQueueAveragelyByCircle();
+        String topic = "topic_test";
+        String currentCID = "CID";
+        int consumerSize = 3;
+        int queueSize = 13;
+        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
+        for (int i = 0; i < queueSize; i++) {
+            MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+            mqAll.add(mq);
+        }
+
+        List<String> cidAll = new ArrayList<String>();
+        for (int j = 0; j < consumerSize; j++) {
+            cidAll.add("CID" + j);
+        }
+        System.out.println(mqAll.toString());
+        System.out.println(cidAll.toString());
+        for (int i = 0; i < consumerSize; i++) {
+            List<MessageQueue> rs = circle.allocate("", currentCID + i, mqAll, cidAll);
+            System.out.println("rs[" + currentCID + i + "]:" + rs.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-common/pom.xml b/rocketmq-common/pom.xml
new file mode 100644
index 0000000..72cc2b0
--- /dev/null
+++ b/rocketmq-common/pom.xml
@@ -0,0 +1,43 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>com.alibaba.rocketmq</groupId>
+        <artifactId>rocketmq-all</artifactId>
+        <version>4.0.0-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>rocketmq-common</artifactId>
+    <name>rocketmq-common ${project.version}</name>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
new file mode 100644
index 0000000..6eae0a7
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+import com.alibaba.rocketmq.common.annotation.ImportantField;
+import com.alibaba.rocketmq.common.constant.PermName;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class BrokerConfig {
+    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+    @ImportantField
+    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+    @ImportantField
+    private String brokerIP1 = RemotingUtil.getLocalAddress();
+    private String brokerIP2 = RemotingUtil.getLocalAddress();
+    @ImportantField
+    private String brokerName = localHostName();
+    @ImportantField
+    private String brokerClusterName = "DefaultCluster";
+    @ImportantField
+    private long brokerId = MixAll.MASTER_ID;
+    private int brokerPermission = PermName.PERM_READ | PermName.PERM_WRITE;
+    private int defaultTopicQueueNums = 8;
+    @ImportantField
+    private boolean autoCreateTopicEnable = true;
+
+    private boolean clusterTopicEnable = true;
+
+    private boolean brokerTopicEnable = true;
+    @ImportantField
+    private boolean autoCreateSubscriptionGroup = true;
+    private String messageStorePlugIn = "";
+
+    private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
+    private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
+    private int adminBrokerThreadPoolNums = 16;
+    private int clientManageThreadPoolNums = 32;
+    private int consumerManageThreadPoolNums = 32;
+
+    private int flushConsumerOffsetInterval = 1000 * 5;
+
+    private int flushConsumerOffsetHistoryInterval = 1000 * 60;
+
+    @ImportantField
+    private boolean rejectTransactionMessage = false;
+    @ImportantField
+    private boolean fetchNamesrvAddrByAddressServer = false;
+    private int sendThreadPoolQueueCapacity = 10000;
+    private int pullThreadPoolQueueCapacity = 100000;
+    private int clientManagerThreadPoolQueueCapacity = 1000000;
+    private int consumerManagerThreadPoolQueueCapacity = 1000000;
+
+    private int filterServerNums = 0;
+
+    private boolean longPollingEnable = true;
+
+    private long shortPollingTimeMills = 1000;
+
+    private boolean notifyConsumerIdsChangedEnable = true;
+
+    private boolean highSpeedMode = false;
+
+    private boolean commercialEnable = true;
+    private int commercialTimerCount = 1;
+    private int commercialTransCount = 1;
+    private int commercialBigCount = 1;
+
+    private boolean transferMsgByHeap = true;
+    private int maxDelayTime = 40;
+
+
+    private String regionId = MixAll.DEFAULT_TRACE_REGION_ID;
+    private int registerBrokerTimeoutMills = 6000;
+
+    private boolean slaveReadEnable = false;
+
+    private boolean disableConsumeIfConsumerReadSlowly = false;
+    // 16 GiB. The first operand is a long literal so the whole product is computed in 64-bit
+    // arithmetic; with int operands only, 1024 * 1024 * 1024 * 16 (2^34) wraps to 0 before
+    // it is widened to long.
+    private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;
+
+    private long waitTimeMillsInSendQueue = 200;
+
+    private long startAcceptSendRequestTimeStamp = 0L;
+
+    private boolean traceOn = true;
+
+    public boolean isTraceOn() {
+        return traceOn;
+    }
+
+    public void setTraceOn(final boolean traceOn) {
+        this.traceOn = traceOn;
+    }
+
+    public long getStartAcceptSendRequestTimeStamp() {
+        return startAcceptSendRequestTimeStamp;
+    }
+
+    public void setStartAcceptSendRequestTimeStamp(final long startAcceptSendRequestTimeStamp) {
+        this.startAcceptSendRequestTimeStamp = startAcceptSendRequestTimeStamp;
+    }
+
+    public long getWaitTimeMillsInSendQueue() {
+        return waitTimeMillsInSendQueue;
+    }
+
+    public void setWaitTimeMillsInSendQueue(final long waitTimeMillsInSendQueue) {
+        this.waitTimeMillsInSendQueue = waitTimeMillsInSendQueue;
+    }
+
+    public long getConsumerFallbehindThreshold() {
+        return consumerFallbehindThreshold;
+    }
+
+    public void setConsumerFallbehindThreshold(final long consumerFallbehindThreshold) {
+        this.consumerFallbehindThreshold = consumerFallbehindThreshold;
+    }
+
+    public boolean isDisableConsumeIfConsumerReadSlowly() {
+        return disableConsumeIfConsumerReadSlowly;
+    }
+
+    public void setDisableConsumeIfConsumerReadSlowly(final boolean disableConsumeIfConsumerReadSlowly) {
+        this.disableConsumeIfConsumerReadSlowly = disableConsumeIfConsumerReadSlowly;
+    }
+
+    public boolean isSlaveReadEnable() {
+        return slaveReadEnable;
+    }
+
+    public void setSlaveReadEnable(final boolean slaveReadEnable) {
+        this.slaveReadEnable = slaveReadEnable;
+    }
+
+    public static String localHostName() {
+        try {
+            return InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+
+        return "DEFAULT_BROKER";
+    }
+
+    public int getRegisterBrokerTimeoutMills() {
+        return registerBrokerTimeoutMills;
+    }
+
+    public void setRegisterBrokerTimeoutMills(final int registerBrokerTimeoutMills) {
+        this.registerBrokerTimeoutMills = registerBrokerTimeoutMills;
+    }
+
+    public String getRegionId() {
+        return regionId;
+    }
+
+    public void setRegionId(final String regionId) {
+        this.regionId = regionId;
+    }
+
+    public boolean isTransferMsgByHeap() {
+        return transferMsgByHeap;
+    }
+
+    public void setTransferMsgByHeap(final boolean transferMsgByHeap) {
+        this.transferMsgByHeap = transferMsgByHeap;
+    }
+
+    public String getMessageStorePlugIn() {
+        return messageStorePlugIn;
+    }
+
+    public void setMessageStorePlugIn(String messageStorePlugIn) {
+        this.messageStorePlugIn = messageStorePlugIn;
+    }
+
+    public boolean isHighSpeedMode() {
+        return highSpeedMode;
+    }
+
+
+    public void setHighSpeedMode(final boolean highSpeedMode) {
+        this.highSpeedMode = highSpeedMode;
+    }
+
+
+    public String getRocketmqHome() {
+        return rocketmqHome;
+    }
+
+
+    public void setRocketmqHome(String rocketmqHome) {
+        this.rocketmqHome = rocketmqHome;
+    }
+
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+
+    public int getBrokerPermission() {
+        return brokerPermission;
+    }
+
+
+    public void setBrokerPermission(int brokerPermission) {
+        this.brokerPermission = brokerPermission;
+    }
+
+
+    public int getDefaultTopicQueueNums() {
+        return defaultTopicQueueNums;
+    }
+
+
+    public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
+        this.defaultTopicQueueNums = defaultTopicQueueNums;
+    }
+
+
+    public boolean isAutoCreateTopicEnable() {
+        return autoCreateTopicEnable;
+    }
+
+
+    public void setAutoCreateTopicEnable(boolean autoCreateTopic) {
+        this.autoCreateTopicEnable = autoCreateTopic;
+    }
+
+
+    public String getBrokerClusterName() {
+        return brokerClusterName;
+    }
+
+
+    public void setBrokerClusterName(String brokerClusterName) {
+        this.brokerClusterName = brokerClusterName;
+    }
+
+
+    public String getBrokerIP1() {
+        return brokerIP1;
+    }
+
+
+    public void setBrokerIP1(String brokerIP1) {
+        this.brokerIP1 = brokerIP1;
+    }
+
+
+    public String getBrokerIP2() {
+        return brokerIP2;
+    }
+
+
+    public void setBrokerIP2(String brokerIP2) {
+        this.brokerIP2 = brokerIP2;
+    }
+
+    public int getSendMessageThreadPoolNums() {
+        return sendMessageThreadPoolNums;
+    }
+
+    public void setSendMessageThreadPoolNums(int sendMessageThreadPoolNums) {
+        this.sendMessageThreadPoolNums = sendMessageThreadPoolNums;
+    }
+
+
+    public int getPullMessageThreadPoolNums() {
+        return pullMessageThreadPoolNums;
+    }
+
+
+    public void setPullMessageThreadPoolNums(int pullMessageThreadPoolNums) {
+        this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
+    }
+
+
+    public int getAdminBrokerThreadPoolNums() {
+        return adminBrokerThreadPoolNums;
+    }
+
+
+    public void setAdminBrokerThreadPoolNums(int adminBrokerThreadPoolNums) {
+        this.adminBrokerThreadPoolNums = adminBrokerThreadPoolNums;
+    }
+
+
+    public int getFlushConsumerOffsetInterval() {
+        return flushConsumerOffsetInterval;
+    }
+
+
+    public void setFlushConsumerOffsetInterval(int flushConsumerOffsetInterval) {
+        this.flushConsumerOffsetInterval = flushConsumerOffsetInterval;
+    }
+
+
+    public int getFlushConsumerOffsetHistoryInterval() {
+        return flushConsumerOffsetHistoryInterval;
+    }
+
+
+    public void setFlushConsumerOffsetHistoryInterval(int flushConsumerOffsetHistoryInterval) {
+        this.flushConsumerOffsetHistoryInterval = flushConsumerOffsetHistoryInterval;
+    }
+
+
+    public boolean isClusterTopicEnable() {
+        return clusterTopicEnable;
+    }
+
+
+    public void setClusterTopicEnable(boolean clusterTopicEnable) {
+        this.clusterTopicEnable = clusterTopicEnable;
+    }
+
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+
+    public long getBrokerId() {
+        return brokerId;
+    }
+
+
+    public void setBrokerId(long brokerId) {
+        this.brokerId = brokerId;
+    }
+
+
+    public boolean isAutoCreateSubscriptionGroup() {
+        return autoCreateSubscriptionGroup;
+    }
+
+
+    public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) {
+        this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup;
+    }
+
+
+    public boolean isRejectTransactionMessage() {
+        return rejectTransactionMessage;
+    }
+
+
+    public void setRejectTransactionMessage(boolean rejectTransactionMessage) {
+        this.rejectTransactionMessage = rejectTransactionMessage;
+    }
+
+
+    public boolean isFetchNamesrvAddrByAddressServer() {
+        return fetchNamesrvAddrByAddressServer;
+    }
+
+
+    public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) {
+        this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer;
+    }
+
+
+    public int getSendThreadPoolQueueCapacity() {
+        return sendThreadPoolQueueCapacity;
+    }
+
+
+    public void setSendThreadPoolQueueCapacity(int sendThreadPoolQueueCapacity) {
+        this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity;
+    }
+
+
+    public int getPullThreadPoolQueueCapacity() {
+        return pullThreadPoolQueueCapacity;
+    }
+
+
+    public void setPullThreadPoolQueueCapacity(int pullThreadPoolQueueCapacity) {
+        this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
+    }
+
+
+    public boolean isBrokerTopicEnable() {
+        return brokerTopicEnable;
+    }
+
+
+    public void setBrokerTopicEnable(boolean brokerTopicEnable) {
+        this.brokerTopicEnable = brokerTopicEnable;
+    }
+
+
+    public int getFilterServerNums() {
+        return filterServerNums;
+    }
+
+
+    public void setFilterServerNums(int filterServerNums) {
+        this.filterServerNums = filterServerNums;
+    }
+
+
+    public boolean isLongPollingEnable() {
+        return longPollingEnable;
+    }
+
+
+    public void setLongPollingEnable(boolean longPollingEnable) {
+        this.longPollingEnable = longPollingEnable;
+    }
+
+
+    public boolean isNotifyConsumerIdsChangedEnable() {
+        return notifyConsumerIdsChangedEnable;
+    }
+
+
+    public void setNotifyConsumerIdsChangedEnable(boolean notifyConsumerIdsChangedEnable) {
+        this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable;
+    }
+
+
+    public long getShortPollingTimeMills() {
+        return shortPollingTimeMills;
+    }
+
+
+    public void setShortPollingTimeMills(long shortPollingTimeMills) {
+        this.shortPollingTimeMills = shortPollingTimeMills;
+    }
+
+
+    public int getClientManageThreadPoolNums() {
+        return clientManageThreadPoolNums;
+    }
+
+
+    public void setClientManageThreadPoolNums(int clientManageThreadPoolNums) {
+        this.clientManageThreadPoolNums = clientManageThreadPoolNums;
+    }
+
+
+    public boolean isCommercialEnable() {
+        return commercialEnable;
+    }
+
+
+    public void setCommercialEnable(final boolean commercialEnable) {
+        this.commercialEnable = commercialEnable;
+    }
+
+    public int getCommercialTimerCount() {
+        return commercialTimerCount;
+    }
+
+    public void setCommercialTimerCount(final int commercialTimerCount) {
+        this.commercialTimerCount = commercialTimerCount;
+    }
+
+    public int getCommercialTransCount() {
+        return commercialTransCount;
+    }
+
+    public void setCommercialTransCount(final int commercialTransCount) {
+        this.commercialTransCount = commercialTransCount;
+    }
+
+    public int getCommercialBigCount() {
+        return commercialBigCount;
+    }
+
+    public void setCommercialBigCount(final int commercialBigCount) {
+        this.commercialBigCount = commercialBigCount;
+    }
+
+    public int getMaxDelayTime() {
+        return maxDelayTime;
+    }
+
+
+    public void setMaxDelayTime(final int maxDelayTime) {
+        this.maxDelayTime = maxDelayTime;
+    }
+
+    public int getClientManagerThreadPoolQueueCapacity() {
+        return clientManagerThreadPoolQueueCapacity;
+    }
+
+    public void setClientManagerThreadPoolQueueCapacity(int clientManagerThreadPoolQueueCapacity) {
+        this.clientManagerThreadPoolQueueCapacity = clientManagerThreadPoolQueueCapacity;
+    }
+
+    public int getConsumerManagerThreadPoolQueueCapacity() {
+        return consumerManagerThreadPoolQueueCapacity;
+    }
+
+    public void setConsumerManagerThreadPoolQueueCapacity(int consumerManagerThreadPoolQueueCapacity) {
+        this.consumerManagerThreadPoolQueueCapacity = consumerManagerThreadPoolQueueCapacity;
+    }
+
+    public int getConsumerManageThreadPoolNums() {
+        return consumerManageThreadPoolNums;
+    }
+
+    public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) {
+        this.consumerManageThreadPoolNums = consumerManageThreadPoolNums;
+    }
+}