You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:21 UTC

[30/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java
deleted file mode 100644
index 6f861d3..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/DefaultMQProducer.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.rocketmq.client.ClientConfig;
-import com.alibaba.rocketmq.client.QueryResult;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class DefaultMQProducer extends ClientConfig implements MQProducer {
-    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
-    private String producerGroup;
-    /**
-     * Just for testing or demo program
-     */
-    private String createTopicKey = MixAll.DEFAULT_TOPIC;
-    private volatile int defaultTopicQueueNums = 4;
-    private int sendMsgTimeout = 3000;
-    private int compressMsgBodyOverHowmuch = 1024 * 4;
-    private int retryTimesWhenSendFailed = 2;
-    private int retryTimesWhenSendAsyncFailed = 2;
-
-    private boolean retryAnotherBrokerWhenNotStoreOK = false;
-    private int maxMessageSize = 1024 * 1024 * 4; // 4M
-    public DefaultMQProducer() {
-        this(MixAll.DEFAULT_PRODUCER_GROUP, null);
-    }
-
-
-    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
-        this.producerGroup = producerGroup;
-        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
-    }
-
-
-    public DefaultMQProducer(final String producerGroup) {
-        this(producerGroup, null);
-    }
-
-
-    public DefaultMQProducer(RPCHook rpcHook) {
-        this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
-    }
-
-
-    @Override
-    public void start() throws MQClientException {
-        this.defaultMQProducerImpl.start();
-    }
-
-    @Override
-    public void shutdown() {
-        this.defaultMQProducerImpl.shutdown();
-    }
-
-
-    @Override
-    public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
-        return this.defaultMQProducerImpl.fetchPublishMessageQueues(topic);
-    }
-
-
-    @Override
-    public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.defaultMQProducerImpl.send(msg);
-    }
-
-
-    @Override
-    public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.defaultMQProducerImpl.send(msg, timeout);
-    }
-
-
-    @Override
-    public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.send(msg, sendCallback);
-    }
-
-
-    @Override
-    public void send(Message msg, SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
-    }
-
-
-    @Override
-    public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.sendOneway(msg);
-    }
-
-
-    @Override
-    public SendResult send(Message msg, MessageQueue mq)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.defaultMQProducerImpl.send(msg, mq);
-    }
-
-
-    @Override
-    public SendResult send(Message msg, MessageQueue mq, long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.defaultMQProducerImpl.send(msg, mq, timeout);
-    }
-
-
-    @Override
-    public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
-            throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.send(msg, mq, sendCallback);
-    }
-
-
-    @Override
-    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.send(msg, mq, sendCallback, timeout);
-    }
-
-
-    @Override
-    public void sendOneway(Message msg, MessageQueue mq) throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.sendOneway(msg, mq);
-    }
-
-
-    @Override
-    public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.defaultMQProducerImpl.send(msg, selector, arg);
-    }
-
-
-    @Override
-    public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
-        return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);
-    }
-
-
-    @Override
-    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)
-            throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback);
-    }
-
-
-    @Override
-    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);
-    }
-
-
-    @Override
-    public void sendOneway(Message msg, MessageQueueSelector selector, Object arg)
-            throws MQClientException, RemotingException, InterruptedException {
-        this.defaultMQProducerImpl.sendOneway(msg, selector, arg);
-    }
-
-
-    @Override
-    public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, final Object arg)
-            throws MQClientException {
-        throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
-    }
-
-
-    @Override
-    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-        createTopic(key, newTopic, queueNum, 0);
-    }
-
-
-    @Override
-    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
-        this.defaultMQProducerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
-    }
-
-
-    @Override
-    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
-        return this.defaultMQProducerImpl.searchOffset(mq, timestamp);
-    }
-
-
-    @Override
-    public long maxOffset(MessageQueue mq) throws MQClientException {
-        return this.defaultMQProducerImpl.maxOffset(mq);
-    }
-
-
-    @Override
-    public long minOffset(MessageQueue mq) throws MQClientException {
-        return this.defaultMQProducerImpl.minOffset(mq);
-    }
-
-
-    @Override
-    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
-        return this.defaultMQProducerImpl.earliestMsgStoreTime(mq);
-    }
-
-
-    @Override
-    public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        return this.defaultMQProducerImpl.viewMessage(offsetMsgId);
-    }
-
-
-    @Override
-    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-            throws MQClientException, InterruptedException {
-        return this.defaultMQProducerImpl.queryMessage(topic, key, maxNum, begin, end);
-    }
-
-
-    @Override
-    public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        try {
-            MessageId oldMsgId = MessageDecoder.decodeMessageId(msgId);
-            return this.viewMessage(msgId);
-        } catch (Exception e) {
-        }
-        return this.defaultMQProducerImpl.queryMessageByUniqKey(topic, msgId);
-    }
-
-    public String getProducerGroup() {
-        return producerGroup;
-    }
-
-
-    public void setProducerGroup(String producerGroup) {
-        this.producerGroup = producerGroup;
-    }
-
-
-    public String getCreateTopicKey() {
-        return createTopicKey;
-    }
-
-
-    public void setCreateTopicKey(String createTopicKey) {
-        this.createTopicKey = createTopicKey;
-    }
-
-
-    public int getSendMsgTimeout() {
-        return sendMsgTimeout;
-    }
-
-
-    public void setSendMsgTimeout(int sendMsgTimeout) {
-        this.sendMsgTimeout = sendMsgTimeout;
-    }
-
-
-    public int getCompressMsgBodyOverHowmuch() {
-        return compressMsgBodyOverHowmuch;
-    }
-
-
-    public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {
-        this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
-    }
-
-
-    public DefaultMQProducerImpl getDefaultMQProducerImpl() {
-        return defaultMQProducerImpl;
-    }
-
-
-    public boolean isRetryAnotherBrokerWhenNotStoreOK() {
-        return retryAnotherBrokerWhenNotStoreOK;
-    }
-
-
-    public void setRetryAnotherBrokerWhenNotStoreOK(boolean retryAnotherBrokerWhenNotStoreOK) {
-        this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
-    }
-
-
-    public int getMaxMessageSize() {
-        return maxMessageSize;
-    }
-
-
-    public void setMaxMessageSize(int maxMessageSize) {
-        this.maxMessageSize = maxMessageSize;
-    }
-
-
-    public int getDefaultTopicQueueNums() {
-        return defaultTopicQueueNums;
-    }
-
-
-    public void setDefaultTopicQueueNums(int defaultTopicQueueNums) {
-        this.defaultTopicQueueNums = defaultTopicQueueNums;
-    }
-
-
-    public int getRetryTimesWhenSendFailed() {
-        return retryTimesWhenSendFailed;
-    }
-
-
-    public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
-        this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
-    }
-
-
-    public boolean isSendMessageWithVIPChannel() {
-        return isVipChannelEnabled();
-    }
-
-
-    public void setSendMessageWithVIPChannel(final boolean sendMessageWithVIPChannel) {
-        this.setVipChannelEnabled(sendMessageWithVIPChannel);
-    }
-
-
-    public long[] getNotAvailableDuration() {
-        return this.defaultMQProducerImpl.getNotAvailableDuration();
-    }
-
-    public void setNotAvailableDuration(final long[] notAvailableDuration) {
-        this.defaultMQProducerImpl.setNotAvailableDuration(notAvailableDuration);
-    }
-
-    public long[] getLatencyMax() {
-        return this.defaultMQProducerImpl.getLatencyMax();
-    }
-
-    public void setLatencyMax(final long[] latencyMax) {
-        this.defaultMQProducerImpl.setLatencyMax(latencyMax);
-    }
-
-    public boolean isSendLatencyFaultEnable() {
-        return this.defaultMQProducerImpl.isSendLatencyFaultEnable();
-    }
-
-    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
-        this.defaultMQProducerImpl.setSendLatencyFaultEnable(sendLatencyFaultEnable);
-    }
-
-    public int getRetryTimesWhenSendAsyncFailed() {
-        return retryTimesWhenSendAsyncFailed;
-    }
-
-    public void setRetryTimesWhenSendAsyncFailed(final int retryTimesWhenSendAsyncFailed) {
-        this.retryTimesWhenSendAsyncFailed = retryTimesWhenSendAsyncFailed;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java
deleted file mode 100644
index af3723a..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionExecuter.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.rocketmq.common.message.Message;
-
-
-/**
- * @author shijia.wxr
- */
-public interface LocalTransactionExecuter {
-    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java
deleted file mode 100644
index ee2a93a..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/LocalTransactionState.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-/**
- * @author shijia.wxr
- */
-public enum LocalTransactionState {
-    COMMIT_MESSAGE,
-    ROLLBACK_MESSAGE,
-    UNKNOW,
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java
deleted file mode 100644
index e21bc00..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/MQProducer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.rocketmq.client.MQAdmin;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public interface MQProducer extends MQAdmin {
-    void start() throws MQClientException;
-
-    void shutdown();
-
-
-    List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;
-
-
-    SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
-            InterruptedException;
-
-
-    SendResult send(final Message msg, final long timeout) throws MQClientException,
-            RemotingException, MQBrokerException, InterruptedException;
-
-
-    void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
-            RemotingException, InterruptedException;
-
-
-    void send(final Message msg, final SendCallback sendCallback, final long timeout)
-            throws MQClientException, RemotingException, InterruptedException;
-
-
-    void sendOneway(final Message msg) throws MQClientException, RemotingException,
-            InterruptedException;
-
-
-    SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
-            RemotingException, MQBrokerException, InterruptedException;
-
-
-    SendResult send(final Message msg, final MessageQueue mq, final long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
-
-    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
-            throws MQClientException, RemotingException, InterruptedException;
-
-
-    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException;
-
-
-    void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
-            RemotingException, InterruptedException;
-
-
-    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
-
-
-    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,
-                    final long timeout) throws MQClientException, RemotingException, MQBrokerException,
-            InterruptedException;
-
-
-    void send(final Message msg, final MessageQueueSelector selector, final Object arg,
-              final SendCallback sendCallback) throws MQClientException, RemotingException,
-            InterruptedException;
-
-
-    void send(final Message msg, final MessageQueueSelector selector, final Object arg,
-              final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
-            InterruptedException;
-
-
-    void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
-            throws MQClientException, RemotingException, InterruptedException;
-
-
-    TransactionSendResult sendMessageInTransaction(final Message msg,
-                                                   final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java
deleted file mode 100644
index 924c145..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/MessageQueueSelector.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public interface MessageQueueSelector {
-    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java
deleted file mode 100644
index 35d1a72..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendCallback.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-/**
- * @author shijia.wxr
- */
-public interface SendCallback {
-    public void onSuccess(final SendResult sendResult);
-
-
-    public void onException(final Throwable e);
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java
deleted file mode 100644
index 183accf..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendResult.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.fastjson.JSON;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-
-/**
- * @author shijia.wxr
- */
-public class SendResult {
-    private SendStatus sendStatus;
-    private String msgId;
-    private MessageQueue messageQueue;
-    private long queueOffset;
-    private String transactionId;
-    private String offsetMsgId;
-    private String regionId;
-    private boolean traceOn = true;
-
-    public SendResult() {
-    }
-
-    public SendResult(SendStatus sendStatus, String msgId, String offsetMsgId, MessageQueue messageQueue, long queueOffset) {
-        this.sendStatus = sendStatus;
-        this.msgId = msgId;
-        this.offsetMsgId = offsetMsgId;
-        this.messageQueue = messageQueue;
-        this.queueOffset = queueOffset;
-    }
-
-    public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, final long queueOffset, final String transactionId, final String offsetMsgId, final String regionId) {
-        this.sendStatus = sendStatus;
-        this.msgId = msgId;
-        this.messageQueue = messageQueue;
-        this.queueOffset = queueOffset;
-        this.transactionId = transactionId;
-        this.offsetMsgId = offsetMsgId;
-        this.regionId = regionId;
-    }
-
-    public boolean isTraceOn() {
-        return traceOn;
-    }
-
-    public void setTraceOn(final boolean traceOn) {
-        this.traceOn = traceOn;
-    }
-
-    public String getRegionId() {
-        return regionId;
-    }
-
-    public void setRegionId(final String regionId) {
-        this.regionId = regionId;
-    }
-
-    public static String encoderSendResultToJson(final Object obj) {
-        return JSON.toJSONString(obj);
-    }
-
-    public static SendResult decoderSendResultFromJson(String json) {
-        return JSON.parseObject(json, SendResult.class);
-    }
-
-    public String getMsgId() {
-        return msgId;
-    }
-
-
-    public void setMsgId(String msgId) {
-        this.msgId = msgId;
-    }
-
-
-    public SendStatus getSendStatus() {
-        return sendStatus;
-    }
-
-
-    public void setSendStatus(SendStatus sendStatus) {
-        this.sendStatus = sendStatus;
-    }
-
-
-    public MessageQueue getMessageQueue() {
-        return messageQueue;
-    }
-
-
-    public void setMessageQueue(MessageQueue messageQueue) {
-        this.messageQueue = messageQueue;
-    }
-
-
-    public long getQueueOffset() {
-        return queueOffset;
-    }
-
-
-    public void setQueueOffset(long queueOffset) {
-        this.queueOffset = queueOffset;
-    }
-
-
-    public String getTransactionId() {
-        return transactionId;
-    }
-
-
-    public void setTransactionId(String transactionId) {
-        this.transactionId = transactionId;
-    }
-
-    public String getOffsetMsgId() {
-        return offsetMsgId;
-    }
-
-    public void setOffsetMsgId(String offsetMsgId) {
-        this.offsetMsgId = offsetMsgId;
-    }
-
-    @Override
-    public String toString() {
-        return "SendResult [sendStatus=" + sendStatus + ", msgId=" + msgId + ", offsetMsgId=" + offsetMsgId + ", messageQueue=" + messageQueue
-                + ", queueOffset=" + queueOffset + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java
deleted file mode 100644
index 3bc572f..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/SendStatus.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-/**
- * @author shijia.wxr
- */
-public enum SendStatus {
-    SEND_OK,
-    FLUSH_DISK_TIMEOUT,
-    FLUSH_SLAVE_TIMEOUT,
-    SLAVE_NOT_AVAILABLE,
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java
deleted file mode 100644
index 8440537..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionCheckListener.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-
-
-/**
- * @author shijia.wxr
- */
-public interface TransactionCheckListener {
-    LocalTransactionState checkLocalTransactionState(final MessageExt msg);
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java
deleted file mode 100644
index 08dd4ab..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionMQProducer.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.remoting.RPCHook;
-
-
-/**
- * @author shijia.wxr
- */
-public class TransactionMQProducer extends DefaultMQProducer {
-    private TransactionCheckListener transactionCheckListener;
-    private int checkThreadPoolMinSize = 1;
-    private int checkThreadPoolMaxSize = 1;
-    private int checkRequestHoldMax = 2000;
-
-
-    public TransactionMQProducer() {
-    }
-
-
-    public TransactionMQProducer(final String producerGroup) {
-        super(producerGroup);
-    }
-
-    public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
-        super(producerGroup, rpcHook);
-    }
-
-    @Override
-    public void start() throws MQClientException {
-        this.defaultMQProducerImpl.initTransactionEnv();
-        super.start();
-    }
-
-
-    @Override
-    public void shutdown() {
-        super.shutdown();
-        this.defaultMQProducerImpl.destroyTransactionEnv();
-    }
-
-
-    @Override
-    public TransactionSendResult sendMessageInTransaction(final Message msg,
-                                                          final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
-        if (null == this.transactionCheckListener) {
-            throw new MQClientException("localTransactionBranchCheckListener is null", null);
-        }
-
-        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
-    }
-
-
-    public TransactionCheckListener getTransactionCheckListener() {
-        return transactionCheckListener;
-    }
-
-
-    public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
-        this.transactionCheckListener = transactionCheckListener;
-    }
-
-
-    public int getCheckThreadPoolMinSize() {
-        return checkThreadPoolMinSize;
-    }
-
-
-    public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
-        this.checkThreadPoolMinSize = checkThreadPoolMinSize;
-    }
-
-
-    public int getCheckThreadPoolMaxSize() {
-        return checkThreadPoolMaxSize;
-    }
-
-
-    public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
-        this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;
-    }
-
-
-    public int getCheckRequestHoldMax() {
-        return checkRequestHoldMax;
-    }
-
-
-    public void setCheckRequestHoldMax(int checkRequestHoldMax) {
-        this.checkRequestHoldMax = checkRequestHoldMax;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java
deleted file mode 100644
index e7dcd0e..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/TransactionSendResult.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer;
-
-/**
- * @author shijia.wxr
- */
-public class TransactionSendResult extends SendResult {
-    private LocalTransactionState localTransactionState;
-
-
-    public TransactionSendResult() {
-    }
-
-
-    public LocalTransactionState getLocalTransactionState() {
-        return localTransactionState;
-    }
-
-
-    public void setLocalTransactionState(LocalTransactionState localTransactionState) {
-        this.localTransactionState = localTransactionState;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
deleted file mode 100644
index 648356b..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByHash.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer.selector;
-
-import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class SelectMessageQueueByHash implements MessageQueueSelector {
-
-    @Override
-    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-        int value = arg.hashCode();
-        if (value < 0) {
-            value = Math.abs(value);
-        }
-
-        value = value % mqs.size();
-        return mqs.get(value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
deleted file mode 100644
index a213391..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer.selector;
-
-import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-import java.util.Set;
-
-
-/**
- * @author shijia.wxr
- */
-public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
-    private Set<String> consumeridcs;
-
-
-    @Override
-    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-        return null;
-    }
-
-
-    public Set<String> getConsumeridcs() {
-        return consumeridcs;
-    }
-
-
-    public void setConsumeridcs(Set<String> consumeridcs) {
-        this.consumeridcs = consumeridcs;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java b/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
deleted file mode 100644
index 3f381e4..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.producer.selector;
-
-import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-import java.util.Random;
-
-
-/**
- * @author shijia.wxr
- */
-public class SelectMessageQueueByRandoom implements MessageQueueSelector {
-    private Random random = new Random(System.currentTimeMillis());
-
-
-    @Override
-    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
-        int value = random.nextInt();
-        if (value < 0) {
-            value = Math.abs(value);
-        }
-
-        value = value % mqs.size();
-        return mqs.get(value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java b/client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java
deleted file mode 100644
index e07e233..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/stat/ConsumerStatsManager.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.stat;
-
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.protocol.body.ConsumeStatus;
-import com.alibaba.rocketmq.common.stats.StatsItemSet;
-import com.alibaba.rocketmq.common.stats.StatsSnapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-
-public class ConsumerStatsManager {
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME);
-
-    private static final String TOPIC_AND_GROUP_CONSUME_OK_TPS = "CONSUME_OK_TPS";
-    private static final String TOPIC_AND_GROUP_CONSUME_FAILED_TPS = "CONSUME_FAILED_TPS";
-    private static final String TOPIC_AND_GROUP_CONSUME_RT = "CONSUME_RT";
-    private static final String TOPIC_AND_GROUP_PULL_TPS = "PULL_TPS";
-    private static final String TOPIC_AND_GROUP_PULL_RT = "PULL_RT";
-
-    private final StatsItemSet topicAndGroupConsumeOKTPS;
-    private final StatsItemSet topicAndGroupConsumeRT;
-    private final StatsItemSet topicAndGroupConsumeFailedTPS;
-    private final StatsItemSet topicAndGroupPullTPS;
-    private final StatsItemSet topicAndGroupPullRT;
-
-
-    public ConsumerStatsManager(final ScheduledExecutorService scheduledExecutorService) {
-        this.topicAndGroupConsumeOKTPS =
-                new StatsItemSet(TOPIC_AND_GROUP_CONSUME_OK_TPS, scheduledExecutorService, log);
-
-        this.topicAndGroupConsumeRT =
-                new StatsItemSet(TOPIC_AND_GROUP_CONSUME_RT, scheduledExecutorService, log);
-
-        this.topicAndGroupConsumeFailedTPS =
-                new StatsItemSet(TOPIC_AND_GROUP_CONSUME_FAILED_TPS, scheduledExecutorService, log);
-
-        this.topicAndGroupPullTPS = new StatsItemSet(TOPIC_AND_GROUP_PULL_TPS, scheduledExecutorService, log);
-
-        this.topicAndGroupPullRT = new StatsItemSet(TOPIC_AND_GROUP_PULL_RT, scheduledExecutorService, log);
-    }
-
-
-    public void start() {
-    }
-
-
-    public void shutdown() {
-    }
-
-
-    public void incPullRT(final String group, final String topic, final long rt) {
-        this.topicAndGroupPullRT.addValue(topic + "@" + group, (int) rt, 1);
-    }
-
-
-    public void incPullTPS(final String group, final String topic, final long msgs) {
-        this.topicAndGroupPullTPS.addValue(topic + "@" + group, (int) msgs, 1);
-    }
-
-
-    public void incConsumeRT(final String group, final String topic, final long rt) {
-        this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int) rt, 1);
-    }
-
-
-    public void incConsumeOKTPS(final String group, final String topic, final long msgs) {
-        this.topicAndGroupConsumeOKTPS.addValue(topic + "@" + group, (int) msgs, 1);
-    }
-
-
-    public void incConsumeFailedTPS(final String group, final String topic, final long msgs) {
-        this.topicAndGroupConsumeFailedTPS.addValue(topic + "@" + group, (int) msgs, 1);
-    }
-
-    public ConsumeStatus consumeStatus(final String group, final String topic) {
-        ConsumeStatus cs = new ConsumeStatus();
-        {
-            StatsSnapshot ss = this.getPullRT(group, topic);
-            if (ss != null) {
-                cs.setPullRT(ss.getAvgpt());
-            }
-        }
-
-        {
-            StatsSnapshot ss = this.getPullTPS(group, topic);
-            if (ss != null) {
-                cs.setPullTPS(ss.getTps());
-            }
-        }
-
-        {
-            StatsSnapshot ss = this.getConsumeRT(group, topic);
-            if (ss != null) {
-                cs.setConsumeRT(ss.getAvgpt());
-            }
-        }
-
-        {
-            StatsSnapshot ss = this.getConsumeOKTPS(group, topic);
-            if (ss != null) {
-                cs.setConsumeOKTPS(ss.getTps());
-            }
-        }
-
-        {
-            StatsSnapshot ss = this.getConsumeFailedTPS(group, topic);
-            if (ss != null) {
-                cs.setConsumeFailedTPS(ss.getTps());
-            }
-        }
-
-        {
-            StatsSnapshot ss = this.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group);
-            if (ss != null) {
-                cs.setConsumeFailedMsgs(ss.getSum());
-            }
-        }
-
-        return cs;
-    }
-
-    private StatsSnapshot getPullRT(final String group, final String topic) {
-        return this.topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group);
-    }
-
-    private StatsSnapshot getPullTPS(final String group, final String topic) {
-        return this.topicAndGroupPullTPS.getStatsDataInMinute(topic + "@" + group);
-    }
-
-    private StatsSnapshot getConsumeRT(final String group, final String topic) {
-        StatsSnapshot statsData = this.topicAndGroupConsumeRT.getStatsDataInMinute(topic + "@" + group);
-        if (0 == statsData.getSum()) {
-            statsData = this.topicAndGroupConsumeRT.getStatsDataInHour(topic + "@" + group);
-        }
-
-        return statsData;
-    }
-
-    private StatsSnapshot getConsumeOKTPS(final String group, final String topic) {
-        return this.topicAndGroupConsumeOKTPS.getStatsDataInMinute(topic + "@" + group);
-    }
-
-    private StatsSnapshot getConsumeFailedTPS(final String group, final String topic) {
-        return this.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
new file mode 100644
index 0000000..8d15108
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+
+
+/**
+ * Client Common configuration
+ *
+ * @author shijia.wxr
+ * @author vongosling
+ */
+public class ClientConfig {
+    public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
+    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
+    private String clientIP = RemotingUtil.getLocalAddress();
+    private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
+    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
+    /**
+     * Pulling topic information interval from the named server
+     */
+    private int pollNameServerInteval = 1000 * 30;
+    /**
+     * Heartbeat interval in microseconds with message broker
+     */
+    private int heartbeatBrokerInterval = 1000 * 30;
+    /**
+     * Offset persistent interval for consumer
+     */
+    private int persistConsumerOffsetInterval = 1000 * 5;
+    private boolean unitMode = false;
+    private String unitName;
+    private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
+
+    public String buildMQClientId() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(this.getClientIP());
+
+        sb.append("@");
+        sb.append(this.getInstanceName());
+        if (!UtilAll.isBlank(this.unitName)) {
+            sb.append("@");
+            sb.append(this.unitName);
+        }
+
+        return sb.toString();
+    }
+
+    public String getClientIP() {
+        return clientIP;
+    }
+
+    public void setClientIP(String clientIP) {
+        this.clientIP = clientIP;
+    }
+
+    public String getInstanceName() {
+        return instanceName;
+    }
+
+    public void setInstanceName(String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    public void changeInstanceNameToPID() {
+        if (this.instanceName.equals("DEFAULT")) {
+            this.instanceName = String.valueOf(UtilAll.getPid());
+        }
+    }
+
+    public void resetClientConfig(final ClientConfig cc) {
+        this.namesrvAddr = cc.namesrvAddr;
+        this.clientIP = cc.clientIP;
+        this.instanceName = cc.instanceName;
+        this.clientCallbackExecutorThreads = cc.clientCallbackExecutorThreads;
+        this.pollNameServerInteval = cc.pollNameServerInteval;
+        this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;
+        this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;
+        this.unitMode = cc.unitMode;
+        this.unitName = cc.unitName;
+        this.vipChannelEnabled = cc.vipChannelEnabled;
+    }
+
+    public ClientConfig cloneClientConfig() {
+        ClientConfig cc = new ClientConfig();
+        cc.namesrvAddr = namesrvAddr;
+        cc.clientIP = clientIP;
+        cc.instanceName = instanceName;
+        cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
+        cc.pollNameServerInteval = pollNameServerInteval;
+        cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
+        cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+        cc.unitMode = unitMode;
+        cc.unitName = unitName;
+        cc.vipChannelEnabled = vipChannelEnabled;
+        return cc;
+    }
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+    public int getClientCallbackExecutorThreads() {
+        return clientCallbackExecutorThreads;
+    }
+
+
+    public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
+        this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
+    }
+
+
+    public int getPollNameServerInteval() {
+        return pollNameServerInteval;
+    }
+
+
+    public void setPollNameServerInteval(int pollNameServerInteval) {
+        this.pollNameServerInteval = pollNameServerInteval;
+    }
+
+
+    public int getHeartbeatBrokerInterval() {
+        return heartbeatBrokerInterval;
+    }
+
+
+    public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
+        this.heartbeatBrokerInterval = heartbeatBrokerInterval;
+    }
+
+
+    public int getPersistConsumerOffsetInterval() {
+        return persistConsumerOffsetInterval;
+    }
+
+
+    public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) {
+        this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+    }
+
+
+    public String getUnitName() {
+        return unitName;
+    }
+
+
+    public void setUnitName(String unitName) {
+        this.unitName = unitName;
+    }
+
+
+    public boolean isUnitMode() {
+        return unitMode;
+    }
+
+
+    public void setUnitMode(boolean unitMode) {
+        this.unitMode = unitMode;
+    }
+
+
+    public boolean isVipChannelEnabled() {
+        return vipChannelEnabled;
+    }
+
+
+    public void setVipChannelEnabled(final boolean vipChannelEnabled) {
+        this.vipChannelEnabled = vipChannelEnabled;
+    }
+
+
+    @Override
+    public String toString() {
+        return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+                + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval
+                + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+                + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+                + vipChannelEnabled + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
new file mode 100644
index 0000000..9e85283
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+
+/**
+ * Base interface for MQ management
+ *
+ * @author shijia.wxr
+ */
+public interface MQAdmin {
+    /**
+     * Creates an topic
+     *
+     * @param key
+     *         accesskey
+     * @param newTopic
+     *         topic name
+     * @param queueNum
+     *         topic's queue number
+     *
+     * @throws MQClientException
+     */
+    void createTopic(final String key, final String newTopic, final int queueNum)
+            throws MQClientException;
+
+
+    /**
+     * Creates an topic
+     *
+     * @param key
+     *         accesskey
+     * @param newTopic
+     *         topic name
+     * @param queueNum
+     *         topic's queue number
+     * @param topicSysFlag
+     *         topic system flag
+     *
+     * @throws MQClientException
+     */
+    void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
+            throws MQClientException;
+
+
+    /**
+     * Gets the message queue offset according to some time in milliseconds<br>
+     * be cautious to call because of more IO overhead
+     *
+     * @param mq
+     *         Instance of MessageQueue
+     * @param timestamp
+     *         from when in milliseconds.
+     *
+     * @return offset
+     *
+     * @throws MQClientException
+     */
+    long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
+
+
+    /**
+     * Gets the max offset
+     *
+     * @param mq
+     *         Instance of MessageQueue
+     *
+     * @return the max offset
+     *
+     * @throws MQClientException
+     */
+    long maxOffset(final MessageQueue mq) throws MQClientException;
+
+
+    /**
+     * Gets the minimum offset
+     *
+     * @param mq
+     *         Instance of MessageQueue
+     *
+     * @return the minimum offset
+     *
+     * @throws MQClientException
+     */
+    long minOffset(final MessageQueue mq) throws MQClientException;
+
+
+    /**
+     * Gets the earliest stored message time
+     *
+     * @param mq
+     *         Instance of MessageQueue
+     *
+     * @return the time in microseconds
+     *
+     * @throws MQClientException
+     */
+    long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
+
+
+    /**
+     * Query message according tto message id
+     *
+     * @param offsetMsgId
+     *         message id
+     *
+     * @return message
+     *
+     * @throws InterruptedException
+     * @throws MQBrokerException
+     * @throws RemotingException
+     * @throws MQClientException
+     */
+    MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
+            InterruptedException, MQClientException;
+
+
+    /**
+     * Query messages
+     *
+     * @param topic
+     *         message topic
+     * @param key
+     *         message key index word
+     * @param maxNum
+     *         max message number
+     * @param begin
+     *         from when
+     * @param end
+     *         to when
+     *
+     * @return Instance of QueryResult
+     *
+     * @throws MQClientException
+     * @throws InterruptedException
+     */
+    QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
+                             final long end) throws MQClientException, InterruptedException;
+    
+    /**
+
+     * @param topic
+     * @param msgId
+     * @return The {@code MessageExt} of given msgId
+     * @throws RemotingException
+     * @throws MQBrokerException
+     * @throws InterruptedException
+     * @throws MQClientException
+     */
+    MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;        
+
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/MQHelper.java b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
new file mode 100644
index 0000000..41009c5
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MQHelper {
+    public static void resetOffsetByTimestamp(
+            final MessageModel messageModel,
+            final String consumerGroup,
+            final String topic,
+            final long timestamp) throws Exception {
+        resetOffsetByTimestamp(messageModel, "DEFAULT", consumerGroup, topic, timestamp);
+    }
+
+    /**
+     * Reset consumer topic offset according to time
+     *
+     * @param messageModel
+     *         which model
+     * @param instanceName
+     *         which instance
+     * @param consumerGroup
+     *         consumer group
+     * @param topic
+     *         topic
+     * @param timestamp
+     *         time
+     *
+     * @throws Exception
+     */
+    public static void resetOffsetByTimestamp(
+            final MessageModel messageModel,
+            final String instanceName,
+            final String consumerGroup,
+            final String topic,
+            final long timestamp) throws Exception {
+        final Logger log = ClientLogger.getLog();
+
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup);
+        consumer.setInstanceName(instanceName);
+        consumer.setMessageModel(messageModel);
+        consumer.start();
+
+        Set<MessageQueue> mqs = null;
+        try {
+            mqs = consumer.fetchSubscribeMessageQueues(topic);
+            if (mqs != null && !mqs.isEmpty()) {
+                TreeSet<MessageQueue> mqsNew = new TreeSet<MessageQueue>(mqs);
+                for (MessageQueue mq : mqsNew) {
+                    long offset = consumer.searchOffset(mq, timestamp);
+                    if (offset >= 0) {
+                        consumer.updateConsumeOffset(mq, offset);
+                        log.info("resetOffsetByTimestamp updateConsumeOffset success, {} {} {}",
+                                consumerGroup, offset, mq);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.warn("resetOffsetByTimestamp Exception", e);
+            throw e;
+        } finally {
+            if (mqs != null) {
+                consumer.getDefaultMQPullConsumerImpl().getOffsetStore().persistAll(mqs);
+            }
+            consumer.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/QueryResult.java b/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
new file mode 100644
index 0000000..cdbf1e7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class QueryResult {
+    private final long indexLastUpdateTimestamp;
+    private final List<MessageExt> messageList;
+
+
+    public QueryResult(long indexLastUpdateTimestamp, List<MessageExt> messageList) {
+        this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
+        this.messageList = messageList;
+    }
+
+
+    public long getIndexLastUpdateTimestamp() {
+        return indexLastUpdateTimestamp;
+    }
+
+
+    public List<MessageExt> getMessageList() {
+        return messageList;
+    }
+
+
+    @Override
+    public String toString() {
+        return "QueryResult [indexLastUpdateTimestamp=" + indexLastUpdateTimestamp + ", messageList="
+                + messageList + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/Validators.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
new file mode 100644
index 0000000..e977d44
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.client;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+
+/**
+ * Common Validator
+ *
+ * @author manhong.yqd
+ */
+public class Validators {
+    public static final String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$";
+    public static final Pattern PATTERN = Pattern.compile(VALID_PATTERN_STR);
+    public static final int CHARACTER_MAX_LENGTH = 255;
+
+    /**
+     * @param origin
+     * @param patternStr
+     *
+     * @return The resulting {@code String}
+     */
+    public static String getGroupWithRegularExpression(String origin, String patternStr) {
+        Pattern pattern = Pattern.compile(patternStr);
+        Matcher matcher = pattern.matcher(origin);
+        while (matcher.find()) {
+            return matcher.group(0);
+        }
+        return null;
+    }
+
+    /**
+     * Validate group
+     *
+     * @param group
+     *
+     * @throws MQClientException
+     */
+    public static void checkGroup(String group) throws MQClientException {
+        if (UtilAll.isBlank(group)) {
+            throw new MQClientException("the specified group is blank", null);
+        }
+        if (!regularExpressionMatcher(group, PATTERN)) {
+            throw new MQClientException(String.format(
+                    "the specified group[%s] contains illegal characters, allowing only %s", group,
+                    VALID_PATTERN_STR), null);
+        }
+        if (group.length() > CHARACTER_MAX_LENGTH) {
+            throw new MQClientException("the specified group is longer than group max length 255.", null);
+        }
+    }
+
+    /**
+     * @param origin
+     * @param pattern
+     *
+     * @return <tt>true</tt> if, and only if, the entire origin sequence
+     *          matches this matcher's pattern
+     */
+    public static boolean regularExpressionMatcher(String origin, Pattern pattern) {
+        if (pattern == null) {
+            return true;
+        }
+        Matcher matcher = pattern.matcher(origin);
+        return matcher.matches();
+    }
+
+    /**
+     * Validate message
+     *
+     * @param msg
+     * @param defaultMQProducer
+     *
+     * @throws MQClientException
+     */
+    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
+            throws MQClientException {
+        if (null == msg) {
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
+        }
+        // topic
+        Validators.checkTopic(msg.getTopic());
+        // body
+        if (null == msg.getBody()) {
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
+        }
+
+        if (0 == msg.getBody().length) {
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
+        }
+
+        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
+            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
+                    "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
+        }
+    }
+
+    /**
+     * Validate topic
+     *
+     * @param topic
+     *
+     * @throws MQClientException
+     */
+    public static void checkTopic(String topic) throws MQClientException {
+        if (UtilAll.isBlank(topic)) {
+            throw new MQClientException("the specified topic is blank", null);
+        }
+
+        if (!regularExpressionMatcher(topic, PATTERN)) {
+            throw new MQClientException(String.format(
+                    "the specified topic[%s] contains illegal characters, allowing only %s", topic,
+                    VALID_PATTERN_STR), null);
+        }
+
+        if (topic.length() > CHARACTER_MAX_LENGTH) {
+            throw new MQClientException("the specified topic is longer than topic max length 255.", null);
+        }
+
+        //whether the same with system reserved keyword
+        if (topic.equals(MixAll.DEFAULT_TOPIC)) {
+            throw new MQClientException(
+                    String.format("the topic[%s] is conflict with default topic.", topic), null);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java b/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
new file mode 100644
index 0000000..cfff17e
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.client.admin;
+
+/**
+ * @author shijia.wxr
+ */
+public interface MQAdminExtInner {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
new file mode 100644
index 0000000..6184379
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.common;
+
+public class ClientErrorCode {
+    public static final int CONNECT_BROKER_EXCEPTION = 10001;
+    public static final int ACCESS_BROKER_TIMEOUT = 10002;
+    public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;
+    public static final int NO_NAME_SERVER_EXCEPTION = 10004;
+    public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
new file mode 100644
index 0000000..360cfdf
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.client.common;
+
+import java.util.Random;
+
+public class ThreadLocalIndex {
+    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
+    private final Random random = new Random();
+    public ThreadLocalIndex(int value) {
+
+    }
+
+    public int getAndIncrement() {
+        Integer index = this.threadLocalIndex.get();
+        if (null == index) {
+            index = Math.abs(random.nextInt());
+            if (index < 0) index = 0;
+            this.threadLocalIndex.set(index);
+        }
+
+        index = Math.abs(index + 1);
+        if (index < 0)
+            index = 0;
+
+        this.threadLocalIndex.set(index);
+        return index;
+    }
+
+    @Override
+    public String toString() {
+        return "ThreadLocalIndex{" +
+                "threadLocalIndex=" + threadLocalIndex.get() +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java b/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
new file mode 100644
index 0000000..cb98b62
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer;
+
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+
+/**
+ * Strategy Algorithm for message allocating between consumers
+ *
+ * @author shijia.wxr
+ * @author vongosling
+ */
+public interface AllocateMessageQueueStrategy {
+
+    /**
+     * Allocating by consumer id
+     *
+     * @param consumerGroup
+     *         current consumer group
+     * @param currentCID
+     *         current consumer id
+     * @param mqAll
+     *         message queue set in current topic
+     * @param cidAll
+     *         consumer set in current consumer group
+     *
+     * @return The allocate result of given strategy
+     */
+    List<MessageQueue> allocate(
+            final String consumerGroup,
+            final String currentCID,
+            final List<MessageQueue> mqAll,
+            final List<String> cidAll
+    );
+
+
+    /**
+     * Algorithm name
+     *
+     * @return The strategy name
+     */
+    String getName();
+}