You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/09/04 06:44:47 UTC

[13/17] incubator-rocketmq-externals git commit: Polish cpp module

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/PermName.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/PermName.cpp b/rocketmq-client4cpp/src/common/PermName.cpp
deleted file mode 100644
index 084de79..0000000
--- a/rocketmq-client4cpp/src/common/PermName.cpp
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "PermName.h"
-
-namespace rmq
-{
-
-int PermName::PERM_PRIORITY = 0x1 << 3;
-int PermName::PERM_READ = 0x1 << 2;
-int PermName::PERM_WRITE = 0x1 << 1;
-int PermName::PERM_INHERIT = 0x1 << 0;
-
-bool PermName::isReadable(int perm)
-{
-    return (perm & PERM_READ) == PERM_READ;
-}
-
-bool PermName::isWriteable(int perm)
-{
-    return (perm & PERM_WRITE) == PERM_WRITE;
-}
-
-bool PermName::isInherited(int perm)
-{
-    return (perm & PERM_INHERIT) == PERM_INHERIT;
-}
-
-std::string PermName::perm2String(int perm)
-{
-    std::string pm("---");
-    if (isReadable(perm))
-    {
-        pm.replace(0, 1, "R");
-    }
-
-    if (isWriteable(perm))
-    {
-        pm.replace(1, 2, "W");
-    }
-
-    if (isInherited(perm))
-    {
-        pm.replace(2, 3, "X");
-    }
-
-    return pm;
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/PermName.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/PermName.h b/rocketmq-client4cpp/src/common/PermName.h
deleted file mode 100644
index 364ddeb..0000000
--- a/rocketmq-client4cpp/src/common/PermName.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __PERMNAME_H__
-#define __PERMNAME_H__
-
-#include <string>
-
-namespace rmq
-{
-	class PermName
-	{
-	public:
-	    static int PERM_PRIORITY;
-	    static int PERM_READ;
-	    static int PERM_WRITE;
-	    static int PERM_INHERIT;
-
-	    static bool isReadable(int perm);
-	    static bool isWriteable(int perm);
-	    static bool isInherited(int perm);
-	    static std::string perm2String(int perm);
-	};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/PullSysFlag.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/PullSysFlag.cpp b/rocketmq-client4cpp/src/common/PullSysFlag.cpp
deleted file mode 100644
index f6fc1c2..0000000
--- a/rocketmq-client4cpp/src/common/PullSysFlag.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "PullSysFlag.h"
-
-namespace rmq
-{
-
-int PullSysFlag::FLAG_COMMIT_OFFSET = 0x1 << 0;
-int PullSysFlag::FLAG_SUSPEND = 0x1 << 1;
-int PullSysFlag::FLAG_SUBSCRIPTION = 0x1 << 2;
-
-int PullSysFlag::buildSysFlag(bool commitOffset, bool suspend, bool subscription)
-{
-    int flag = 0;
-
-    if (commitOffset)
-    {
-        flag |= FLAG_COMMIT_OFFSET;
-    }
-
-    if (suspend)
-    {
-        flag |= FLAG_SUSPEND;
-    }
-
-    if (subscription)
-    {
-        flag |= FLAG_SUBSCRIPTION;
-    }
-
-    return flag;
-}
-
-int PullSysFlag::clearCommitOffsetFlag(int sysFlag)
-{
-    return sysFlag & (~FLAG_COMMIT_OFFSET);
-}
-
-bool PullSysFlag::hasCommitOffsetFlag(int sysFlag)
-{
-    return (sysFlag & FLAG_COMMIT_OFFSET) == FLAG_COMMIT_OFFSET;
-}
-
-bool PullSysFlag::hasSuspendFlag(int sysFlag)
-{
-    return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND;
-}
-
-bool PullSysFlag::hasSubscriptionFlag(int sysFlag)
-{
-    return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION;
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/PullSysFlag.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/PullSysFlag.h b/rocketmq-client4cpp/src/common/PullSysFlag.h
deleted file mode 100755
index c19eac3..0000000
--- a/rocketmq-client4cpp/src/common/PullSysFlag.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __PULLSYSFLAG_H__
-#define __PULLSYSFLAG_H__
-
-namespace rmq
-{
-	class PullSysFlag
-	{
-	public:
-	    static int buildSysFlag(bool commitOffset, bool suspend, bool subscription);
-	    static int clearCommitOffsetFlag(int sysFlag);
-	    static bool hasCommitOffsetFlag(int sysFlag);
-	    static bool hasSuspendFlag(int sysFlag);
-	    static bool hasSubscriptionFlag(int sysFlag);
-
-	private:
-	    static int FLAG_COMMIT_OFFSET;
-	    static int FLAG_SUSPEND;
-	    static int FLAG_SUBSCRIPTION;
-	};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/SendResult.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/SendResult.cpp b/rocketmq-client4cpp/src/common/SendResult.cpp
deleted file mode 100755
index 5263d29..0000000
--- a/rocketmq-client4cpp/src/common/SendResult.cpp
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "SendResult.h"
-#include "UtilAll.h"
-#include "VirtualEnvUtil.h"
-
-namespace rmq
-{
-
-SendResult::SendResult()
-    : m_sendStatus(SEND_OK),m_queueOffset(0)
-{
-}
-
-SendResult::SendResult(const SendStatus& sendStatus,
-                       const std::string&  msgId,
-                       MessageQueue& messageQueue,
-                       long long queueOffset,
-                       std::string&  projectGroupPrefix)
-    : m_sendStatus(sendStatus),
-      m_msgId(msgId),
-      m_messageQueue(messageQueue),
-      m_queueOffset(queueOffset)
-{
-    if (!UtilAll::isBlank(projectGroupPrefix))
-    {
-        m_messageQueue.setTopic(VirtualEnvUtil::clearProjectGroup(m_messageQueue.getTopic(),
-                                projectGroupPrefix));
-    }
-}
-
-const std::string&  SendResult::getMsgId()
-{
-    return m_msgId;
-}
-
-void SendResult::setMsgId(const std::string&  msgId)
-{
-    m_msgId = msgId;
-}
-
-SendStatus SendResult::getSendStatus()
-{
-    return m_sendStatus;
-}
-
-void SendResult::setSendStatus(const SendStatus& sendStatus)
-{
-    m_sendStatus = sendStatus;
-}
-
-MessageQueue& SendResult::getMessageQueue()
-{
-    return m_messageQueue;
-}
-
-void SendResult::setMessageQueue(MessageQueue& messageQueue)
-{
-    m_messageQueue = messageQueue;
-}
-
-long long SendResult::getQueueOffset()
-{
-    return m_queueOffset;
-}
-
-void SendResult::setQueueOffset(long long queueOffset)
-{
-    m_queueOffset = queueOffset;
-}
-
-
-bool SendResult::hasResult()
-{
-	return !m_msgId.empty();
-}
-
-
-
-std::string SendResult::toString() const
-{
-    std::stringstream ss;
-    ss << "{sendStatus=" << m_sendStatus
-       << ",msgId=" << m_msgId
-       << ",messageQueue=" << m_messageQueue.toString()
-       << ",queueOffset=" << m_queueOffset
-       << "}";
-    return ss.str();
-}
-
-
-std::string SendResult::toJsonString() const
-{
-    std::stringstream ss;
-    ss << "{\"sendStatus\":\"" << m_sendStatus
-       << "\",\"msgId\":\"" << m_msgId
-       << "\",\"messageQueue\":" << m_messageQueue.toJsonString()
-       << ",\"queueOffset\":\"" << m_queueOffset
-       << "}";
-    return ss.str();
-}
-
-
-
-TransactionSendResult::TransactionSendResult()
-{
-}
-
-LocalTransactionState TransactionSendResult::getLocalTransactionState()
-{
-    return m_localTransactionState;
-}
-
-void TransactionSendResult::setLocalTransactionState(LocalTransactionState localTransactionState)
-{
-    m_localTransactionState = localTransactionState;
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/ServiceState.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/ServiceState.h b/rocketmq-client4cpp/src/common/ServiceState.h
deleted file mode 100755
index 7b41add..0000000
--- a/rocketmq-client4cpp/src/common/ServiceState.h
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __SERVICESTATE_H__
-#define __SERVICESTATE_H__
-
-namespace rmq
-{
-    enum ServiceState
-    {
-        CREATE_JUST,
-        RUNNING,
-        SHUTDOWN_ALREADY,
-        START_FAILED
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/ServiceThread.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/ServiceThread.cpp b/rocketmq-client4cpp/src/common/ServiceThread.cpp
deleted file mode 100644
index 1abff9f..0000000
--- a/rocketmq-client4cpp/src/common/ServiceThread.cpp
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "ServiceThread.h"
-#include "Monitor.h"
-#include "ScopedLock.h"
-
-namespace rmq
-{
-
-ServiceThread::ServiceThread(const char* name)
-    : kpr::Thread(name),
-      m_notified(false),
-      m_stoped(false)
-{
-
-}
-
-ServiceThread::~ServiceThread()
-{
-
-}
-
-void ServiceThread::stop()
-{
-    m_stoped = true;
-    wakeup();
-}
-
-void ServiceThread::wakeup()
-{
-    kpr::ScopedLock<kpr::Monitor> lock(*this);
-
-    if (!m_notified)
-    {
-        m_notified = true;
-        Notify();
-    }
-}
-
-void ServiceThread::waitForRunning(long interval)
-{
-    kpr::ScopedLock<kpr::Monitor> lock(*this);
-    if (m_notified)
-    {
-        m_notified = false;
-        return;
-    }
-
-    try
-    {
-        Wait(interval);
-    }
-    catch (...)
-    {
-        m_notified = false;
-    }
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/ServiceThread.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/ServiceThread.h b/rocketmq-client4cpp/src/common/ServiceThread.h
deleted file mode 100755
index d7ec3ef..0000000
--- a/rocketmq-client4cpp/src/common/ServiceThread.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __SERVICETHREAD_H__
-#define __SERVICETHREAD_H__
-
-#include <string>
-#include "Thread.h"
-#include "Monitor.h"
-
-namespace rmq
-{
-	const long JoinTime = 90 * 1000;
-
-	/**
-	* service thread base class
-	*
-	*/
-	class ServiceThread : public kpr::Thread, public kpr::Monitor
-	{
-	public:
-	    ServiceThread(const char* name = NULL);
-	    virtual ~ServiceThread();
-
-	    virtual std::string  getServiceName() = 0;
-
-	    void stop();
-	    void wakeup();
-	    void waitForRunning(long interval);
-
-	protected:
-	    volatile bool m_notified;
-	    volatile bool m_stoped;
-	};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h b/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h
deleted file mode 100755
index 12bd48a..0000000
--- a/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang, kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __SUBSCRIPTIONGROUPCONFIG_H__
-#define __SUBSCRIPTIONGROUPCONFIG_H__
-
-#include <string>
-#include "MixAll.h"
-
-namespace rmq
-{
-    class SubscriptionGroupConfig
-    {
-    public:
-        SubscriptionGroupConfig(const std::string& groupName)
-        {
-            this->groupName = groupName;
-            consumeEnable = true;
-            consumeFromMinEnable = true;
-            consumeBroadcastEnable = true;
-            retryQueueNums = 1;
-            retryMaxTimes = 5;
-            brokerId = MixAll::MASTER_ID;
-            whichBrokerWhenConsumeSlowly = 1;
-        }
-
-        std::string groupName;
-        bool consumeEnable;
-        bool consumeFromMinEnable;
-        bool consumeBroadcastEnable;
-        int retryQueueNums;
-        int retryMaxTimes;
-        long brokerId;
-        long whichBrokerWhenConsumeSlowly;
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/TopAddressing.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/TopAddressing.h b/rocketmq-client4cpp/src/common/TopAddressing.h
deleted file mode 100755
index 07b0c0c..0000000
--- a/rocketmq-client4cpp/src/common/TopAddressing.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang, kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __TOPADDRESSING_H__
-#define  __TOPADDRESSING_H__
-
-#include <string>
-#include <sstream>
-#include "SocketUtil.h"
-
-namespace rmq
-{
-    class TopAddressing
-    {
-    public:
-        TopAddressing()
-			: m_nsAddr("")
-        {
-        }
-
-		const std::string& getNsAddr()
-        {
-            return m_nsAddr;
-        }
-
-		void setNsAddr(std::string& nsAddr)
-        {
-           	m_nsAddr = nsAddr;
-        }
-
-        std::string fetchNSAddr()
-        {
-
-            return "";
-        }
-
-    private:
-		std::string m_nsAddr;
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/TopicConfig.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/TopicConfig.cpp b/rocketmq-client4cpp/src/common/TopicConfig.cpp
deleted file mode 100644
index 036b41c..0000000
--- a/rocketmq-client4cpp/src/common/TopicConfig.cpp
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include <stdlib.h>
-#include <sstream>
-
-#include "TopicConfig.h"
-#include "PermName.h"
-
-namespace rmq
-{
-
-int TopicConfig::DefaultReadQueueNums = 16;
-int TopicConfig::DefaultWriteQueueNums = 16;
-std::string TopicConfig::SEPARATOR = " ";
-
-TopicConfig::TopicConfig()
-    : m_topicName(""),
-      m_readQueueNums(DefaultReadQueueNums),
-      m_writeQueueNums(DefaultWriteQueueNums),
-      m_perm(PermName::PERM_READ | PermName::PERM_WRITE),
-      m_topicFilterType(SINGLE_TAG),
-      m_topicSysFlag(0),
-      m_order(false)
-{
-
-}
-
-TopicConfig::TopicConfig(const std::string& topicName)
-    : m_topicName(topicName),
-      m_readQueueNums(DefaultReadQueueNums),
-      m_writeQueueNums(DefaultWriteQueueNums),
-      m_perm(PermName::PERM_READ | PermName::PERM_WRITE),
-      m_topicFilterType(SINGLE_TAG),
-      m_topicSysFlag(0),
-      m_order(false)
-{
-
-}
-
-TopicConfig::TopicConfig(const std::string& topicName, int readQueueNums, int writeQueueNums, int perm)
-    : m_topicName(topicName),
-      m_readQueueNums(readQueueNums),
-      m_writeQueueNums(writeQueueNums),
-      m_perm(perm),
-      m_topicFilterType(SINGLE_TAG),
-      m_topicSysFlag(0),
-      m_order(false)
-{
-}
-
-TopicConfig::~TopicConfig()
-{
-}
-
-std::string  TopicConfig::encode()
-{
-    std::stringstream ss;
-    ss << m_topicName << SEPARATOR
-       << m_readQueueNums << SEPARATOR
-       << m_writeQueueNums << SEPARATOR
-       << m_perm << SEPARATOR
-       << m_topicFilterType;
-
-    return ss.str();
-}
-
-bool  TopicConfig::decode(const std::string& in)
-{
-    std::stringstream ss(in);
-
-    ss >> m_topicName;
-    ss >> m_readQueueNums;
-    ss >> m_writeQueueNums;
-    ss >> m_perm;
-
-    int type;
-    ss >> type;
-    m_topicFilterType = (TopicFilterType)type;
-
-    return true;
-}
-
-const std::string&  TopicConfig::getTopicName()
-{
-    return m_topicName;
-}
-
-void  TopicConfig::setTopicName(const std::string& topicName)
-{
-    m_topicName = topicName;
-}
-
-int  TopicConfig::getReadQueueNums()
-{
-    return m_readQueueNums;
-}
-
-void  TopicConfig::setReadQueueNums(int readQueueNums)
-{
-    m_readQueueNums = readQueueNums;
-}
-
-int  TopicConfig::getWriteQueueNums()
-{
-    return m_writeQueueNums;
-}
-
-void  TopicConfig::setWriteQueueNums(int writeQueueNums)
-{
-    m_writeQueueNums = writeQueueNums;
-}
-
-int  TopicConfig::getPerm()
-{
-    return m_perm;
-}
-
-void  TopicConfig::setPerm(int perm)
-{
-    m_perm = perm;
-}
-
-TopicFilterType  TopicConfig::getTopicFilterType()
-{
-    return m_topicFilterType;
-}
-
-void  TopicConfig::setTopicFilterType(TopicFilterType topicFilterType)
-{
-    m_topicFilterType = topicFilterType;
-}
-
-int  TopicConfig::getTopicSysFlag()
-{
-    return m_topicSysFlag;
-}
-
-void  TopicConfig::setTopicSysFlag(int perm)
-{
-    m_topicSysFlag = perm;
-}
-
-bool  TopicConfig::isOrder()
-{
-    return m_order;
-}
-
-void  TopicConfig::setOrder(bool order)
-{
-    m_order = order;
-}
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/TopicConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/TopicConfig.h b/rocketmq-client4cpp/src/common/TopicConfig.h
deleted file mode 100644
index b9f2bcb..0000000
--- a/rocketmq-client4cpp/src/common/TopicConfig.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __TOPICCONFIG_H__
-#define __TOPICCONFIG_H__
-
-#include <string>
-#include "TopicFilterType.h"
-
-namespace rmq
-    {
-    /**
-    * Topic
-    *
-    */
-    class TopicConfig
-    {
-    public:
-        TopicConfig();
-        TopicConfig(const std::string& topicName);
-        TopicConfig(const std::string& topicName, int readQueueNums, int writeQueueNums, int perm);
-        ~TopicConfig();
-
-        std::string encode();
-        bool decode(const std::string& in);
-        const std::string& getTopicName();
-        void setTopicName(const std::string& topicName);
-        int getReadQueueNums();
-        void setReadQueueNums(int readQueueNums);
-        int getWriteQueueNums();
-        void setWriteQueueNums(int writeQueueNums);
-        int getPerm();
-        void setPerm(int perm);
-        TopicFilterType getTopicFilterType();
-        void setTopicFilterType(TopicFilterType topicFilterType);
-		int getTopicSysFlag();
-        void setTopicSysFlag(int topicSysFlag);
-		bool isOrder();
-        void setOrder(bool order);
-
-    public:
-        static int DefaultReadQueueNums;
-        static int DefaultWriteQueueNums;
-
-    private:
-        static std::string SEPARATOR;
-
-        std::string m_topicName;
-        int m_readQueueNums;
-        int m_writeQueueNums;
-        int m_perm;
-        TopicFilterType m_topicFilterType;
-		int m_topicSysFlag;
-		bool m_order;
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/TopicStatsTable.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/TopicStatsTable.h b/rocketmq-client4cpp/src/common/TopicStatsTable.h
deleted file mode 100755
index 4319e54..0000000
--- a/rocketmq-client4cpp/src/common/TopicStatsTable.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __TOPICSTATSTABLE_H__
-#define __TOPICSTATSTABLE_H__
-
-#include <map>
-
-namespace rmq
-{
-    class MessageQueue;
-
-    typedef struct
-    {
-        long long minOffset;
-        long long maxOffset;
-        long long lastUpdateTimestamp;
-    } TopicOffset;
-
-    class TopicStatsTable
-    {
-    public:
-        std::map<MessageQueue*, TopicOffset> getOffsetTable()
-        {
-            return m_offsetTable;
-        }
-
-        void setOffsetTable(const std::map<MessageQueue*, TopicOffset>& offsetTable)
-        {
-            m_offsetTable = offsetTable;
-        }
-
-    private:
-        std::map<MessageQueue*, TopicOffset> m_offsetTable;
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/UtilAll.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/UtilAll.h b/rocketmq-client4cpp/src/common/UtilAll.h
deleted file mode 100755
index b239edb..0000000
--- a/rocketmq-client4cpp/src/common/UtilAll.h
+++ /dev/null
@@ -1,608 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __UTILALL_H__
-#define __UTILALL_H__
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <assert.h>
-#include <time.h>
-
-#include <string>
-#include <sstream>
-#include <vector>
-#include <list>
-#include <set>
-#include <map>
-
-#include "RocketMQClient.h"
-#include "zlib.h"
-#include "json/value.h"
-#include "json/writer.h"
-
-namespace rmq
-{
-    const std::string WHITESPACE = " \t\r\n";
-    const int CHUNK = 8192;
-	const std::string yyyy_MM_dd_HH_mm_ss = "yyyy-MM-dd HH:mm:ss";
-    const std::string yyyy_MM_dd_HH_mm_ss_SSS = "yyyy-MM-dd#HH:mm:ss:SSS";
-    const std::string yyyyMMddHHmmss = "yyyyMMddHHmmss";
-
-    class UtilAll
-    {
-    public:
-        static pid_t getPid()
-        {
-            static __thread pid_t pid = 0;
-            if (!pid || pid != getpid())
-            {
-                pid = getpid();
-            }
-            return pid;
-        }
-
-        static pid_t getTid()
-        {
-            static __thread pid_t pid = 0;
-            static __thread pid_t tid = 0;
-            if (!pid || !tid || pid != getpid())
-            {
-                pid = getpid();
-                tid = syscall(__NR_gettid);
-            }
-            return tid;
-        }
-
-        static int Split(std::vector<std::string>& out, const std::string& in, const std::string& delimiter)
-        {
-            std::string::size_type left = 0;
-            for (size_t i = 1; i < in.size(); i++)
-            {
-                std::string::size_type right = in.find(delimiter, left);
-
-                if (right == std::string::npos)
-                {
-                    break;
-                }
-
-                out.push_back(in.substr(left, right - left));
-
-                left = right + delimiter.length();
-            }
-
-            out.push_back(in.substr(left));
-
-            return out.size();
-        }
-
-        static int Split(std::vector<std::string>& out, const std::string& in, const char delimiter)
-        {
-            std::string::size_type left = 0;
-            for (size_t i = 1; i < in.size(); i++)
-            {
-                std::string::size_type right = in.find(delimiter, left);
-
-                if (right == std::string::npos)
-                {
-                    break;
-                }
-
-                out.push_back(in.substr(left, right - left));
-
-                left = right + 1;
-            }
-
-            out.push_back(in.substr(left));
-
-            return out.size();
-        }
-
-        static std::string Trim(const std::string& str)
-        {
-            if (str.empty())
-            {
-                return str;
-            }
-
-            std::string::size_type left = str.find_first_not_of(WHITESPACE);
-
-            if (left == std::string::npos)
-            {
-                return "";
-            }
-
-            std::string::size_type right = str.find_last_not_of(WHITESPACE);
-
-            if (right == std::string::npos)
-            {
-                return str.substr(left);
-            }
-
-            return str.substr(left, right + 1 - left);
-        }
-
-        static bool isBlank(const std::string& str)
-        {
-            if (str.empty())
-            {
-                return true;
-            }
-
-            std::string::size_type left = str.find_first_not_of(WHITESPACE);
-
-            if (left == std::string::npos)
-            {
-                return true;
-            }
-
-            return false;
-        }
-
-        static int availableProcessors()
-        {
-            return 4;
-        }
-
-
-        static int hashCode(const char* pData, int len)
-        {
-            int h = 0;
-            if (pData != NULL && len > 0)
-            {
-                unsigned char c;
-                for (int i = 0; i < len; i++)
-                {
-                    c = (unsigned char)pData[i];
-                    h = 31 * h + c;
-                }
-            }
-
-            return h;
-        }
-
-        static int hashCode(const std::string& s)
-        {
-            return hashCode(s.c_str(), s.length());
-        }
-
-        static int hashCode(const char* pData)
-        {
-            return hashCode(std::string(pData));
-        }
-
-        static int hashCode(char x)
-        {
-            return x;
-        }
-
-        static int hashCode(unsigned char x)
-        {
-            return x;
-        }
-
-        static int hashCode(short x)
-        {
-            return x;
-        }
-
-        static int hashCode(unsigned short x)
-        {
-            return x;
-        }
-
-        static int hashCode(int x)
-        {
-            return x;
-        }
-
-        static int hashCode(unsigned int x)
-        {
-            return x;
-        }
-
-        static int hashCode(long x)
-        {
-            return x;
-        }
-
-        static int hashCode(unsigned long x)
-        {
-            return x;
-        }
-
-        template <typename T>
-        static int hashCode(const std::vector<T>& v)
-        {
-            int h = 0;
-            typeof(v.begin()) it = v.begin();
-            while (it != v.end())
-            {
-                h += hashCode(*it);
-                ++it;
-            }
-            return h;
-        }
-
-        template <typename T>
-        static int hashCode(const std::set<T>& v)
-        {
-            int h = 0;
-            typeof(v.begin()) it = v.begin();
-            while (it != v.end())
-            {
-                h += hashCode(*it);
-                ++it;
-            }
-            return h;
-        }
-
-		static std::string toString(Json::Value& json)
-		{
-			Json::FastWriter fastWriter;
-			return fastWriter.write(json);
-		}
-
-        template<typename T>
-        static std::string toString(const T& v)
-        {
-            std::ostringstream ss;
-            ss << v;
-            return ss.str();
-        }
-
-        template<typename T>
-        static std::string toString(const std::vector<T>& v)
-        {
-            std::string s;
-            s.append("[");
-            typeof(v.begin()) it = v.begin();
-            while (it != v.end())
-            {
-                s.append(toString(*it));
-                s.append(",");
-                ++it;
-            }
-			if (s.size() > 1)
-			{
-				s.erase(s.size() - 1, 1);
-			}
-            s.append("]");
-            return s;
-        }
-
-
-        template <typename T>
-        static std::string toString(const std::list<T>& v)
-        {
-            std::string s;
-            s.append("[");
-            typeof(v.begin()) it = v.begin();
-            while (it != v.end())
-            {
-                s.append(toString(*it));
-                s.append(",");
-                ++it;
-            }
-			if (s.size() > 1)
-			{
-				s.erase(s.size() - 1, 1);
-			}
-            s.append("]");
-
-            return s;
-        }
-
-        template <typename T>
-        static std::string toString(const std::set<T>& v)
-        {
-            std::string s;
-            s.append("[");
-            typeof(v.begin()) it = v.begin();
-            while (it != v.end())
-            {
-                s.append(toString(*it));
-                s.append(",");
-                ++it;
-            }
-			if (s.size() > 1)
-			{
-				s.erase(s.size() - 1, 1);
-			}
-            s.append("]");
-            return s;
-        }
-
-        template<typename K, typename V, typename D, typename A>
-        static std::string toString(const std::map<K, V, D, A>& v)
-        {
-            std::string s;
-            s.append("{");
-            typeof(v.begin()) it = v.begin();
-            while (it != v.end())
-            {
-                s.append(toString(it->first));
-                s.append("=");
-                s.append(toString(it->second));
-                s.append(",");
-                ++it;
-            }
-			if (s.size() > 1)
-			{
-				s.erase(s.size() - 1, 1);
-			}
-            s.append("}");
-            return s;
-        }
-
-        template<typename out_type, typename in_type>
-        static out_type convert(const in_type& t)
-        {
-            out_type result;
-            std::stringstream stream;
-            stream << t;
-            stream >> result;
-            return result;
-        }
-
-        static bool compress(const char* pIn, int inLen, unsigned char** pOut, int* pOutLen, int level)
-        {
-            int ret, flush;
-            int have;
-            z_stream strm;
-            unsigned char out[CHUNK];
-
-            /* allocate deflate state */
-            strm.zalloc = Z_NULL;
-            strm.zfree = Z_NULL;
-            strm.opaque = Z_NULL;
-            ret = deflateInit(&strm, level);
-            if (ret != Z_OK)
-            {
-                return false;
-            }
-
-            int outBufferLen = inLen;
-            unsigned char* outData = (unsigned char*)malloc(outBufferLen);
-            int left = inLen;
-            int used = 0;
-            int outDataLen = 0;
-
-            /* compress until end of buffer */
-            do
-            {
-                strm.avail_in = left > CHUNK ? CHUNK : left;
-                flush = left <= CHUNK ? Z_FINISH : Z_NO_FLUSH;
-                strm.next_in = (unsigned char*)pIn + used;
-                used += strm.avail_in;
-                left -= strm.avail_in;
-
-                /* run deflate() on input until output buffer not full, finish
-                compression if all of source has been read in */
-                do
-                {
-                    strm.avail_out = CHUNK;
-                    strm.next_out = out;
-                    ret = deflate(&strm, flush);    /* no bad return value */
-                    assert(ret != Z_STREAM_ERROR);  /* state not clobbered */
-                    have = CHUNK - strm.avail_out;
-
-                    if (outDataLen + have > outBufferLen)
-                    {
-                        outBufferLen = outDataLen + have;
-                        outBufferLen <<= 1;
-                        unsigned char* tmp = (unsigned char*)realloc(outData, outBufferLen);
-                        if (!tmp)
-                        {
-                            free(outData);
-                            return false;
-                        }
-
-                        outData = tmp;
-                    }
-
-                    memcpy(outData + outDataLen, out, have);
-                    outDataLen += have;
-
-                }
-                while (strm.avail_out == 0);
-                assert(strm.avail_in == 0);     /* all input will be used */
-
-                /* done when last data in file processed */
-            }
-            while (flush != Z_FINISH);
-            assert(ret == Z_STREAM_END);        /* stream will be complete */
-
-            *pOutLen = outDataLen;
-            *pOut = outData;
-
-            /* clean up and return */
-            (void)deflateEnd(&strm);
-            return true;
-        }
-
-        static bool decompress(const char* pIn, int inLen, unsigned char** pOut, int* pOutLen)
-        {
-            int ret;
-            int have;
-            z_stream strm;
-
-            unsigned char out[CHUNK];
-
-            /* allocate inflate state */
-            strm.zalloc = Z_NULL;
-            strm.zfree = Z_NULL;
-            strm.opaque = Z_NULL;
-            strm.avail_in = 0;
-            strm.next_in = Z_NULL;
-            ret = inflateInit(&strm);
-            if (ret != Z_OK)
-            {
-                return false;
-            }
-
-            int outBufferLen = inLen << 2;
-            unsigned char* outData = (unsigned char*)malloc(outBufferLen);
-
-            int left = inLen;
-            int used = 0;
-            int outDataLen = 0;
-
-            /* decompress until deflate stream ends or end of buffer */
-            do
-            {
-                strm.avail_in = left > CHUNK ? CHUNK : left;
-                if (strm.avail_in <= 0)
-                {
-                    break;
-                }
-
-                strm.next_in = (unsigned char*)pIn + used;
-                used += strm.avail_in;
-                left -= strm.avail_in;
-
-                /* run inflate() on input until output buffer not full */
-                do
-                {
-                    strm.avail_out = CHUNK;
-                    strm.next_out = out;
-                    ret = inflate(&strm, Z_NO_FLUSH);
-                    assert(ret != Z_STREAM_ERROR);  /* state not clobbered */
-                    switch (ret)
-                    {
-                        case Z_NEED_DICT:
-                            ret = Z_DATA_ERROR;     /* and fall through */
-                        case Z_DATA_ERROR:
-                        case Z_MEM_ERROR:
-                            (void)inflateEnd(&strm);
-                            free(outData);
-                            return false;
-                    }
-                    have = CHUNK - strm.avail_out;
-
-                    if (outDataLen + have > outBufferLen)
-                    {
-                        outBufferLen = outDataLen + have;
-                        outBufferLen <<= 1;
-                        unsigned char* tmp = (unsigned char*)realloc(outData, outBufferLen);
-                        if (!tmp)
-                        {
-                            free(outData);
-                            return false;
-                        }
-
-                        outData = tmp;
-                    }
-
-                    memcpy(outData + outDataLen, out, have);
-                    outDataLen += have;
-
-                }
-                while (strm.avail_out == 0);
-
-                /* done when inflate() says it's done */
-            }
-            while (ret != Z_STREAM_END);
-
-            /* clean up and return */
-            (void)inflateEnd(&strm);
-
-            if (ret == Z_STREAM_END)
-            {
-                *pOutLen = outDataLen;
-                *pOut = outData;
-
-                return true;
-            }
-            else
-            {
-                free(outData);
-
-                return false;
-            }
-        }
-
-        static unsigned long long hexstr2ull(const char* str)
-        {
-            char* end;
-            return strtoull(str, &end, 16);
-        }
-
-		static long long str2ll(const char *str)
-		{
-			return atoll(str);
-		}
-
-
-		static std::string tm2str(const time_t& t, const std::string& sFormat)
-		{
-			struct tm stTm;
-			localtime_r(&t, &stTm);
-
-			char sTimeString[255] = "\0";
-			strftime(sTimeString, sizeof(sTimeString), sFormat.c_str(), &stTm);
-
-			return std::string(sTimeString);
-		}
-
-		static std::string now2str(const std::string& sFormat)
-		{
-			time_t t = time(NULL);
-			return tm2str(t, sFormat.c_str());
-		}
-
-		static std::string now2str()
-		{
-			return now2str("%Y-%m-%d %H:%M:%S");
-		}
-
-		static int64_t now2ms()
-		{
-			struct timeval tv;
-			gettimeofday(&tv, NULL);
-			return tv.tv_sec * (int64_t)1000 + tv.tv_usec / 1000;
-		}
-
-		static int64_t now2us()
-		{
-			struct timeval tv;
-			gettimeofday(&tv, NULL);
-			return tv.tv_sec * (int64_t)1000*1000 + tv.tv_usec;
-		}
-
-		static int str2tm(const std::string &sString, const std::string &sFormat, struct tm &stTm)
-		{
-			char *p = strptime(sString.c_str(), sFormat.c_str(), &stTm);
-    		return (p != NULL) ? 0 : -1;
-		}
-
-		static time_t str2tm(const std::string &sString, const std::string &sFormat)
-		{
-			struct tm stTm;
-			if (str2tm(sString, sFormat, stTm) == 0)
-			{
-				time_t t = mktime(&stTm);
-				return t;
-			}
-			else
-			{
-				return -1;
-			}
-		}
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/Validators.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/Validators.cpp b/rocketmq-client4cpp/src/common/Validators.cpp
deleted file mode 100755
index 29f36a0..0000000
--- a/rocketmq-client4cpp/src/common/Validators.cpp
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang, kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "Validators.h"
-
-#include <stdlib.h>
-#include <stdio.h>
-#include "MQClientException.h"
-#include "UtilAll.h"
-#include "MixAll.h"
-#include "Message.h"
-#include "MQProtos.h"
-#include "DefaultMQProducer.h"
-
-namespace rmq
-{
-
-const std::string Validators::validPatternStr = "^[a-zA-Z0-9_-]+$";
-const size_t Validators::CHARACTER_MAX_LENGTH = 255;
-
-bool Validators::regularExpressionMatcher(const std::string& origin, const std::string& patternStr)
-{
-    if (UtilAll::isBlank(origin))
-    {
-        return false;
-    }
-
-    if (UtilAll::isBlank(patternStr))
-    {
-        return true;
-    }
-
-    //Pattern pattern = Pattern.compile(patternStr);
-    //Matcher matcher = pattern.matcher(origin);
-
-    //return matcher.matches();
-    return true;
-}
-
-std::string Validators::getGroupWithRegularExpression(const std::string& origin, const std::string& patternStr)
-{
-    /*Pattern pattern = Pattern.compile(patternStr);
-    Matcher matcher = pattern.matcher(origin);
-    while (matcher.find()) {
-    return matcher.group(0);
-    }*/
-    return "";
-}
-
-void Validators::checkTopic(const std::string& topic)
-{
-    if (UtilAll::isBlank(topic))
-    {
-        THROW_MQEXCEPTION(MQClientException, "the specified topic is blank", -1);
-    }
-
-    if (topic.length() > CHARACTER_MAX_LENGTH)
-    {
-        THROW_MQEXCEPTION(MQClientException, "the specified topic is longer than topic max length 255.", -1);
-    }
-
-    // Topic�����Ƿ��뱣���ֶγ�ͻ
-    if (topic == MixAll::DEFAULT_TOPIC)
-    {
-        THROW_MQEXCEPTION(MQClientException, "the topic[" + topic + "] is conflict with default topic.", -1);
-    }
-
-    if (!regularExpressionMatcher(topic, validPatternStr))
-    {
-        std::string str;
-        str = "the specified topic[" + topic + "] contains illegal characters, allowing only" + validPatternStr;
-
-        THROW_MQEXCEPTION(MQClientException, str.c_str(), -1);
-    }
-}
-
-void Validators::checkGroup(const std::string& group)
-{
-    if (UtilAll::isBlank(group))
-    {
-        THROW_MQEXCEPTION(MQClientException, "the specified group is blank", -1);
-    }
-
-    if (!regularExpressionMatcher(group, validPatternStr))
-    {
-        std::string str;
-        str = "the specified group[" + group + "] contains illegal characters, allowing only" + validPatternStr;
-
-        THROW_MQEXCEPTION(MQClientException, str.c_str(), -1);
-    }
-    if (group.length() > CHARACTER_MAX_LENGTH)
-    {
-        THROW_MQEXCEPTION(MQClientException, "the specified group is longer than group max length 255.", -1);
-    }
-}
-
-void Validators::checkMessage(const Message& msg, DefaultMQProducer* pDefaultMQProducer)
-{
-    checkTopic(msg.getTopic());
-
-    //// body
-    if (msg.getBody() == NULL)
-    {
-        THROW_MQEXCEPTION(MQClientException, "the message body is null", MESSAGE_ILLEGAL_VALUE);
-    }
-
-    if (msg.getBodyLen() == 0)
-    {
-        THROW_MQEXCEPTION(MQClientException, "the message body length is zero", MESSAGE_ILLEGAL_VALUE);
-    }
-
-    if (msg.getBodyLen() > pDefaultMQProducer->getMaxMessageSize())
-    {
-        char info[256];
-        snprintf(info, sizeof(info), "the message body size over max value, MAX: %d", pDefaultMQProducer->getMaxMessageSize());
-        THROW_MQEXCEPTION(MQClientException, info, MESSAGE_ILLEGAL_VALUE);
-    }
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/Validators.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/Validators.h b/rocketmq-client4cpp/src/common/Validators.h
deleted file mode 100755
index 36ab299..0000000
--- a/rocketmq-client4cpp/src/common/Validators.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang, kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __VALIDATORST_H__
-#define  __VALIDATORST_H__
-
-#include <string>
-
-namespace rmq
-{
-    class MQClientException;
-    class DefaultMQProducer;
-    class Message;
-
-    /**
-    * Validator class
-    *
-    * @author manhong.yqd<jo...@gmail.com>
-    * @since 2013-8-28
-    */
-    class Validators
-    {
-    public:
-        static bool regularExpressionMatcher(const std::string& origin, const std::string& patternStr);
-        static std::string getGroupWithRegularExpression(const std::string& origin, const std::string& patternStr);
-
-        static void checkTopic(const std::string& topic);
-        static void checkGroup(const std::string& group);
-        static void checkMessage(const Message& msg, DefaultMQProducer* pDefaultMQProducer);
-
-    public:
-        static const std::string validPatternStr;
-        static const size_t CHARACTER_MAX_LENGTH;
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp b/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp
deleted file mode 100755
index c68bfc8..0000000
--- a/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang, kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "VirtualEnvUtil.h"
-
-#include <stdlib.h>
-#include <stdio.h>
-#include "UtilAll.h"
-
-namespace rmq
-{
-
-const char* VirtualEnvUtil::VIRTUAL_APPGROUP_PREFIX = "%%PROJECT_%s%%";
-
-std::string VirtualEnvUtil::buildWithProjectGroup(const std::string& origin, const std::string& projectGroup)
-{
-    if (!UtilAll::isBlank(projectGroup))
-    {
-        char prefix[1024];
-        snprintf(prefix, sizeof(prefix), VIRTUAL_APPGROUP_PREFIX, projectGroup.c_str());
-
-        if (origin.find_last_of(prefix) == std::string::npos)
-        {
-            return origin + prefix;
-        }
-        else
-        {
-            return origin;
-        }
-    }
-    else
-    {
-        return origin;
-    }
-}
-
-
-std::string VirtualEnvUtil::clearProjectGroup(const std::string& origin, const std::string& projectGroup)
-{
-    char prefix[1024];
-    snprintf(prefix, sizeof(prefix), VIRTUAL_APPGROUP_PREFIX, projectGroup.c_str());
-    std::string::size_type pos = origin.find_last_of(prefix);
-
-    if (!UtilAll::isBlank(prefix) && pos != std::string::npos)
-    {
-        return origin.substr(0, pos);
-    }
-    else
-    {
-        return origin;
-    }
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/VirtualEnvUtil.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/VirtualEnvUtil.h b/rocketmq-client4cpp/src/common/VirtualEnvUtil.h
deleted file mode 100755
index 10ca0cd..0000000
--- a/rocketmq-client4cpp/src/common/VirtualEnvUtil.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang, kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __VIRTUALENVUTIL_H__
-#define __VIRTUALENVUTIL_H__
-
-#include <string>
-
-namespace rmq
-{
-    /**
-    * VirtualEnv API
-    *
-    * @author manhong.yqd<jo...@gmail.com>
-    * @since 2013-8-26
-    */
-    class VirtualEnvUtil
-    {
-    public:
-        static std::string buildWithProjectGroup(const std::string& origin, const std::string& projectGroup);
-        static std::string clearProjectGroup(const std::string& origin, const std::string& projectGroup);
-
-    public:
-        static const char* VIRTUAL_APPGROUP_PREFIX;
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h b/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h
deleted file mode 100755
index 49e1e7c..0000000
--- a/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __ALLOCATEMESSAGEQUEUESTRATEGYINNER_H__
-#define __ALLOCATEMESSAGEQUEUESTRATEGYINNER_H__
-
-#include <algorithm>
-
-#include "AllocateMessageQueueStrategy.h"
-#include "MQClientException.h"
-#include "UtilAll.h"
-
-
-namespace rmq
-{
-
-    class AllocateMessageQueueAveragely : public AllocateMessageQueueStrategy
-    {
-    public:
-        virtual ~AllocateMessageQueueAveragely() {}
-        virtual std::vector<MessageQueue>* allocate(
-				const std::string& consumerGroup,
-				const std::string& currentCID,
-                std::vector<MessageQueue>& mqAll,
-                std::list<std::string>& cidAll)
-        {
-            if (currentCID.empty())
-            {
-                THROW_MQEXCEPTION(MQClientException, "currentCID is empty", -1);
-            }
-
-            if (mqAll.empty())
-            {
-                THROW_MQEXCEPTION(MQClientException, "mqAll is empty", -1);
-            }
-
-            if (cidAll.empty())
-            {
-                THROW_MQEXCEPTION(MQClientException, "cidAll is empty", -1);
-            }
-
-            int index = -1;
-            int cidAllSize = cidAll.size();
-
-            std::list<std::string>::iterator it = cidAll.begin();
-            for (int i = 0; it != cidAll.end(); it++, i++)
-            {
-                if (*it == currentCID)
-                {
-                    index = i;
-                    break;
-                }
-            }
-
-            if (index == -1)
-            {
-				RMQ_ERROR("[BUG] ConsumerGroup: {%s} The consumerId: {%s} not in cidAll: {%s}", //
-                    consumerGroup.c_str(),
-                    currentCID.c_str(),
-                    UtilAll::toString(cidAll).c_str());
-                return NULL;
-            }
-
-            int mqAllSize = mqAll.size();
-            int mod = mqAllSize % cidAllSize;
-            int averageSize =
-                mqAllSize <= cidAllSize ? 1 : (mod > 0 && index < mod ? mqAllSize / cidAllSize
-                                               + 1 : mqAllSize / cidAllSize);
-            int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
-
-            std::vector<MessageQueue>* result = new std::vector<MessageQueue>();
-            int range = std::min<int>(averageSize, mqAllSize - startIndex);
-
-            for (int i = 0; i < range; i++)
-            {
-                result->push_back(mqAll.at((startIndex + i) % mqAllSize));
-            }
-
-            return result;
-        }
-
-        virtual std::string getName()
-        {
-            return "AVG";
-        }
-    };
-
-
-    class AllocateMessageQueueAveragelyByCircle : public AllocateMessageQueueStrategy
-    {
-    public:
-        virtual ~AllocateMessageQueueAveragelyByCircle() {}
-        virtual std::vector<MessageQueue>* allocate(
-				const std::string& consumerGroup,
-				const std::string& currentCID,
-                std::vector<MessageQueue>& mqAll,
-                std::list<std::string>& cidAll)
-        {
-            if (currentCID.empty())
-            {
-                THROW_MQEXCEPTION(MQClientException, "currentCID is empty", -1);
-            }
-
-            if (mqAll.empty())
-            {
-                THROW_MQEXCEPTION(MQClientException, "mqAll is empty", -1);
-            }
-
-            if (cidAll.empty())
-            {
-                THROW_MQEXCEPTION(MQClientException, "cidAll is empty", -1);
-            }
-
-            int index = -1;
-            std::list<std::string>::iterator it = cidAll.begin();
-            for (int i = 0; it != cidAll.end(); it++, i++)
-            {
-                if (*it == currentCID)
-                {
-                    index = i;
-                    break;
-                }
-            }
-
-            if (index == -1)
-            {
-				RMQ_ERROR("[BUG] ConsumerGroup: {%s} The consumerId: {%s} not in cidAll: {%s}", //
-                    consumerGroup.c_str(),
-                    currentCID.c_str(),
-                    UtilAll::toString(cidAll).c_str());
-                return NULL;
-            }
-
-			std::vector<MessageQueue>* result = new std::vector<MessageQueue>();
-	        for (int i = index; i < (int)mqAll.size(); i++)
-			{
-	            if (i % (int)cidAll.size() == index)
-				{
-					result->push_back(mqAll.at(i));
-	            }
-	        }
-
-	        return result;
-        }
-
-        virtual std::string getName()
-        {
-            return "AVG_BY_CIRCLE";
-        }
-    };
-
-
-    class AllocateMessageQueueByConfig : public AllocateMessageQueueStrategy
-    {
-    public:
-        virtual ~AllocateMessageQueueByConfig() {}
-        virtual std::vector<MessageQueue>* allocate(
-				const std::string& consumerGroup,
-				const std::string& currentCID,
-                std::vector<MessageQueue>& mqAll,
-                std::list<std::string>& cidAll)
-        {
-            return NULL;
-        }
-
-        virtual std::string getName()
-        {
-            return "CONFIG";
-        }
-    };
-
-
-    class AllocateMessageQueueByMachineRoom : public AllocateMessageQueueStrategy
-    {
-    public:
-        virtual ~AllocateMessageQueueByMachineRoom() {}
-        virtual std::vector<MessageQueue>* allocate(
-				const std::string& consumerGroup,
-				const std::string& currentCID,
-                std::vector<MessageQueue>& mqAll,
-                std::list<std::string>& cidAll)
-        {
-            return NULL;
-        }
-
-        virtual std::string getName()
-        {
-            return "MACHINE_ROOM";
-        }
-    };
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp b/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp
deleted file mode 100755
index 7550acb..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ /dev/null
@@ -1,476 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "ConsumeMessageConcurrentlyService.h"
-
-#include "DefaultMQPushConsumerImpl.h"
-#include "MessageListener.h"
-#include "MessageQueue.h"
-#include "RebalanceImpl.h"
-#include "DefaultMQPushConsumer.h"
-#include "MixAll.h"
-#include "KPRUtil.h"
-#include "UtilAll.h"
-#include "OffsetStore.h"
-
-namespace rmq
-{
-
-
-class SubmitConsumeRequestLater : public kpr::TimerHandler
-{
-public:
-    SubmitConsumeRequestLater(std::list<MessageExt*>& msgs,
-                              ProcessQueue* pProcessQueue,
-                              MessageQueue messageQueue,
-                              ConsumeMessageConcurrentlyService* pService)
-        : m_msgs(msgs),
-          m_pProcessQueue(pProcessQueue),
-          m_messageQueue(messageQueue),
-          m_pService(pService)
-    {
-
-    }
-
-    void OnTimeOut(unsigned int timerID)
-    {
-    	try
-    	{
-        	m_pService->submitConsumeRequest(m_msgs, m_pProcessQueue, m_messageQueue, true);
-        }
-        catch(...)
-        {
-        	RMQ_ERROR("SubmitConsumeRequestLater OnTimeOut exception");
-        }
-
-        delete this;
-    }
-
-private:
-    std::list<MessageExt*> m_msgs;
-    ProcessQueue* m_pProcessQueue;
-    MessageQueue m_messageQueue;
-    ConsumeMessageConcurrentlyService* m_pService;
-};
-
-
-class CleanExpireMsgTask : public kpr::TimerHandler
-{
-public:
-    CleanExpireMsgTask(ConsumeMessageConcurrentlyService* pService)
-        : m_pService(pService)
-    {
-
-    }
-
-    void OnTimeOut(unsigned int timerID)
-    {
-    	try
-    	{
-        	m_pService->cleanExpireMsg();
-        }
-        catch(...)
-        {
-        	RMQ_ERROR("CleanExpireMsgTask OnTimeOut exception");
-        }
-    }
-
-private:
-    ConsumeMessageConcurrentlyService* m_pService;
-};
-
-
-
-ConsumeMessageConcurrentlyService::ConsumeMessageConcurrentlyService(
-    DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl,
-    MessageListenerConcurrently* pMessageListener)
-{
-    m_pDefaultMQPushConsumerImpl = pDefaultMQPushConsumerImpl;
-    m_pMessageListener = pMessageListener;
-    m_pDefaultMQPushConsumer = m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer();
-    m_consumerGroup = m_pDefaultMQPushConsumer->getConsumerGroup();
-    m_pConsumeExecutor = new kpr::ThreadPool("ConsumeMessageThreadPool", 5,
-    m_pDefaultMQPushConsumer->getConsumeThreadMin(), m_pDefaultMQPushConsumer->getConsumeThreadMax());
-    m_pScheduledExecutorService = new kpr::TimerThread("ConsumeMessageConcurrentlyService", 1000);
-    m_pCleanExpireMsgExecutors = new kpr::TimerThread("CleanExpireMsgService", 1000);
-	m_pCleanExpireMsgTask = new CleanExpireMsgTask(this);
-}
-
-ConsumeMessageConcurrentlyService::~ConsumeMessageConcurrentlyService()
-{
-	delete m_pCleanExpireMsgTask;
-}
-
-
-void ConsumeMessageConcurrentlyService::start()
-{
-	m_pCleanExpireMsgExecutors->RegisterTimer(60 * 1000, 60 * 1000, m_pCleanExpireMsgTask, true);
-    m_pScheduledExecutorService->Start();
-    m_pCleanExpireMsgExecutors->Start();
-}
-
-void ConsumeMessageConcurrentlyService::shutdown()
-{
-    m_pConsumeExecutor->Destroy();
-    m_pScheduledExecutorService->Stop();
-    m_pScheduledExecutorService->Join();
-
-    m_pCleanExpireMsgExecutors->Stop();
-    m_pCleanExpireMsgExecutors->Join();
-}
-
-
-void ConsumeMessageConcurrentlyService::cleanExpireMsg()
-{
-	kpr::ScopedRLock<kpr::RWMutex> lock(m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->getProcessQueueTableLock());
-	std::map<MessageQueue, ProcessQueue*>& processQueueTable
-		= m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->getProcessQueueTable();
-	RMQ_FOR_EACH(processQueueTable, it)
-	{
-		ProcessQueue* pq = it->second;
-		if (!pq->isDropped())
-		{
-        	pq->cleanExpiredMsg(m_pDefaultMQPushConsumer);
-        }
-	}
-}
-
-
-ConsumerStat& ConsumeMessageConcurrentlyService::getConsumerStat()
-{
-    return m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat();
-}
-
-bool ConsumeMessageConcurrentlyService::sendMessageBack(MessageExt& msg,
-        ConsumeConcurrentlyContext& context)
-{
-    try
-    {
-        m_pDefaultMQPushConsumerImpl->sendMessageBack(msg,
-        	context.delayLevelWhenNextConsume, context.messageQueue.getBrokerName());
-        return true;
-    }
-    catch (...)
-    {
-		RMQ_ERROR("sendMessageBack exception, group: %s, msg: %s",
-			m_consumerGroup.c_str(), msg.toString().c_str());
-    }
-
-    return false;
-}
-
-void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(std::list<MessageExt*>& msgs,
-        ProcessQueue* pProcessQueue,
-        const MessageQueue& messageQueue)
-{
-    SubmitConsumeRequestLater* sc = new SubmitConsumeRequestLater(msgs, pProcessQueue, messageQueue, this);
-    m_pScheduledExecutorService->RegisterTimer(0, 5000, sc, false);
-}
-
-void ConsumeMessageConcurrentlyService::submitConsumeRequest(std::list<MessageExt*>& msgs,
-        ProcessQueue* pProcessQueue,
-        const MessageQueue& messageQueue,
-        bool dispathToConsume)
-{
-	size_t consumeBatchSize = m_pDefaultMQPushConsumer->getConsumeMessageBatchMaxSize();
-
-	RMQ_DEBUG("submitConsumeRequest begin, msgs.size=%d, messageQueue=%s, consumeBatchSize=%d, dispathToConsume=%d",
-		(int)msgs.size(), messageQueue.toString().c_str(), (int)consumeBatchSize, dispathToConsume
-    );
-
-    if (msgs.size() <= consumeBatchSize)
-    {
-        kpr::ThreadPoolWorkPtr consumeRequest = new ConsumeConcurrentlyRequest(msgs, pProcessQueue, messageQueue, this);
-        m_pConsumeExecutor->AddWork(consumeRequest);
-    }
-    else
-    {
-        std::list<MessageExt*>::iterator it = msgs.begin();
-        for (; it != msgs.end();)
-        {
-            std::list<MessageExt*> msgThis;
-            for (size_t i = 0; i < consumeBatchSize; i++, it++)
-            {
-                if (it != msgs.end())
-                {
-                    msgThis.push_back(*it);
-                }
-                else
-                {
-                    break;
-                }
-            }
-
-            kpr::ThreadPoolWorkPtr consumeRequest = new ConsumeConcurrentlyRequest(msgThis, pProcessQueue, messageQueue, this);
-            m_pConsumeExecutor->AddWork(consumeRequest);
-        }
-    }
-
-    RMQ_DEBUG("submitConsumeRequest end");
-}
-
-void ConsumeMessageConcurrentlyService::updateCorePoolSize(int corePoolSize)
-{
-	//todo
-}
-
-void ConsumeMessageConcurrentlyService::processConsumeResult(ConsumeConcurrentlyStatus status,
-        ConsumeConcurrentlyContext& context,
-        ConsumeConcurrentlyRequest& consumeRequest)
-{
-    int ackIndex = context.ackIndex;
-
-    if (consumeRequest.getMsgs().empty())
-    {
-        return;
-    }
-
-    int msgsSize = consumeRequest.getMsgs().size();
-
-    switch (status)
-    {
-        case CONSUME_SUCCESS:
-        {
-            if (ackIndex >= msgsSize)
-            {
-                ackIndex = msgsSize - 1;
-            }
-
-            int ok = ackIndex + 1;
-            int failed = msgsSize - ok;
-            getConsumerStat().consumeMsgOKTotal.fetchAndAdd(ok);
-            getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(failed);
-        }
-
-        break;
-        case RECONSUME_LATER:
-            ackIndex = -1;
-            getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(msgsSize);
-            break;
-        default:
-            break;
-    }
-
-    std::list<MessageExt*>& msgs = consumeRequest.getMsgs();
-    std::list<MessageExt*>::iterator it = msgs.begin();
-
-    for (int i = 0; i < ackIndex + 1 && it != msgs.end(); i++)
-    {
-        it++;
-    }
-
-    switch (m_pDefaultMQPushConsumer->getMessageModel())
-    {
-        case BROADCASTING:
-            for (; it != msgs.end(); it++)
-            {
-                MessageExt* msg = *it;
-                RMQ_WARN("BROADCASTING, the message consume failed, drop it, %s", msg->toString().c_str());
-            }
-            break;
-        case CLUSTERING:
-        {
-            std::list<MessageExt*> msgBackFailed;
-            for (; it != msgs.end(); it++)
-            {
-                MessageExt* msg = *it;
-                bool result = sendMessageBack(*msg, context);
-                if (!result)
-                {
-                    msg->setReconsumeTimes(msg->getReconsumeTimes() + 1);
-                    msgBackFailed.push_back(msg);
-                }
-            }
-
-            if (!msgBackFailed.empty())
-            {
-                it = msgs.begin();
-
-                for (; it != msgs.end();)
-                {
-                    bool find = false;
-                    std::list<MessageExt*>::iterator itFailed = msgBackFailed.begin();
-                    for (; itFailed != msgBackFailed.end(); itFailed++)
-                    {
-                        if (*it == *itFailed)
-                        {
-                            it = msgs.erase(it);
-                            find = true;
-                            break;
-                        }
-                    }
-
-                    if (!find)
-                    {
-                        it++;
-                    }
-                }
-
-                submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
-                                          consumeRequest.getMessageQueue());
-            }
-        }
-        break;
-        default:
-            break;
-    }
-
-    long long offset = consumeRequest.getProcessQueue()->removeMessage(consumeRequest.getMsgs());
-    if (offset >= 0 && !(consumeRequest.getProcessQueue()->isDropped()))
-    {
-        m_pDefaultMQPushConsumerImpl->getOffsetStore()->updateOffset(consumeRequest.getMessageQueue(),
-                offset, true);
-    }
-}
-
-std::string& ConsumeMessageConcurrentlyService::getConsumerGroup()
-{
-    return m_consumerGroup;
-}
-
-MessageListenerConcurrently* ConsumeMessageConcurrentlyService::getMessageListener()
-{
-    return m_pMessageListener;
-}
-
-DefaultMQPushConsumerImpl* ConsumeMessageConcurrentlyService::getDefaultMQPushConsumerImpl()
-{
-    return m_pDefaultMQPushConsumerImpl;
-}
-
-ConsumeConcurrentlyRequest::ConsumeConcurrentlyRequest(std::list<MessageExt*>& msgs,
-        ProcessQueue* pProcessQueue,
-        const MessageQueue& messageQueue,
-        ConsumeMessageConcurrentlyService* pService)
-{
-	m_msgs = msgs;
-	m_pProcessQueue = pProcessQueue;
-	m_pService = pService;
-    m_messageQueue = messageQueue;
-}
-
-ConsumeConcurrentlyRequest::~ConsumeConcurrentlyRequest()
-{
-	m_msgs.clear();
-}
-
-void ConsumeConcurrentlyRequest::Do()
-{
-	RMQ_DEBUG("consumeMessage begin, m_msgs.size=%d", (int)m_msgs.size());
-
-    if (m_pProcessQueue->isDropped())
-    {
-        RMQ_WARN("the message queue not be able to consume, because it's droped, {%s}",
-        	m_messageQueue.toString().c_str());
-        return;
-    }
-
-	try
-	{
-	    MessageListenerConcurrently* listener = m_pService->getMessageListener();
-	    ConsumeConcurrentlyContext context(m_messageQueue);
-	    ConsumeConcurrentlyStatus status = RECONSUME_LATER;
-
-	    ConsumeMessageContext consumeMessageContext;
-	    if (m_pService->getDefaultMQPushConsumerImpl()->hasHook())
-	    {
-	        consumeMessageContext.consumerGroup = m_pService->getConsumerGroup();
-	        consumeMessageContext.mq = m_messageQueue;
-	        consumeMessageContext.msgList = m_msgs;
-	        consumeMessageContext.success = false;
-	        m_pService->getDefaultMQPushConsumerImpl()->executeHookBefore(consumeMessageContext);
-	    }
-
-	    long long beginTimestamp = KPRUtil::GetCurrentTimeMillis();
-	    try
-	    {
-	        resetRetryTopic(m_msgs);
-	        if (!m_msgs.empty())
-	        {
-	        	std::list<MessageExt*>::iterator it = m_msgs.begin();
-			    for (; it != m_msgs.end(); it++)
-			    {
-			        MessageExt* msg = (*it);
-			        msg->putProperty(Message::PROPERTY_CONSUME_START_TIMESTAMP,
-			        	UtilAll::toString(KPRUtil::GetCurrentTimeMillis()));
-			    }
-            }
-	        status = listener->consumeMessage(m_msgs, context);
-	    }
-	    catch (...)
-	    {
-	        RMQ_WARN("consumeMessage exception, Group: {%s} Msgs: {%d} MQ: {%s}",
-	        	m_pService->getConsumerGroup().c_str(),
-	        	(int)m_msgs.size(),
-	        	m_messageQueue.toString().c_str()
-	        );
-	    }
-
-	    long long consumeRT = KPRUtil::GetCurrentTimeMillis() - beginTimestamp;
-
-	    if (m_pService->getDefaultMQPushConsumerImpl()->hasHook())
-	    {
-	        consumeMessageContext.success = (status == CONSUME_SUCCESS);
-	        m_pService->getDefaultMQPushConsumerImpl()->executeHookAfter(consumeMessageContext);
-	    }
-
-	    m_pService->getConsumerStat().consumeMsgRTTotal.fetchAndAdd(consumeRT);
-	    bool updated = MixAll::compareAndIncreaseOnly(m_pService->getConsumerStat().consumeMsgRTMax, consumeRT);
-	    if (updated)
-	    {
-	        RMQ_WARN("consumeMessage RT new max: %lld, Group: %s, Msgs: %d, MQ: %s",
-	        	consumeRT,
-	        	m_pService->getConsumerGroup().c_str(),
-	        	(int)m_msgs.size(),
-	        	m_messageQueue.toString().c_str()
-	        );
-	    }
-
-		if (!m_pProcessQueue->isDropped())
-		{
-	    	m_pService->processConsumeResult(status, context, *this);
-	    }
-	    else
-	    {
-	    	RMQ_WARN("processQueue is dropped without process consume result, messageQueue={%s}, msgs.size={%d}",
-	        	m_messageQueue.toString().c_str(), (int)m_msgs.size());
-	    }
-	}
-	catch(...)
-	{
-		RMQ_WARN("ConsumeConcurrentlyRequest exception");
-	}
-	RMQ_DEBUG("consumeMessage end, m_msgs.size=%d", (int)m_msgs.size());
-
-    return;
-}
-
-void ConsumeConcurrentlyRequest::resetRetryTopic(std::list<MessageExt*>& msgs)
-{
-    std::string groupTopic = MixAll::getRetryTopic(m_pService->getConsumerGroup());
-    std::list<MessageExt*>::iterator it = msgs.begin();
-
-    for (; it != msgs.end(); it++)
-    {
-        MessageExt* msg = (*it);
-        std::string retryTopic = msg->getProperty(Message::PROPERTY_RETRY_TOPIC);
-        if (!retryTopic.empty() && groupTopic == msg->getTopic())
-        {
-            msg->setTopic(retryTopic);
-        }
-    }
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h b/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h
deleted file mode 100755
index acb7538..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __CONSUMEMESSAGECONCURRENTLYSERVICE_H__
-#define __CONSUMEMESSAGECONCURRENTLYSERVICE_H__
-
-#include "ConsumeMessageService.h"
-
-#include <list>
-#include <string>
-#include "MessageQueueLock.h"
-#include "ConsumerStatManage.h"
-#include "MessageExt.h"
-#include "MessageListener.h"
-#include "ProcessQueue.h"
-#include "ThreadPool.h"
-#include "TimerThread.h"
-
-namespace rmq
-{
-  class DefaultMQPushConsumerImpl;
-  class DefaultMQPushConsumer;
-  class MessageListenerConcurrently;
-  class ConsumeMessageConcurrentlyService;
-
-  class ConsumeConcurrentlyRequest: public kpr::ThreadPoolWork
-  {
-  public:
-      ConsumeConcurrentlyRequest(std::list<MessageExt*>& msgs,
-                                 ProcessQueue* pProcessQueue,
-                                 const MessageQueue& messageQueue,
-                                 ConsumeMessageConcurrentlyService* pService);
-      ~ConsumeConcurrentlyRequest();
-      virtual void Do();
-
-      std::list<MessageExt*>& getMsgs()
-      {
-          return m_msgs;
-      }
-
-      ProcessQueue* getProcessQueue()
-      {
-          return m_pProcessQueue;
-      }
-
-      MessageQueue getMessageQueue()
-      {
-          return m_messageQueue;
-      }
-
-  private:
-      void resetRetryTopic(std::list<MessageExt*>& msgs);
-
-  private:
-      std::list<MessageExt*> m_msgs;
-      ProcessQueue* m_pProcessQueue;
-      MessageQueue m_messageQueue;
-      ConsumeMessageConcurrentlyService* m_pService;
-  };
-
-
-  class ConsumeMessageConcurrentlyService : public ConsumeMessageService
-  {
-  public:
-      ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl,
-                                        MessageListenerConcurrently* pMessageListener);
-	  ~ConsumeMessageConcurrentlyService();
-
-      void start();
-      void shutdown();
-
-	  void cleanExpireMsg();
-      ConsumerStat& getConsumerStat();
-
-      bool sendMessageBack(MessageExt& msg, ConsumeConcurrentlyContext& context);
-      void processConsumeResult(ConsumeConcurrentlyStatus status,
-                                ConsumeConcurrentlyContext& context,
-                                ConsumeConcurrentlyRequest& consumeRequest);
-
-      void submitConsumeRequestLater(std::list<MessageExt*>& pMsgs,
-                                     ProcessQueue* pProcessQueue,
-                                     const MessageQueue& messageQueue);
-
-      void submitConsumeRequest(std::list<MessageExt*>& pMsgs,
-                                ProcessQueue* pProcessQueue,
-                                const MessageQueue& messageQueue,
-                                bool dispathToConsume);
-
-      void updateCorePoolSize(int corePoolSize);
-
-      std::string& getConsumerGroup();
-      MessageListenerConcurrently* getMessageListener();
-      DefaultMQPushConsumerImpl* getDefaultMQPushConsumerImpl();
-
-  private:
-      DefaultMQPushConsumerImpl* m_pDefaultMQPushConsumerImpl;
-      DefaultMQPushConsumer* m_pDefaultMQPushConsumer;
-      MessageListenerConcurrently* m_pMessageListener;
-      std::string m_consumerGroup;
-      kpr::ThreadPoolPtr m_pConsumeExecutor;
-      kpr::TimerThreadPtr m_pScheduledExecutorService;
-      kpr::TimerThreadPtr m_pCleanExpireMsgExecutors;
-      kpr::TimerHandler* m_pCleanExpireMsgTask;
-  };
-}
-
-#endif