You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:28 UTC

[37/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
deleted file mode 100644
index 9df2be3..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.consumer.store;
-
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.impl.FindBrokerResult;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-import org.slf4j.Logger;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-/**
- * Remote storage implementation
- *
- * @author shijia.wxr
- */
-public class RemoteBrokerOffsetStore implements OffsetStore {
-    private final static Logger log = ClientLogger.getLog();
-    private final MQClientInstance mQClientFactory;
-    private final String groupName;
-    private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable =
-            new ConcurrentHashMap<MessageQueue, AtomicLong>();
-
-
-    public RemoteBrokerOffsetStore(MQClientInstance mQClientFactory, String groupName) {
-        this.mQClientFactory = mQClientFactory;
-        this.groupName = groupName;
-    }
-
-
-    @Override
-    public void load() {
-    }
-
-
-    @Override
-    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
-        if (mq != null) {
-            AtomicLong offsetOld = this.offsetTable.get(mq);
-            if (null == offsetOld) {
-                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
-            }
-
-            if (null != offsetOld) {
-                if (increaseOnly) {
-                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
-                } else {
-                    offsetOld.set(offset);
-                }
-            }
-        }
-    }
-
-
-    @Override
-    public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
-        if (mq != null) {
-            switch (type) {
-                case MEMORY_FIRST_THEN_STORE:
-                case READ_FROM_MEMORY: {
-                    AtomicLong offset = this.offsetTable.get(mq);
-                    if (offset != null) {
-                        return offset.get();
-                    } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
-                        return -1;
-                    }
-                }
-                case READ_FROM_STORE: {
-                    try {
-                        long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
-                        AtomicLong offset = new AtomicLong(brokerOffset);
-                        this.updateOffset(mq, offset.get(), false);
-                        return brokerOffset;
-                    }
-                    // No offset in broker
-                    catch (MQBrokerException e) {
-                        return -1;
-                    }
-                    //Other exceptions
-                    catch (Exception e) {
-                        log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
-                        return -2;
-                    }
-                }
-                default:
-                    break;
-            }
-        }
-
-        return -1;
-    }
-
-
-    @Override
-    public void persistAll(Set<MessageQueue> mqs) {
-        if (null == mqs || mqs.isEmpty())
-            return;
-
-        final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
-        if (mqs != null && !mqs.isEmpty()) {
-            for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
-                MessageQueue mq = entry.getKey();
-                AtomicLong offset = entry.getValue();
-                if (offset != null) {
-                    if (mqs.contains(mq)) {
-                        try {
-                            this.updateConsumeOffsetToBroker(mq, offset.get());
-                            log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
-                                    this.groupName,
-                                    this.mQClientFactory.getClientId(),
-                                    mq,
-                                    offset.get());
-                        } catch (Exception e) {
-                            log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
-                        }
-                    } else {
-                        unusedMQ.add(mq);
-                    }
-                }
-            }
-        }
-
-        if (!unusedMQ.isEmpty()) {
-            for (MessageQueue mq : unusedMQ) {
-                this.offsetTable.remove(mq);
-                log.info("remove unused mq, {}, {}", mq, this.groupName);
-            }
-        }
-    }
-
-
-    @Override
-    public void persist(MessageQueue mq) {
-        AtomicLong offset = this.offsetTable.get(mq);
-        if (offset != null) {
-            try {
-                this.updateConsumeOffsetToBroker(mq, offset.get());
-                log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
-                        this.groupName,
-                        this.mQClientFactory.getClientId(),
-                        mq,
-                        offset.get());
-            } catch (Exception e) {
-                log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
-            }
-        }
-    }
-
-    public void removeOffset(MessageQueue mq) {
-        if (mq != null) {
-            this.offsetTable.remove(mq);
-            log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq,
-                    offsetTable.size());
-        }
-    }
-
-    @Override
-    public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
-        Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>();
-        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
-            MessageQueue mq = entry.getKey();
-            if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
-                continue;
-            }
-            cloneOffsetTable.put(mq, entry.getValue().get());
-        }
-        return cloneOffsetTable;
-    }
-
-    /**
-     * Update the Consumer Offset in one way, once the Master is off, updated to Slave,
-     * here need to be optimized.
-     */
-    private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
-            MQBrokerException, InterruptedException, MQClientException {
-        updateConsumeOffsetToBroker(mq, offset, true);
-    }
-
-    /**
-     * Update the Consumer Offset synchronously, once the Master is off, updated to Slave,
-     * here need to be optimized.
-     */
-    @Override
-    public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
-            MQBrokerException, InterruptedException, MQClientException {
-        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
-        if (null == findBrokerResult) {
-            // TODO Here may be heavily overhead for Name Server,need tuning
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
-        }
-
-        if (findBrokerResult != null) {
-            UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
-            requestHeader.setTopic(mq.getTopic());
-            requestHeader.setConsumerGroup(this.groupName);
-            requestHeader.setQueueId(mq.getQueueId());
-            requestHeader.setCommitOffset(offset);
-
-            if (isOneway) {
-                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
-                        findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
-            } else {
-                this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
-                        findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
-            }
-        } else {
-            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
-        }
-    }
-
-    private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
-            InterruptedException, MQClientException {
-        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
-        if (null == findBrokerResult) {
-            // TODO Here may be heavily overhead for Name Server,need tuning
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
-        }
-
-        if (findBrokerResult != null) {
-            QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
-            requestHeader.setTopic(mq.getTopic());
-            requestHeader.setConsumerGroup(this.groupName);
-            requestHeader.setQueueId(mq.getQueueId());
-
-            return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
-                    findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
-        } else {
-            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/exception/MQBrokerException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/exception/MQBrokerException.java b/client/src/main/java/com/alibaba/rocketmq/client/exception/MQBrokerException.java
deleted file mode 100644
index 7fc09f8..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/exception/MQBrokerException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.exception;
-
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-
-
-/**
- * @author shijia.wxr
- */
-public class MQBrokerException extends Exception {
-    private static final long serialVersionUID = 5975020272601250368L;
-    private final int responseCode;
-    private final String errorMessage;
-
-
-    public MQBrokerException(int responseCode, String errorMessage) {
-        super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + "  DESC: "
-                + errorMessage));
-        this.responseCode = responseCode;
-        this.errorMessage = errorMessage;
-    }
-
-
-    public int getResponseCode() {
-        return responseCode;
-    }
-
-
-    public String getErrorMessage() {
-        return errorMessage;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/exception/MQClientException.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/exception/MQClientException.java b/client/src/main/java/com/alibaba/rocketmq/client/exception/MQClientException.java
deleted file mode 100644
index f343a67..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/exception/MQClientException.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.exception;
-
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-
-
-/**
- * @author shijia.wxr
- */
-public class MQClientException extends Exception {
-    private static final long serialVersionUID = -5758410930844185841L;
-    private int responseCode;
-    private String errorMessage;
-
-
-    public MQClientException(String errorMessage, Throwable cause) {
-        super(FAQUrl.attachDefaultURL(errorMessage), cause);
-        this.responseCode = -1;
-        this.errorMessage = errorMessage;
-    }
-
-
-    public MQClientException(int responseCode, String errorMessage) {
-        super(FAQUrl.attachDefaultURL("CODE: " + UtilAll.responseCode2String(responseCode) + "  DESC: "
-                + errorMessage));
-        this.responseCode = responseCode;
-        this.errorMessage = errorMessage;
-    }
-
-    public int getResponseCode() {
-        return responseCode;
-    }
-
-    public MQClientException setResponseCode(final int responseCode) {
-        this.responseCode = responseCode;
-        return this;
-    }
-
-    public String getErrorMessage() {
-        return errorMessage;
-    }
-
-    public void setErrorMessage(final String errorMessage) {
-        this.errorMessage = errorMessage;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenContext.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenContext.java
deleted file mode 100644
index 6291803..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenContext.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-import com.alibaba.rocketmq.client.impl.CommunicationMode;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-
-/**
- * @author manhong.yqd
- */
-public class CheckForbiddenContext {
-    private String nameSrvAddr;
-    private String group;
-    private Message message;
-    private MessageQueue mq;
-    private String brokerAddr;
-    private CommunicationMode communicationMode;
-    private SendResult sendResult;
-    private Exception exception;
-    private Object arg;
-    private boolean unitMode = false;
-
-
-    public String getGroup() {
-        return group;
-    }
-
-
-    public void setGroup(String group) {
-        this.group = group;
-    }
-
-
-    public Message getMessage() {
-        return message;
-    }
-
-
-    public void setMessage(Message message) {
-        this.message = message;
-    }
-
-
-    public MessageQueue getMq() {
-        return mq;
-    }
-
-
-    public void setMq(MessageQueue mq) {
-        this.mq = mq;
-    }
-
-
-    public String getBrokerAddr() {
-        return brokerAddr;
-    }
-
-
-    public void setBrokerAddr(String brokerAddr) {
-        this.brokerAddr = brokerAddr;
-    }
-
-
-    public CommunicationMode getCommunicationMode() {
-        return communicationMode;
-    }
-
-
-    public void setCommunicationMode(CommunicationMode communicationMode) {
-        this.communicationMode = communicationMode;
-    }
-
-
-    public SendResult getSendResult() {
-        return sendResult;
-    }
-
-
-    public void setSendResult(SendResult sendResult) {
-        this.sendResult = sendResult;
-    }
-
-
-    public Exception getException() {
-        return exception;
-    }
-
-
-    public void setException(Exception exception) {
-        this.exception = exception;
-    }
-
-
-    public Object getArg() {
-        return arg;
-    }
-
-
-    public void setArg(Object arg) {
-        this.arg = arg;
-    }
-
-
-    public boolean isUnitMode() {
-        return unitMode;
-    }
-
-
-    public void setUnitMode(boolean isUnitMode) {
-        this.unitMode = isUnitMode;
-    }
-
-
-    public String getNameSrvAddr() {
-        return nameSrvAddr;
-    }
-
-
-    public void setNameSrvAddr(String nameSrvAddr) {
-        this.nameSrvAddr = nameSrvAddr;
-    }
-
-
-    @Override
-    public String toString() {
-        return "SendMessageContext [nameSrvAddr=" + nameSrvAddr + ", group=" + group + ", message=" + message
-                + ", mq=" + mq + ", brokerAddr=" + brokerAddr + ", communicationMode=" + communicationMode
-                + ", sendResult=" + sendResult + ", exception=" + exception + ", unitMode=" + unitMode
-                + ", arg=" + arg + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenHook.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenHook.java
deleted file mode 100644
index 35a2740..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/CheckForbiddenHook.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.client.hook;
-
-import com.alibaba.rocketmq.client.exception.MQClientException;
-
-
-/**
- * @author manhong.yqd
- */
-public interface CheckForbiddenHook {
-    public String hookName();
-
-
-    public void checkForbidden(final CheckForbiddenContext context) throws MQClientException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageContext.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageContext.java
deleted file mode 100644
index 0c0e7cd..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageContext.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-import java.util.Map;
-
-
-public class ConsumeMessageContext {
-    private String consumerGroup;
-    private List<MessageExt> msgList;
-    private MessageQueue mq;
-    private boolean success;
-    private String status;
-    private Object mqTraceContext;
-    private Map<String, String> props;
-
-
-    public String getConsumerGroup() {
-        return consumerGroup;
-    }
-
-
-    public void setConsumerGroup(String consumerGroup) {
-        this.consumerGroup = consumerGroup;
-    }
-
-
-    public List<MessageExt> getMsgList() {
-        return msgList;
-    }
-
-
-    public void setMsgList(List<MessageExt> msgList) {
-        this.msgList = msgList;
-    }
-
-
-    public MessageQueue getMq() {
-        return mq;
-    }
-
-
-    public void setMq(MessageQueue mq) {
-        this.mq = mq;
-    }
-
-
-    public boolean isSuccess() {
-        return success;
-    }
-
-
-    public void setSuccess(boolean success) {
-        this.success = success;
-    }
-
-
-    public Object getMqTraceContext() {
-        return mqTraceContext;
-    }
-
-
-    public void setMqTraceContext(Object mqTraceContext) {
-        this.mqTraceContext = mqTraceContext;
-    }
-
-
-    public Map<String, String> getProps() {
-        return props;
-    }
-
-
-    public void setProps(Map<String, String> props) {
-        this.props = props;
-    }
-
-
-    public String getStatus() {
-        return status;
-    }
-
-
-    public void setStatus(String status) {
-        this.status = status;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageHook.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageHook.java
deleted file mode 100644
index 96b0e53..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/ConsumeMessageHook.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-public interface ConsumeMessageHook {
-    String hookName();
-
-    void consumeMessageBefore(final ConsumeMessageContext context);
-
-    void consumeMessageAfter(final ConsumeMessageContext context);
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageContext.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageContext.java
deleted file mode 100644
index c47f09e..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageContext.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-
-import java.util.List;
-
-
-/**
- * @author manhong.yqd
- */
-public class FilterMessageContext {
-    private String consumerGroup;
-    private List<MessageExt> msgList;
-    private MessageQueue mq;
-    private Object arg;
-    private boolean unitMode;
-
-
-    public String getConsumerGroup() {
-        return consumerGroup;
-    }
-
-
-    public void setConsumerGroup(String consumerGroup) {
-        this.consumerGroup = consumerGroup;
-    }
-
-
-    public List<MessageExt> getMsgList() {
-        return msgList;
-    }
-
-
-    public void setMsgList(List<MessageExt> msgList) {
-        this.msgList = msgList;
-    }
-
-
-    public MessageQueue getMq() {
-        return mq;
-    }
-
-
-    public void setMq(MessageQueue mq) {
-        this.mq = mq;
-    }
-
-
-    public Object getArg() {
-        return arg;
-    }
-
-
-    public void setArg(Object arg) {
-        this.arg = arg;
-    }
-
-
-    public boolean isUnitMode() {
-        return unitMode;
-    }
-
-
-    public void setUnitMode(boolean isUnitMode) {
-        this.unitMode = isUnitMode;
-    }
-
-
-    @Override
-    public String toString() {
-        return "ConsumeMessageContext [consumerGroup=" + consumerGroup + ", msgList=" + msgList + ", mq="
-                + mq + ", arg=" + arg + "]";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageHook.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageHook.java
deleted file mode 100644
index 1528ef9..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/FilterMessageHook.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-/**
- * @author manhong.yqd
- */
-public interface FilterMessageHook {
-    public String hookName();
-
-
-    public void filterMessage(final FilterMessageContext context);
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageContext.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageContext.java
deleted file mode 100644
index 9552456..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageContext.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-import com.alibaba.rocketmq.client.impl.CommunicationMode;
-import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.message.MessageType;
-
-import java.util.Map;
-
-
-public class SendMessageContext {
-    private String producerGroup;
-    private Message message;
-    private MessageQueue mq;
-    private String brokerAddr;
-    private String bornHost;
-    private CommunicationMode communicationMode;
-    private SendResult sendResult;
-    private Exception exception;
-    private Object mqTraceContext;
-    private Map<String, String> props;
-    private DefaultMQProducerImpl producer;
-    private MessageType msgType = MessageType.Normal_Msg;
-
-    public MessageType getMsgType() {
-        return msgType;
-    }
-
-    public void setMsgType(final MessageType msgType) {
-        this.msgType = msgType;
-    }
-
-    public DefaultMQProducerImpl getProducer() {
-        return producer;
-    }
-
-    public void setProducer(final DefaultMQProducerImpl producer) {
-        this.producer = producer;
-    }
-
-    public String getProducerGroup() {
-        return producerGroup;
-    }
-
-
-    public void setProducerGroup(String producerGroup) {
-        this.producerGroup = producerGroup;
-    }
-
-
-    public Message getMessage() {
-        return message;
-    }
-
-
-    public void setMessage(Message message) {
-        this.message = message;
-    }
-
-
-    public MessageQueue getMq() {
-        return mq;
-    }
-
-
-    public void setMq(MessageQueue mq) {
-        this.mq = mq;
-    }
-
-
-    public String getBrokerAddr() {
-        return brokerAddr;
-    }
-
-
-    public void setBrokerAddr(String brokerAddr) {
-        this.brokerAddr = brokerAddr;
-    }
-
-
-    public CommunicationMode getCommunicationMode() {
-        return communicationMode;
-    }
-
-
-    public void setCommunicationMode(CommunicationMode communicationMode) {
-        this.communicationMode = communicationMode;
-    }
-
-
-    public SendResult getSendResult() {
-        return sendResult;
-    }
-
-
-    public void setSendResult(SendResult sendResult) {
-        this.sendResult = sendResult;
-    }
-
-
-    public Exception getException() {
-        return exception;
-    }
-
-
-    public void setException(Exception exception) {
-        this.exception = exception;
-    }
-
-
-    public Object getMqTraceContext() {
-        return mqTraceContext;
-    }
-
-
-    public void setMqTraceContext(Object mqTraceContext) {
-        this.mqTraceContext = mqTraceContext;
-    }
-
-
-    public Map<String, String> getProps() {
-        return props;
-    }
-
-
-    public void setProps(Map<String, String> props) {
-        this.props = props;
-    }
-
-
-    public String getBornHost() {
-        return bornHost;
-    }
-
-
-    public void setBornHost(String bornHost) {
-        this.bornHost = bornHost;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageHook.java b/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageHook.java
deleted file mode 100644
index 22e1fb3..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/hook/SendMessageHook.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.hook;
-
-public interface SendMessageHook {
-    String hookName();
-
-    void sendMessageBefore(final SendMessageContext context);
-
-    void sendMessageAfter(final SendMessageContext context);
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/ClientRemotingProcessor.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/ClientRemotingProcessor.java
deleted file mode 100644
index 79a539e..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/ClientRemotingProcessor.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl;
-
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.impl.producer.MQProducerInner;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.message.MessageConst;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
-import com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import com.alibaba.rocketmq.common.protocol.body.GetConsumerStatusBody;
-import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBody;
-import com.alibaba.rocketmq.common.protocol.header.*;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.ChannelHandlerContext;
-import org.slf4j.Logger;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * @author shijia.wxr
- */
-public class ClientRemotingProcessor implements NettyRequestProcessor {
-    private final Logger log = ClientLogger.getLog();
-    private final MQClientInstance mqClientFactory;
-
-
-    public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
-        this.mqClientFactory = mqClientFactory;
-    }
-
-
-    @Override
-    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        switch (request.getCode()) {
-            case RequestCode.CHECK_TRANSACTION_STATE:
-                return this.checkTransactionState(ctx, request);
-            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
-                return this.notifyConsumerIdsChanged(ctx, request);
-            case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
-                return this.resetOffset(ctx, request);
-            case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
-                return this.getConsumeStatus(ctx, request);
-
-            case RequestCode.GET_CONSUMER_RUNNING_INFO:
-                return this.getConsumerRunningInfo(ctx, request);
-
-            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
-                return this.consumeMessageDirectly(ctx, request);
-            default:
-                break;
-        }
-        return null;
-    }
-
-    @Override
-    public boolean rejectRequest() {
-        return false;
-    }
-
-    public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        final CheckTransactionStateRequestHeader requestHeader =
-                (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
-        final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
-        final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
-        if (messageExt != null) {
-            final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
-            if (group != null) {
-                MQProducerInner producer = this.mqClientFactory.selectProducer(group);
-                if (producer != null) {
-                    final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-                    producer.checkTransactionState(addr, messageExt, requestHeader);
-                } else {
-                    log.debug("checkTransactionState, pick producer by group[{}] failed", group);
-                }
-            } else {
-                log.warn("checkTransactionState, pick producer group failed");
-            }
-        } else {
-            log.warn("checkTransactionState, decode message failed");
-        }
-
-        return null;
-    }
-
-    public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        try {
-            final NotifyConsumerIdsChangedRequestHeader requestHeader =
-                    (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
-            log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately",
-                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                    requestHeader.getConsumerGroup());
-            this.mqClientFactory.rebalanceImmediately();
-        } catch (Exception e) {
-            log.error("notifyConsumerIdsChanged exception", RemotingHelper.exceptionSimpleDesc(e));
-        }
-        return null;
-    }
-
-    public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        final ResetOffsetRequestHeader requestHeader =
-                (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
-        log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}",
-                new Object[]{RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
-                        requestHeader.getTimestamp()});
-        Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
-        if (request.getBody() != null) {
-            ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
-            offsetTable = body.getOffsetTable();
-        }
-        this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
-        return null;
-    }
-
-    @Deprecated
-    public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-        final GetConsumerStatusRequestHeader requestHeader =
-                (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
-
-        Map<MessageQueue, Long> offsetTable = this.mqClientFactory.getConsumerStatus(requestHeader.getTopic(), requestHeader.getGroup());
-        GetConsumerStatusBody body = new GetConsumerStatusBody();
-        body.setMessageQueueTable(offsetTable);
-        response.setBody(body.encode());
-        response.setCode(ResponseCode.SUCCESS);
-        return response;
-    }
-
-    private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-        final GetConsumerRunningInfoRequestHeader requestHeader =
-                (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
-
-        ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
-        if (null != consumerRunningInfo) {
-            if (requestHeader.isJstackEnable()) {
-                Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
-                String jstack = UtilAll.jstack(map);
-                consumerRunningInfo.setJstack(jstack);
-            }
-
-            response.setCode(ResponseCode.SUCCESS);
-            response.setBody(consumerRunningInfo.encode());
-        } else {
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));
-        }
-
-        return response;
-    }
-
-    private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-        final ConsumeMessageDirectlyResultRequestHeader requestHeader =
-                (ConsumeMessageDirectlyResultRequestHeader) request
-                        .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
-
-        final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));
-
-        ConsumeMessageDirectlyResult result =
-                this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName());
-
-        if (null != result) {
-            response.setCode(ResponseCode.SUCCESS);
-            response.setBody(result.encode());
-        } else {
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));
-        }
-
-        return response;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/CommunicationMode.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/CommunicationMode.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/CommunicationMode.java
deleted file mode 100644
index bc2f95d..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/CommunicationMode.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl;
-
-/**
- * @author shijia.wxr
- */
-public enum CommunicationMode {
-    SYNC,
-    ASYNC,
-    ONEWAY,
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/FindBrokerResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/FindBrokerResult.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/FindBrokerResult.java
deleted file mode 100644
index 22805cd..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/FindBrokerResult.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl;
-
-/**
- * @author shijia.wxr
- */
-public class FindBrokerResult {
-    private final String brokerAddr;
-    private final boolean slave;
-
-
-    public FindBrokerResult(String brokerAddr, boolean slave) {
-        this.brokerAddr = brokerAddr;
-        this.slave = slave;
-    }
-
-
-    public String getBrokerAddr() {
-        return brokerAddr;
-    }
-
-
-    public boolean isSlave() {
-        return slave;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/MQAdminImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/com/alibaba/rocketmq/client/impl/MQAdminImpl.java
deleted file mode 100644
index 9f7e964..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQAdminImpl.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl;
-
-import com.alibaba.rocketmq.client.QueryResult;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.TopicConfig;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.QueryMessageRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.QueryMessageResponseHeader;
-import com.alibaba.rocketmq.common.protocol.route.BrokerData;
-import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
-import com.alibaba.rocketmq.remoting.InvokeCallback;
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.exception.RemotingException;
-import com.alibaba.rocketmq.remoting.netty.ResponseFuture;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import org.slf4j.Logger;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * @author shijia.wxr
- */
-public class MQAdminImpl {
-
-    private final Logger log = ClientLogger.getLog();
-    private final MQClientInstance mQClientFactory;
-    private long timeoutMillis = 6000;
-
-
-    public MQAdminImpl(MQClientInstance mQClientFactory) {
-        this.mQClientFactory = mQClientFactory;
-    }
-
-
-    public long getTimeoutMillis() {
-        return timeoutMillis;
-    }
-
-
-    public void setTimeoutMillis(long timeoutMillis) {
-        this.timeoutMillis = timeoutMillis;
-    }
-
-
-    public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
-        createTopic(key, newTopic, queueNum, 0);
-    }
-
-
-    public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
-        try {
-            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);
-            List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
-            if (brokerDataList != null && !brokerDataList.isEmpty()) {
-                Collections.sort(brokerDataList);
-
-                boolean createOKAtLeastOnce = false;
-                MQClientException exception = null;
-
-                StringBuilder orderTopicString = new StringBuilder();
-
-                for (BrokerData brokerData : brokerDataList) {
-                    String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
-                    if (addr != null) {
-                        TopicConfig topicConfig = new TopicConfig(newTopic);
-                        topicConfig.setReadQueueNums(queueNum);
-                        topicConfig.setWriteQueueNums(queueNum);
-                        topicConfig.setTopicSysFlag(topicSysFlag);
-
-                        boolean createOK = false;
-                        for (int i = 0; i < 5; i++) {
-                            try {
-                                this.mQClientFactory.getMQClientAPIImpl().createTopic(addr, key, topicConfig, timeoutMillis);
-                                createOK = true;
-                                createOKAtLeastOnce = true;
-                                break;
-                            } catch (Exception e) {
-                                if (4 == i) {
-                                    exception = new MQClientException("create topic to broker exception", e);
-                                }
-                            }
-                        }
-
-                        if (createOK) {
-                            orderTopicString.append(brokerData.getBrokerName());
-                            orderTopicString.append(":");
-                            orderTopicString.append(queueNum);
-                            orderTopicString.append(";");
-                        }
-                    }
-                }
-
-                if (exception != null && !createOKAtLeastOnce) {
-                    throw exception;
-                }
-            } else {
-                throw new MQClientException("Not found broker, maybe key is wrong", null);
-            }
-        } catch (Exception e) {
-            throw new MQClientException("create new topic failed", e);
-        }
-    }
-
-
-    public List<MessageQueue> fetchPublishMessageQueues(String topic) throws MQClientException {
-        try {
-            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
-            if (topicRouteData != null) {
-                TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
-                if (topicPublishInfo != null && topicPublishInfo.ok()) {
-                    return topicPublishInfo.getMessageQueueList();
-                }
-            }
-        } catch (Exception e) {
-            throw new MQClientException("Can not find Message Queue for this topic, " + topic, e);
-        }
-
-        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
-    }
-
-
-    public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
-        try {
-            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
-            if (topicRouteData != null) {
-                Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
-                if (!mqList.isEmpty()) {
-                    return mqList;
-                } else {
-                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
-                }
-            }
-        } catch (Exception e) {
-            throw new MQClientException(
-                    "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), //
-                    e);
-        }
-
-        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
-    }
-
-
-    public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
-        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
-        if (null == brokerAddr) {
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
-        }
-
-        if (brokerAddr != null) {
-            try {
-                return this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp,
-                        timeoutMillis);
-            } catch (Exception e) {
-                throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
-            }
-        }
-
-        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
-    }
-
-
-    public long maxOffset(MessageQueue mq) throws MQClientException {
-        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
-        if (null == brokerAddr) {
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
-        }
-
-        if (brokerAddr != null) {
-            try {
-                return this.mQClientFactory.getMQClientAPIImpl().getMaxOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
-            } catch (Exception e) {
-                throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
-            }
-        }
-
-        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
-    }
-
-
-    public long minOffset(MessageQueue mq) throws MQClientException {
-        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
-        if (null == brokerAddr) {
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
-        }
-
-        if (brokerAddr != null) {
-            try {
-                return this.mQClientFactory.getMQClientAPIImpl().getMinOffset(brokerAddr, mq.getTopic(), mq.getQueueId(), timeoutMillis);
-            } catch (Exception e) {
-                throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
-            }
-        }
-
-        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
-    }
-
-
-    public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
-        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
-        if (null == brokerAddr) {
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
-            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
-        }
-
-        if (brokerAddr != null) {
-            try {
-                return this.mQClientFactory.getMQClientAPIImpl().getEarliestMsgStoretime(brokerAddr, mq.getTopic(), mq.getQueueId(),
-                        timeoutMillis);
-            } catch (Exception e) {
-                throw new MQClientException("Invoke Broker[" + brokerAddr + "] exception", e);
-            }
-        }
-
-        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
-    }
-
-    public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-
-        MessageId messageId = null;
-        try {
-            messageId = MessageDecoder.decodeMessageId(msgId);
-        } catch (Exception e) {
-            throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by id finished, but no message.");
-        }
-        return this.mQClientFactory.getMQClientAPIImpl().viewMessage(RemotingUtil.socketAddress2String(messageId.getAddress()),
-                messageId.getOffset(), timeoutMillis);
-    }
-
-    public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException,
-            InterruptedException {
-        return queryMessage(topic, key, maxNum, begin, end, false);
-    }
-
-    public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws InterruptedException, MQClientException {
-
-        QueryResult qr = this.queryMessage(topic, uniqKey, 32,
-                MessageClientIDSetter.getNearlyTimeFromID(uniqKey).getTime() - 1000, Long.MAX_VALUE, true);
-        if (qr != null && qr.getMessageList() != null && qr.getMessageList().size() > 0) {
-            return qr.getMessageList().get(0);
-        } else {
-            return null;
-        }
-    }
-
-    protected QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end, boolean isUniqKey) throws MQClientException,
-            InterruptedException {
-        TopicRouteData topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
-        if (null == topicRouteData) {
-            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
-            topicRouteData = this.mQClientFactory.getAnExistTopicRouteData(topic);
-        }
-
-        if (topicRouteData != null) {
-            List<String> brokerAddrs = new LinkedList<String>();
-            for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
-                String addr = brokerData.selectBrokerAddr();
-                if (addr != null) {
-                    brokerAddrs.add(addr);
-                }
-            }
-
-            if (!brokerAddrs.isEmpty()) {
-                final CountDownLatch countDownLatch = new CountDownLatch(brokerAddrs.size());
-                final List<QueryResult> queryResultList = new LinkedList<QueryResult>();
-
-                for (String addr : brokerAddrs) {
-                    try {
-                        QueryMessageRequestHeader requestHeader = new QueryMessageRequestHeader();
-                        requestHeader.setTopic(topic);
-                        requestHeader.setKey(key);
-                        requestHeader.setMaxNum(maxNum);
-                        requestHeader.setBeginTimestamp(begin);
-                        requestHeader.setEndTimestamp(end);
-
-                        this.mQClientFactory.getMQClientAPIImpl().queryMessage(addr, requestHeader, timeoutMillis * 3,
-                                new InvokeCallback() {
-                                    @Override
-                                    public void operationComplete(ResponseFuture responseFuture) {
-                                        try {
-                                            RemotingCommand response = responseFuture.getResponseCommand();
-                                            if (response != null) {
-                                                switch (response.getCode()) {
-                                                    case ResponseCode.SUCCESS: {
-                                                        QueryMessageResponseHeader responseHeader = null;
-                                                        try {
-                                                            responseHeader =
-                                                                    (QueryMessageResponseHeader) response
-                                                                            .decodeCommandCustomHeader(QueryMessageResponseHeader.class);
-                                                        } catch (RemotingCommandException e) {
-                                                            log.error("decodeCommandCustomHeader exception", e);
-                                                            return;
-                                                        }
-
-                                                        List<MessageExt> wrappers =
-                                                                MessageDecoder.decodes(ByteBuffer.wrap(response.getBody()), true);
-
-                                                        QueryResult qr = new QueryResult(responseHeader.getIndexLastUpdateTimestamp(), wrappers);
-                                                        queryResultList.add(qr);
-                                                        break;
-                                                    }
-                                                    default:
-                                                        log.warn("getResponseCommand failed, {} {}", response.getCode(), response.getRemark());
-                                                        break;
-                                                }
-                                            } else {
-                                                log.warn("getResponseCommand return null");
-                                            }
-                                        } finally {
-                                            countDownLatch.countDown();
-                                        }
-                                    }
-                                }, isUniqKey);
-                    } catch (Exception e) {
-                        log.warn("queryMessage exception", e);
-                    }
-
-                }
-
-                boolean ok = countDownLatch.await(timeoutMillis * 4, TimeUnit.MILLISECONDS);
-                if (!ok) {
-                    log.warn("queryMessage, maybe some broker failed");
-                }
-
-                long indexLastUpdateTimestamp = 0;
-                List<MessageExt> messageList = new LinkedList<MessageExt>();
-                for (QueryResult qr : queryResultList) {
-                    if (qr.getIndexLastUpdateTimestamp() > indexLastUpdateTimestamp) {
-                        indexLastUpdateTimestamp = qr.getIndexLastUpdateTimestamp();
-                    }
-
-                    for (MessageExt msgExt : qr.getMessageList()) {
-                        if (isUniqKey) {
-                            if (msgExt.getMsgId().equals(key)) {
-
-                                if (messageList.size() > 0) {
-
-                                    if (messageList.get(0).getStoreTimestamp() > msgExt.getStoreTimestamp()) {
-
-                                        messageList.clear();
-                                        messageList.add(msgExt);
-                                    }
-
-                                } else {
-
-                                    messageList.add(msgExt);
-                                }
-                            } else {
-                                log.warn("queryMessage by uniqKey, find message key not matched, maybe hash duplicate {}", msgExt.toString());
-                            }
-                        } else {
-                            String keys = msgExt.getKeys();
-                            if (keys != null) {
-                                boolean matched = false;
-                                String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR);
-                                if (keyArray != null) {
-                                    for (String k : keyArray) {
-                                        if (key.equals(k)) {
-                                            matched = true;
-                                            break;
-                                        }
-                                    }
-                                }
-
-                                if (matched) {
-                                    messageList.add(msgExt);
-                                } else {
-                                    log.warn("queryMessage, find message key not matched, maybe hash duplicate {}", msgExt.toString());
-                                }
-                            }
-                        }
-                    }
-                }
-
-                if (!messageList.isEmpty()) {
-                    return new QueryResult(indexLastUpdateTimestamp, messageList);
-                } else {
-                    throw new MQClientException(ResponseCode.NO_MESSAGE, "query message by key finished, but no message.");
-                }
-            }
-        }
-
-        throw new MQClientException(ResponseCode.TOPIC_NOT_EXIST, "The topic[" + topic + "] not matched route info");
-    }
-}