You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/09/04 06:44:47 UTC
[13/17] incubator-rocketmq-externals git commit: Polish cpp module
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/PermName.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/PermName.cpp b/rocketmq-client4cpp/src/common/PermName.cpp
deleted file mode 100644
index 084de79..0000000
--- a/rocketmq-client4cpp/src/common/PermName.cpp
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "PermName.h"
-
-namespace rmq
-{
-
-// Permission bit masks: bit 3 = priority, bit 2 = read, bit 1 = write, bit 0 = inherit.
-int PermName::PERM_PRIORITY = 0x1 << 3;
-int PermName::PERM_READ = 0x1 << 2;
-int PermName::PERM_WRITE = 0x1 << 1;
-int PermName::PERM_INHERIT = 0x1 << 0;
-
-/** @return true when the PERM_READ bit is set in perm. */
-bool PermName::isReadable(int perm)
-{
-    return (perm & PERM_READ) == PERM_READ;
-}
-
-/** @return true when the PERM_WRITE bit is set in perm. */
-bool PermName::isWriteable(int perm)
-{
-    return (perm & PERM_WRITE) == PERM_WRITE;
-}
-
-/** @return true when the PERM_INHERIT bit is set in perm. */
-bool PermName::isInherited(int perm)
-{
-    return (perm & PERM_INHERIT) == PERM_INHERIT;
-}
-
-/**
-* Renders perm as a fixed three-character string. Position 0 is 'R' when
-* readable, position 1 is 'W' when writeable, position 2 is 'X' when
-* inherited; a cleared bit is shown as '-' (for example "RW-" or "--X").
-*
-* Each slot is written by index. The previous code called
-* std::string::replace(pos, count, str) with an end index in place of count,
-* which removed extra characters and shortened the result (e.g. "RW").
-*/
-std::string PermName::perm2String(int perm)
-{
-    std::string pm("---");
-
-    if (isReadable(perm))
-    {
-        pm[0] = 'R';
-    }
-
-    if (isWriteable(perm))
-    {
-        pm[1] = 'W';
-    }
-
-    if (isInherited(perm))
-    {
-        pm[2] = 'X';
-    }
-
-    return pm;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/PermName.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/PermName.h b/rocketmq-client4cpp/src/common/PermName.h
deleted file mode 100644
index 364ddeb..0000000
--- a/rocketmq-client4cpp/src/common/PermName.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __PERMNAME_H__
-#define __PERMNAME_H__
-
-#include <string>
-
-namespace rmq
-{
-    /**
-    * Static helpers for the integer permission bit field: the mask values,
-    * one test per bit, and a printable rendering.
-    */
-    class PermName
-    {
-    public:
-        // Bit masks; the values are assigned where the statics are defined.
-        static int PERM_PRIORITY;
-        static int PERM_READ;
-        static int PERM_WRITE;
-        static int PERM_INHERIT;
-
-        // Each test reports whether the matching mask bit is set in perm.
-        static bool isReadable(int perm);
-        static bool isWriteable(int perm);
-        static bool isInherited(int perm);
-
-        // Printable form of perm.
-        static std::string perm2String(int perm);
-    };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/PullSysFlag.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/PullSysFlag.cpp b/rocketmq-client4cpp/src/common/PullSysFlag.cpp
deleted file mode 100644
index f6fc1c2..0000000
--- a/rocketmq-client4cpp/src/common/PullSysFlag.cpp
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "PullSysFlag.h"
-
-namespace rmq
-{
-
-// One bit per pull flag.
-int PullSysFlag::FLAG_COMMIT_OFFSET = 0x1 << 0;
-int PullSysFlag::FLAG_SUSPEND = 0x1 << 1;
-int PullSysFlag::FLAG_SUBSCRIPTION = 0x1 << 2;
-
-/**
-* Packs the three switches into a single bit field.
-* @return the OR of the flag bits whose switch is true; 0 when all are false.
-*/
-int PullSysFlag::buildSysFlag(bool commitOffset, bool suspend, bool subscription)
-{
-    const int commitBits = commitOffset ? FLAG_COMMIT_OFFSET : 0;
-    const int suspendBits = suspend ? FLAG_SUSPEND : 0;
-    const int subscriptionBits = subscription ? FLAG_SUBSCRIPTION : 0;
-
-    return commitBits | suspendBits | subscriptionBits;
-}
-
-/** @return sysFlag with the commit-offset bit cleared; other bits untouched. */
-int PullSysFlag::clearCommitOffsetFlag(int sysFlag)
-{
-    const int keepMask = ~FLAG_COMMIT_OFFSET;
-    return sysFlag & keepMask;
-}
-
-/** @return true when the commit-offset bit is set in sysFlag. */
-bool PullSysFlag::hasCommitOffsetFlag(int sysFlag)
-{
-    const int masked = sysFlag & FLAG_COMMIT_OFFSET;
-    return masked == FLAG_COMMIT_OFFSET;
-}
-
-/** @return true when the suspend bit is set in sysFlag. */
-bool PullSysFlag::hasSuspendFlag(int sysFlag)
-{
-    const int masked = sysFlag & FLAG_SUSPEND;
-    return masked == FLAG_SUSPEND;
-}
-
-/** @return true when the subscription bit is set in sysFlag. */
-bool PullSysFlag::hasSubscriptionFlag(int sysFlag)
-{
-    const int masked = sysFlag & FLAG_SUBSCRIPTION;
-    return masked == FLAG_SUBSCRIPTION;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/PullSysFlag.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/PullSysFlag.h b/rocketmq-client4cpp/src/common/PullSysFlag.h
deleted file mode 100755
index c19eac3..0000000
--- a/rocketmq-client4cpp/src/common/PullSysFlag.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __PULLSYSFLAG_H__
-#define __PULLSYSFLAG_H__
-
-namespace rmq
-{
-    /**
-    * Static helpers that build and query the integer flag word of a pull
-    * request. The individual flag bits are private to this class.
-    */
-    class PullSysFlag
-    {
-    public:
-        // Builds a flag word from the three switches.
-        static int buildSysFlag(bool commitOffset, bool suspend, bool subscription);
-
-        // Returns sysFlag without the commit-offset bit.
-        static int clearCommitOffsetFlag(int sysFlag);
-
-        // Per-bit queries on an existing flag word.
-        static bool hasCommitOffsetFlag(int sysFlag);
-        static bool hasSuspendFlag(int sysFlag);
-        static bool hasSubscriptionFlag(int sysFlag);
-
-    private:
-        static int FLAG_COMMIT_OFFSET;
-        static int FLAG_SUSPEND;
-        static int FLAG_SUBSCRIPTION;
-    };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/SendResult.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/SendResult.cpp b/rocketmq-client4cpp/src/common/SendResult.cpp
deleted file mode 100755
index 5263d29..0000000
--- a/rocketmq-client4cpp/src/common/SendResult.cpp
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "SendResult.h"
-#include "UtilAll.h"
-#include "VirtualEnvUtil.h"
-
-namespace rmq
-{
-
-/** Creates an empty result: status SEND_OK, queue offset 0, no message id. */
-SendResult::SendResult()
-    : m_sendStatus(SEND_OK), m_queueOffset(0)
-{
-}
-
-/**
-* Creates a populated result. When projectGroupPrefix is not blank, the topic
-* of the stored queue copy is rewritten with VirtualEnvUtil::clearProjectGroup.
-*/
-SendResult::SendResult(const SendStatus& sendStatus,
-                       const std::string& msgId,
-                       MessageQueue& messageQueue,
-                       long long queueOffset,
-                       std::string& projectGroupPrefix)
-    : m_sendStatus(sendStatus),
-      m_msgId(msgId),
-      m_messageQueue(messageQueue),
-      m_queueOffset(queueOffset)
-{
-    if (!UtilAll::isBlank(projectGroupPrefix))
-    {
-        m_messageQueue.setTopic(VirtualEnvUtil::clearProjectGroup(m_messageQueue.getTopic(),
-                                projectGroupPrefix));
-    }
-}
-
-const std::string& SendResult::getMsgId()
-{
-    return m_msgId;
-}
-
-void SendResult::setMsgId(const std::string& msgId)
-{
-    m_msgId = msgId;
-}
-
-SendStatus SendResult::getSendStatus()
-{
-    return m_sendStatus;
-}
-
-void SendResult::setSendStatus(const SendStatus& sendStatus)
-{
-    m_sendStatus = sendStatus;
-}
-
-MessageQueue& SendResult::getMessageQueue()
-{
-    return m_messageQueue;
-}
-
-void SendResult::setMessageQueue(MessageQueue& messageQueue)
-{
-    m_messageQueue = messageQueue;
-}
-
-long long SendResult::getQueueOffset()
-{
-    return m_queueOffset;
-}
-
-void SendResult::setQueueOffset(long long queueOffset)
-{
-    m_queueOffset = queueOffset;
-}
-
-/** @return true once a message id has been recorded. */
-bool SendResult::hasResult()
-{
-    return !m_msgId.empty();
-}
-
-/** Human-readable "{key=value,...}" dump of all fields. */
-std::string SendResult::toString() const
-{
-    std::stringstream ss;
-    ss << "{sendStatus=" << m_sendStatus
-       << ",msgId=" << m_msgId
-       << ",messageQueue=" << m_messageQueue.toString()
-       << ",queueOffset=" << m_queueOffset
-       << "}";
-    return ss.str();
-}
-
-/**
-* JSON object with sendStatus, msgId and queueOffset as quoted strings and
-* messageQueue as whatever MessageQueue::toJsonString() emits.
-* The closing quote after the queueOffset value was missing, which made the
-* whole document unparseable; it is now emitted.
-* NOTE(review): msgId is written verbatim, without JSON escaping.
-*/
-std::string SendResult::toJsonString() const
-{
-    std::stringstream ss;
-    ss << "{\"sendStatus\":\"" << m_sendStatus
-       << "\",\"msgId\":\"" << m_msgId
-       << "\",\"messageQueue\":" << m_messageQueue.toJsonString()
-       << ",\"queueOffset\":\"" << m_queueOffset
-       << "\"}";
-    return ss.str();
-}
-
-// NOTE(review): m_localTransactionState is not initialized here; reading it
-// before setLocalTransactionState() yields an indeterminate value.
-TransactionSendResult::TransactionSendResult()
-{
-}
-
-LocalTransactionState TransactionSendResult::getLocalTransactionState()
-{
-    return m_localTransactionState;
-}
-
-void TransactionSendResult::setLocalTransactionState(LocalTransactionState localTransactionState)
-{
-    m_localTransactionState = localTransactionState;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/ServiceState.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/ServiceState.h b/rocketmq-client4cpp/src/common/ServiceState.h
deleted file mode 100755
index 7b41add..0000000
--- a/rocketmq-client4cpp/src/common/ServiceState.h
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __SERVICESTATE_H__
-#define __SERVICESTATE_H__
-
-namespace rmq
-{
- enum ServiceState // Service lifecycle states; enumerators take the default values 0..3 in declaration order.
- {
- CREATE_JUST, // NOTE(review): presumably "created but not yet started" - confirm against users of this enum
- RUNNING, // NOTE(review): presumably "started and running" - confirm
- SHUTDOWN_ALREADY, // NOTE(review): presumably "has been shut down" - confirm
- START_FAILED // NOTE(review): presumably "a start was attempted and failed" - confirm
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/ServiceThread.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/ServiceThread.cpp b/rocketmq-client4cpp/src/common/ServiceThread.cpp
deleted file mode 100644
index 1abff9f..0000000
--- a/rocketmq-client4cpp/src/common/ServiceThread.cpp
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "ServiceThread.h"
-#include "Monitor.h"
-#include "ScopedLock.h"
-
-namespace rmq
-{
-
-/** Creates the thread object under the given name; nothing is notified or stopped yet. */
-ServiceThread::ServiceThread(const char* name)
-    : kpr::Thread(name),
-      m_notified(false),
-      m_stoped(false)
-{
-}
-
-ServiceThread::~ServiceThread()
-{
-}
-
-/** Marks the service as stopped and wakes the thread if it is waiting. */
-void ServiceThread::stop()
-{
-    m_stoped = true;
-    wakeup();
-}
-
-/**
-* Records a pending notification and signals the monitor. Repeated calls
-* before the notification is consumed are collapsed into one.
-*/
-void ServiceThread::wakeup()
-{
-    kpr::ScopedLock<kpr::Monitor> lock(*this);
-
-    if (!m_notified)
-    {
-        m_notified = true;
-        Notify();
-    }
-}
-
-/**
-* Waits on the monitor until wakeup() is called or Wait(interval) returns.
-* Returns immediately when a notification is already pending.
-* NOTE(review): the unit of interval is whatever kpr::Monitor::Wait expects.
-*/
-void ServiceThread::waitForRunning(long interval)
-{
-    kpr::ScopedLock<kpr::Monitor> lock(*this);
-    if (m_notified)
-    {
-        // A wakeup arrived before we started waiting: consume it, skip the wait.
-        m_notified = false;
-        return;
-    }
-
-    try
-    {
-        Wait(interval);
-    }
-    catch (...)
-    {
-        // Best effort, as before: a failing wait is treated like a timeout.
-    }
-
-    // Consume the notification on every exit path. Previously the flag was
-    // cleared only when Wait() threw, so a wakeup() that ended the wait left
-    // it set and the next call returned at once without waiting.
-    m_notified = false;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/ServiceThread.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/ServiceThread.h b/rocketmq-client4cpp/src/common/ServiceThread.h
deleted file mode 100755
index d7ec3ef..0000000
--- a/rocketmq-client4cpp/src/common/ServiceThread.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __SERVICETHREAD_H__
-#define __SERVICETHREAD_H__
-
-#include <string>
-#include "Thread.h"
-#include "Monitor.h"
-
-namespace rmq
-{
- const long JoinTime = 90 * 1000; // NOTE(review): not used in this header; presumably a join timeout in milliseconds - confirm at the call sites
-
- /**
- * Base class for service threads: a kpr::Thread that is also a kpr::Monitor,
- * offering stop(), wakeup() and waitForRunning() to derived classes.
- */
- class ServiceThread : public kpr::Thread, public kpr::Monitor
- {
- public:
- ServiceThread(const char* name = NULL); // name is passed on to the kpr::Thread base
- virtual ~ServiceThread();
-
- virtual std::string getServiceName() = 0; // supplied by each concrete service
-
- void stop(); // sets m_stoped, then calls wakeup()
- void wakeup(); // sets m_notified and notifies the monitor unless a notification is already pending
- void waitForRunning(long interval); // returns at once if a notification is pending, otherwise calls Wait(interval) on the monitor
-
- protected:
- volatile bool m_notified; // pending-wakeup flag, accessed under the monitor lock by wakeup()/waitForRunning()
- volatile bool m_stoped; // set by stop(); NOTE(review): written without the monitor lock, and volatile is not a synchronization primitive - confirm readers tolerate this
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h b/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h
deleted file mode 100755
index 12bd48a..0000000
--- a/rocketmq-client4cpp/src/common/SubscriptionGroupConfig.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang, kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __SUBSCRIPTIONGROUPCONFIG_H__
-#define __SUBSCRIPTIONGROUPCONFIG_H__
-
-#include <string>
-#include "MixAll.h"
-
-namespace rmq
-{
-    /**
-    * Plain settings record for one consumer subscription group. All fields
-    * are public; the constructor fills in the defaults.
-    */
-    class SubscriptionGroupConfig
-    {
-    public:
-        /**
-        * @param groupName name stored in the groupName field; every other
-        *        field starts at its default (see the initializer list).
-        */
-        SubscriptionGroupConfig(const std::string& groupName)
-            : groupName(groupName),
-              consumeEnable(true),
-              consumeFromMinEnable(true),
-              consumeBroadcastEnable(true),
-              retryQueueNums(1),
-              retryMaxTimes(5),
-              brokerId(MixAll::MASTER_ID),
-              whichBrokerWhenConsumeSlowly(1)
-        {
-        }
-
-        std::string groupName;
-        bool consumeEnable;
-        bool consumeFromMinEnable;
-        bool consumeBroadcastEnable;
-        int retryQueueNums;
-        int retryMaxTimes;
-        long brokerId;
-        long whichBrokerWhenConsumeSlowly;
-    };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/TopAddressing.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/TopAddressing.h b/rocketmq-client4cpp/src/common/TopAddressing.h
deleted file mode 100755
index 07b0c0c..0000000
--- a/rocketmq-client4cpp/src/common/TopAddressing.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang, kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __TOPADDRESSING_H__
-#define __TOPADDRESSING_H__
-
-#include <string>
-#include <sstream>
-#include "SocketUtil.h"
-
-namespace rmq
-{
-    /**
-    * Holds a name-server address string. fetchNSAddr() is a stub that always
-    * returns an empty string.
-    */
-    class TopAddressing
-    {
-    public:
-        TopAddressing()
-            : m_nsAddr("")
-        {
-        }
-
-        /** @return the stored address; empty until setNsAddr() is called. */
-        const std::string& getNsAddr() const
-        {
-            return m_nsAddr;
-        }
-
-        /**
-        * Stores a copy of nsAddr. Takes a const reference so that const
-        * strings and temporaries are accepted as well as the lvalues the
-        * previous non-const signature required.
-        */
-        void setNsAddr(const std::string& nsAddr)
-        {
-            m_nsAddr = nsAddr;
-        }
-
-        /** Not implemented: performs no lookup and returns "". */
-        std::string fetchNSAddr()
-        {
-            return "";
-        }
-
-    private:
-        std::string m_nsAddr;
-    };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/TopicConfig.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/TopicConfig.cpp b/rocketmq-client4cpp/src/common/TopicConfig.cpp
deleted file mode 100644
index 036b41c..0000000
--- a/rocketmq-client4cpp/src/common/TopicConfig.cpp
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include <stdlib.h>
-#include <sstream>
-
-#include "TopicConfig.h"
-#include "PermName.h"
-
-namespace rmq
-{
-
-int TopicConfig::DefaultReadQueueNums = 16;
-int TopicConfig::DefaultWriteQueueNums = 16;
-std::string TopicConfig::SEPARATOR = " ";
-
-/** Unnamed topic with default queue counts and read+write permission. */
-TopicConfig::TopicConfig()
-    : m_topicName(""),
-      m_readQueueNums(DefaultReadQueueNums),
-      m_writeQueueNums(DefaultWriteQueueNums),
-      m_perm(PermName::PERM_READ | PermName::PERM_WRITE),
-      m_topicFilterType(SINGLE_TAG),
-      m_topicSysFlag(0),
-      m_order(false)
-{
-}
-
-/** Named topic with default queue counts and read+write permission. */
-TopicConfig::TopicConfig(const std::string& topicName)
-    : m_topicName(topicName),
-      m_readQueueNums(DefaultReadQueueNums),
-      m_writeQueueNums(DefaultWriteQueueNums),
-      m_perm(PermName::PERM_READ | PermName::PERM_WRITE),
-      m_topicFilterType(SINGLE_TAG),
-      m_topicSysFlag(0),
-      m_order(false)
-{
-}
-
-/** Named topic with explicit queue counts and permission bits. */
-TopicConfig::TopicConfig(const std::string& topicName, int readQueueNums, int writeQueueNums, int perm)
-    : m_topicName(topicName),
-      m_readQueueNums(readQueueNums),
-      m_writeQueueNums(writeQueueNums),
-      m_perm(perm),
-      m_topicFilterType(SINGLE_TAG),
-      m_topicSysFlag(0),
-      m_order(false)
-{
-}
-
-TopicConfig::~TopicConfig()
-{
-}
-
-/**
-* Serializes name, read queue count, write queue count, permission and
-* filter type, in that order, joined by SEPARATOR. topicSysFlag and order
-* are not part of the encoding.
-*/
-std::string TopicConfig::encode()
-{
-    std::stringstream ss;
-    ss << m_topicName << SEPARATOR
-       << m_readQueueNums << SEPARATOR
-       << m_writeQueueNums << SEPARATOR
-       << m_perm << SEPARATOR
-       << m_topicFilterType;
-
-    return ss.str();
-}
-
-/**
-* Parses the whitespace-separated form produced by encode().
-*
-* All five fields are read into locals first and committed only when every
-* extraction succeeded, so malformed input leaves the object unchanged.
-* Previously a short or non-numeric input read an uninitialized local,
-* half-updated the object, and was still reported as success.
-*
-* @return true when all five fields were parsed, false otherwise.
-*/
-bool TopicConfig::decode(const std::string& in)
-{
-    std::stringstream ss(in);
-
-    std::string topicName;
-    int readQueueNums = 0;
-    int writeQueueNums = 0;
-    int perm = 0;
-    int type = 0;
-
-    if (!(ss >> topicName >> readQueueNums >> writeQueueNums >> perm >> type))
-    {
-        return false;
-    }
-
-    m_topicName = topicName;
-    m_readQueueNums = readQueueNums;
-    m_writeQueueNums = writeQueueNums;
-    m_perm = perm;
-    m_topicFilterType = (TopicFilterType)type;
-
-    return true;
-}
-
-const std::string& TopicConfig::getTopicName()
-{
-    return m_topicName;
-}
-
-void TopicConfig::setTopicName(const std::string& topicName)
-{
-    m_topicName = topicName;
-}
-
-int TopicConfig::getReadQueueNums()
-{
-    return m_readQueueNums;
-}
-
-void TopicConfig::setReadQueueNums(int readQueueNums)
-{
-    m_readQueueNums = readQueueNums;
-}
-
-int TopicConfig::getWriteQueueNums()
-{
-    return m_writeQueueNums;
-}
-
-void TopicConfig::setWriteQueueNums(int writeQueueNums)
-{
-    m_writeQueueNums = writeQueueNums;
-}
-
-int TopicConfig::getPerm()
-{
-    return m_perm;
-}
-
-void TopicConfig::setPerm(int perm)
-{
-    m_perm = perm;
-}
-
-TopicFilterType TopicConfig::getTopicFilterType()
-{
-    return m_topicFilterType;
-}
-
-void TopicConfig::setTopicFilterType(TopicFilterType topicFilterType)
-{
-    m_topicFilterType = topicFilterType;
-}
-
-int TopicConfig::getTopicSysFlag()
-{
-    return m_topicSysFlag;
-}
-
-// Parameter renamed from the misleading "perm" to match the declaration.
-void TopicConfig::setTopicSysFlag(int topicSysFlag)
-{
-    m_topicSysFlag = topicSysFlag;
-}
-
-bool TopicConfig::isOrder()
-{
-    return m_order;
-}
-
-void TopicConfig::setOrder(bool order)
-{
-    m_order = order;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/TopicConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/TopicConfig.h b/rocketmq-client4cpp/src/common/TopicConfig.h
deleted file mode 100644
index b9f2bcb..0000000
--- a/rocketmq-client4cpp/src/common/TopicConfig.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __TOPICCONFIG_H__
-#define __TOPICCONFIG_H__
-
-#include <string>
-#include "TopicFilterType.h"
-
-namespace rmq
-{
-    /**
-    * Per-topic settings: name, read/write queue counts, permission bits,
-    * filter type, system flag and ordering switch, plus a text
-    * encode()/decode() pair.
-    */
-    class TopicConfig
-    {
-    public:
-        TopicConfig();
-        TopicConfig(const std::string& topicName);
-        TopicConfig(const std::string& topicName, int readQueueNums, int writeQueueNums, int perm);
-        ~TopicConfig();
-
-        // Text (de)serialization.
-        std::string encode();
-        bool decode(const std::string& in);
-
-        // Topic name.
-        const std::string& getTopicName();
-        void setTopicName(const std::string& topicName);
-
-        // Queue counts.
-        int getReadQueueNums();
-        void setReadQueueNums(int readQueueNums);
-        int getWriteQueueNums();
-        void setWriteQueueNums(int writeQueueNums);
-
-        // Permission bit field.
-        int getPerm();
-        void setPerm(int perm);
-
-        // Filter type.
-        TopicFilterType getTopicFilterType();
-        void setTopicFilterType(TopicFilterType topicFilterType);
-
-        // System flag word.
-        int getTopicSysFlag();
-        void setTopicSysFlag(int topicSysFlag);
-
-        // Ordering switch.
-        bool isOrder();
-        void setOrder(bool order);
-
-    public:
-        // Queue counts used by the constructors that do not take explicit counts.
-        static int DefaultReadQueueNums;
-        static int DefaultWriteQueueNums;
-
-    private:
-        // Field delimiter written by encode().
-        static std::string SEPARATOR;
-
-        std::string m_topicName;
-        int m_readQueueNums;
-        int m_writeQueueNums;
-        int m_perm;
-        TopicFilterType m_topicFilterType;
-        int m_topicSysFlag;
-        bool m_order;
-    };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/TopicStatsTable.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/TopicStatsTable.h b/rocketmq-client4cpp/src/common/TopicStatsTable.h
deleted file mode 100755
index 4319e54..0000000
--- a/rocketmq-client4cpp/src/common/TopicStatsTable.h
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __TOPICSTATSTABLE_H__
-#define __TOPICSTATSTABLE_H__
-
-#include <map>
-
-namespace rmq
-{
-    class MessageQueue;
-
-    /** Offset snapshot of one queue: smallest offset, largest offset, last update time. */
-    typedef struct
-    {
-        long long minOffset;
-        long long maxOffset;
-        long long lastUpdateTimestamp;
-    } TopicOffset;
-
-    /**
-    * Container for a map from MessageQueue pointer to TopicOffset. The map
-    * is keyed by pointer value; the pointed-to queues are not owned or
-    * touched here.
-    */
-    class TopicStatsTable
-    {
-    public:
-        /** @return a copy of the stored table. */
-        std::map<MessageQueue*, TopicOffset> getOffsetTable()
-        {
-            return this->m_offsetTable;
-        }
-
-        /** Replaces the stored table with a copy of offsetTable. */
-        void setOffsetTable(const std::map<MessageQueue*, TopicOffset>& offsetTable)
-        {
-            this->m_offsetTable = offsetTable;
-        }
-
-    private:
-        std::map<MessageQueue*, TopicOffset> m_offsetTable;
-    };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/UtilAll.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/UtilAll.h b/rocketmq-client4cpp/src/common/UtilAll.h
deleted file mode 100755
index b239edb..0000000
--- a/rocketmq-client4cpp/src/common/UtilAll.h
+++ /dev/null
@@ -1,608 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __UTILALL_H__
-#define __UTILALL_H__
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-#include <assert.h>
-#include <time.h>
-
-#include <string>
-#include <sstream>
-#include <vector>
-#include <list>
-#include <set>
-#include <map>
-
-#include "RocketMQClient.h"
-#include "zlib.h"
-#include "json/value.h"
-#include "json/writer.h"
-
-namespace rmq
-{
- const std::string WHITESPACE = " \t\r\n"; // characters treated as blank by UtilAll::Trim and UtilAll::isBlank
- const int CHUNK = 8192; // size in bytes of the stack output buffer used by UtilAll::compress
- const std::string yyyy_MM_dd_HH_mm_ss = "yyyy-MM-dd HH:mm:ss"; // NOTE(review): this and the next two look like date-format patterns; their consumers are outside this view - confirm usage
- const std::string yyyy_MM_dd_HH_mm_ss_SSS = "yyyy-MM-dd#HH:mm:ss:SSS";
- const std::string yyyyMMddHHmmss = "yyyyMMddHHmmss";
-
- class UtilAll
- {
- public:
- /**
-  * Returns the id of the calling process. The value is kept in a
-  * thread-local cache that is refreshed whenever it no longer matches
-  * getpid() (for instance in a child after fork()).
-  */
- static pid_t getPid()
- {
-     static __thread pid_t cachedPid = 0;
-
-     const pid_t currentPid = getpid();
-     if (cachedPid != currentPid)
-     {
-         cachedPid = currentPid;
-     }
-     return cachedPid;
- }
-
- /**
-  * Returns the kernel thread id of the calling thread (gettid system call).
-  * The id is cached per thread together with the process id it was read
-  * under, and is read again when either is unset or the process id changed.
-  */
- static pid_t getTid()
- {
-     static __thread pid_t ownerPid = 0;
-     static __thread pid_t cachedTid = 0;
-
-     const pid_t currentPid = getpid();
-     const bool cacheValid = (ownerPid != 0) && (cachedTid != 0) && (ownerPid == currentPid);
-     if (!cacheValid)
-     {
-         ownerPid = currentPid;
-         cachedTid = syscall(__NR_gettid);
-     }
-     return cachedTid;
- }
-
- /**
-  * Splits in on every occurrence of delimiter and appends the pieces to out
-  * (out is not cleared first). Empty pieces are kept, so "a,,b" yields
-  * "a", "", "b" and an empty input yields one empty piece.
-  *
-  * The scan runs until no delimiter is left. The previous loop was capped
-  * at in.size() - 1 iterations, so input made only of single-character
-  * delimiters (",", ",,") came back partly unsplit.
-  * An empty delimiter cannot match anything useful; in is appended whole.
-  *
-  * @return the size of out after appending.
-  */
- static int Split(std::vector<std::string>& out, const std::string& in, const std::string& delimiter)
- {
-     if (delimiter.empty())
-     {
-         out.push_back(in);
-         return static_cast<int>(out.size());
-     }
-
-     std::string::size_type left = 0;
-     std::string::size_type right = in.find(delimiter, left);
-     while (right != std::string::npos)
-     {
-         out.push_back(in.substr(left, right - left));
-         left = right + delimiter.length();
-         right = in.find(delimiter, left);
-     }
-
-     // Remainder after the last delimiter (possibly empty).
-     out.push_back(in.substr(left));
-
-     return static_cast<int>(out.size());
- }
-
- /**
-  * Splits in on every occurrence of the character delimiter and appends the
-  * pieces to out (out is not cleared first). Empty pieces are kept, and an
-  * empty input yields one empty piece.
-  *
-  * The scan runs until no delimiter is left. The previous loop was capped
-  * at in.size() - 1 iterations, so input made only of delimiters (",",
-  * ",,") came back partly unsplit.
-  *
-  * @return the size of out after appending.
-  */
- static int Split(std::vector<std::string>& out, const std::string& in, const char delimiter)
- {
-     std::string::size_type left = 0;
-     std::string::size_type right = in.find(delimiter, left);
-     while (right != std::string::npos)
-     {
-         out.push_back(in.substr(left, right - left));
-         left = right + 1;
-         right = in.find(delimiter, left);
-     }
-
-     // Remainder after the last delimiter (possibly empty).
-     out.push_back(in.substr(left));
-
-     return static_cast<int>(out.size());
- }
-
- /**
-  * Returns str without leading and trailing WHITESPACE characters.
-  * An empty or all-blank input gives an empty string.
-  */
- static std::string Trim(const std::string& str)
- {
-     const std::string::size_type first = str.find_first_not_of(WHITESPACE);
-     if (first == std::string::npos)
-     {
-         // Covers both the empty string and a string of blanks only.
-         return "";
-     }
-
-     // A non-blank character exists, so this search cannot fail.
-     const std::string::size_type last = str.find_last_not_of(WHITESPACE);
-     return str.substr(first, last - first + 1);
- }
-
- /**
-  * @return true when str is empty or holds nothing but WHITESPACE
-  *         characters, false as soon as one other character is present.
-  */
- static bool isBlank(const std::string& str)
- {
-     // find_first_not_of reports npos for an empty string as well.
-     return str.find_first_not_of(WHITESPACE) == std::string::npos;
- }
-
- /**
-  * Returns a fixed processor count of 4; the machine is not queried.
-  */
- static int availableProcessors()
- {
-     const int fixedProcessorCount = 4;
-     return fixedProcessorCount;
- }
-
-
- /**
-  * Polynomial hash over the first len bytes of pData: h = 31 * h + byte,
-  * with bytes taken as unsigned. Returns 0 when pData is NULL or len <= 0.
-  * The sum is kept in unsigned arithmetic so long inputs wrap around
-  * instead of overflowing a signed int, which is undefined behaviour.
-  */
- static int hashCode(const char* pData, int len)
- {
-     unsigned int h = 0;
-     if (pData != NULL && len > 0)
-     {
-         for (int i = 0; i < len; i++)
-         {
-             h = 31u * h + static_cast<unsigned char>(pData[i]);
-         }
-     }
-
-     return static_cast<int>(h);
- }
-
- /** Hash of all bytes of s. */
- static int hashCode(const std::string& s)
- {
-     return hashCode(s.c_str(), static_cast<int>(s.length()));
- }
-
- /**
-  * Hash of a NUL-terminated string. NULL hashes to 0, matching the
-  * (pointer, length) overload; it used to be passed to std::string.
-  */
- static int hashCode(const char* pData)
- {
-     if (pData == NULL)
-     {
-         return 0;
-     }
-     return hashCode(pData, static_cast<int>(strlen(pData)));
- }
-
- // Integral overloads: the hash of a number is the number converted to int.
- static int hashCode(char x)
- {
-     return static_cast<int>(x);
- }
-
- static int hashCode(unsigned char x)
- {
-     return static_cast<int>(x);
- }
-
- static int hashCode(short x)
- {
-     return static_cast<int>(x);
- }
-
- static int hashCode(unsigned short x)
- {
-     return static_cast<int>(x);
- }
-
- static int hashCode(int x)
- {
-     return x;
- }
-
- static int hashCode(unsigned int x)
- {
-     return static_cast<int>(x);
- }
-
- static int hashCode(long x)
- {
-     return static_cast<int>(x);
- }
-
- static int hashCode(unsigned long x)
- {
-     return static_cast<int>(x);
- }
-
- /**
-  * Sum of the element hashes of v, so element order does not matter.
-  * Uses the standard const_iterator in place of the GNU-only typeof, and
-  * adds in unsigned arithmetic so the sum wraps instead of overflowing.
-  */
- template <typename T>
- static int hashCode(const std::vector<T>& v)
- {
-     unsigned int h = 0;
-     for (typename std::vector<T>::const_iterator it = v.begin(); it != v.end(); ++it)
-     {
-         h += static_cast<unsigned int>(hashCode(*it));
-     }
-     return static_cast<int>(h);
- }
-
- /** Sum of the element hashes of v; same scheme as the vector overload. */
- template <typename T>
- static int hashCode(const std::set<T>& v)
- {
-     unsigned int h = 0;
-     for (typename std::set<T>::const_iterator it = v.begin(); it != v.end(); ++it)
-     {
-         h += static_cast<unsigned int>(hashCode(*it));
-     }
-     return static_cast<int>(h);
- }
-
- /**
-  * Serializes json with Json::FastWriter and returns the text it produces.
-  */
- static std::string toString(Json::Value& json)
- {
-     return Json::FastWriter().write(json);
- }
-
- template<typename T>
- static std::string toString(const T& v)
- {
- std::ostringstream ss;
- ss << v;
- return ss.str();
- }
-
- template<typename T>
- static std::string toString(const std::vector<T>& v)
- {
- std::string s;
- s.append("[");
- typeof(v.begin()) it = v.begin();
- while (it != v.end())
- {
- s.append(toString(*it));
- s.append(",");
- ++it;
- }
- if (s.size() > 1)
- {
- s.erase(s.size() - 1, 1);
- }
- s.append("]");
- return s;
- }
-
-
- template <typename T>
- static std::string toString(const std::list<T>& v)
- {
- std::string s;
- s.append("[");
- typeof(v.begin()) it = v.begin();
- while (it != v.end())
- {
- s.append(toString(*it));
- s.append(",");
- ++it;
- }
- if (s.size() > 1)
- {
- s.erase(s.size() - 1, 1);
- }
- s.append("]");
-
- return s;
- }
-
- /**
-  * Render a set as "[e1,e2,...]" in iteration order, converting each
-  * element with toString(). An empty set yields "[]".
-  */
- template <typename T>
- static std::string toString(const std::set<T>& v)
- {
-     std::string text("[");
-     for (typename std::set<T>::const_iterator it = v.begin(); it != v.end(); ++it)
-     {
-         // A comma goes between elements, never after the last one.
-         if (it != v.begin())
-         {
-             text.append(",");
-         }
-         text.append(toString(*it));
-     }
-     text.append("]");
-     return text;
- }
-
- /**
-  * Render a map as "{k1=v1,k2=v2,...}" in iteration order, converting
-  * keys and values with toString(). An empty map yields "{}".
-  */
- template<typename K, typename V, typename D, typename A>
- static std::string toString(const std::map<K, V, D, A>& v)
- {
-     std::string text("{");
-     for (typename std::map<K, V, D, A>::const_iterator it = v.begin(); it != v.end(); ++it)
-     {
-         // A comma goes between entries, never after the last one.
-         if (it != v.begin())
-         {
-             text.append(",");
-         }
-         text.append(toString(it->first));
-         text.append("=");
-         text.append(toString(it->second));
-     }
-     text.append("}");
-     return text;
- }
-
- /**
-  * Convert a value between types by writing it to a std::stringstream and
-  * reading it back as out_type.
-  *
-  * The result is value-initialized first, so a failed extraction returns
-  * out_type() rather than an indeterminate value.
-  */
- template<typename out_type, typename in_type>
- static out_type convert(const in_type& t)
- {
-     out_type result = out_type();
-     std::stringstream stream;
-     stream << t;
-     stream >> result;
-     return result;
- }
-
- /**
-  * Deflate inLen bytes starting at pIn with zlib at the given level.
-  *
-  * On success returns true and stores a malloc-allocated buffer in *pOut
-  * and its length in *pOutLen (the caller presumably releases it with
-  * free()). On failure returns false and leaves *pOut / *pOutLen untouched.
-  *
-  * Compared with the earlier version this one rejects invalid arguments,
-  * checks the initial malloc, and always calls deflateEnd() on the error
-  * paths so the zlib state is not leaked.
-  */
- static bool compress(const char* pIn, int inLen, unsigned char** pOut, int* pOutLen, int level)
- {
-     if (pOut == NULL || pOutLen == NULL || inLen < 0 || (pIn == NULL && inLen > 0))
-     {
-         return false;
-     }
-
-     int ret, flush;
-     int have;
-     z_stream strm;
-     unsigned char out[CHUNK];
-
-     /* allocate deflate state */
-     strm.zalloc = Z_NULL;
-     strm.zfree = Z_NULL;
-     strm.opaque = Z_NULL;
-     ret = deflateInit(&strm, level);
-     if (ret != Z_OK)
-     {
-         return false;
-     }
-
-     /* never ask malloc for zero bytes: its result would be ambiguous */
-     int outBufferLen = inLen > 0 ? inLen : 1;
-     unsigned char* outData = (unsigned char*)malloc(outBufferLen);
-     if (outData == NULL)
-     {
-         (void)deflateEnd(&strm);
-         return false;
-     }
-
-     int left = inLen;
-     int used = 0;
-     int outDataLen = 0;
-
-     /* compress until end of buffer */
-     do
-     {
-         strm.avail_in = left > CHUNK ? CHUNK : left;
-         flush = left <= CHUNK ? Z_FINISH : Z_NO_FLUSH;
-         strm.next_in = (unsigned char*)pIn + used;
-         used += strm.avail_in;
-         left -= strm.avail_in;
-
-         /* run deflate() on input until output buffer not full, finish
-            compression if all of source has been read in */
-         do
-         {
-             strm.avail_out = CHUNK;
-             strm.next_out = out;
-             ret = deflate(&strm, flush);    /* no bad return value */
-             assert(ret != Z_STREAM_ERROR);  /* state not clobbered */
-             have = CHUNK - strm.avail_out;
-
-             if (outDataLen + have > outBufferLen)
-             {
-                 /* grow to twice the size needed right now */
-                 outBufferLen = outDataLen + have;
-                 outBufferLen <<= 1;
-                 unsigned char* tmp = (unsigned char*)realloc(outData, outBufferLen);
-                 if (!tmp)
-                 {
-                     free(outData);
-                     (void)deflateEnd(&strm);
-                     return false;
-                 }
-
-                 outData = tmp;
-             }
-
-             memcpy(outData + outDataLen, out, have);
-             outDataLen += have;
-
-         }
-         while (strm.avail_out == 0);
-         assert(strm.avail_in == 0);     /* all input will be used */
-
-         /* done when last data in file processed */
-     }
-     while (flush != Z_FINISH);
-     assert(ret == Z_STREAM_END);        /* stream will be complete */
-
-     *pOutLen = outDataLen;
-     *pOut = outData;
-
-     /* clean up and return */
-     (void)deflateEnd(&strm);
-     return true;
- }
-
- /**
-  * Inflate a zlib stream of inLen bytes starting at pIn.
-  *
-  * On success (the stream ended with Z_STREAM_END) returns true and stores
-  * a malloc-allocated buffer in *pOut and its length in *pOutLen (the
-  * caller presumably releases it with free()). On any failure, including
-  * truncated input, returns false and leaves *pOut / *pOutLen untouched.
-  *
-  * Compared with the earlier version this one rejects invalid arguments,
-  * checks the initial malloc, avoids overflowing the initial size estimate,
-  * and calls inflateEnd() on the realloc failure path so the zlib state is
-  * not leaked.
-  */
- static bool decompress(const char* pIn, int inLen, unsigned char** pOut, int* pOutLen)
- {
-     if (pOut == NULL || pOutLen == NULL || inLen < 0 || (pIn == NULL && inLen > 0))
-     {
-         return false;
-     }
-
-     int ret;
-     int have;
-     z_stream strm;
-
-     unsigned char out[CHUNK];
-
-     /* allocate inflate state */
-     strm.zalloc = Z_NULL;
-     strm.zfree = Z_NULL;
-     strm.opaque = Z_NULL;
-     strm.avail_in = 0;
-     strm.next_in = Z_NULL;
-     ret = inflateInit(&strm);
-     if (ret != Z_OK)
-     {
-         return false;
-     }
-
-     /* initial guess is four times the input; fall back to one chunk when
-        that estimate would be empty or overflow an int */
-     int outBufferLen = CHUNK;
-     if (inLen > 0 && inLen <= (0x7FFFFFFF >> 2))
-     {
-         outBufferLen = inLen << 2;
-     }
-     unsigned char* outData = (unsigned char*)malloc(outBufferLen);
-     if (outData == NULL)
-     {
-         (void)inflateEnd(&strm);
-         return false;
-     }
-
-     int left = inLen;
-     int used = 0;
-     int outDataLen = 0;
-
-     /* decompress until deflate stream ends or end of buffer */
-     do
-     {
-         strm.avail_in = left > CHUNK ? CHUNK : left;
-         if (strm.avail_in <= 0)
-         {
-             /* input exhausted before the stream ended */
-             break;
-         }
-
-         strm.next_in = (unsigned char*)pIn + used;
-         used += strm.avail_in;
-         left -= strm.avail_in;
-
-         /* run inflate() on input until output buffer not full */
-         do
-         {
-             strm.avail_out = CHUNK;
-             strm.next_out = out;
-             ret = inflate(&strm, Z_NO_FLUSH);
-             assert(ret != Z_STREAM_ERROR);  /* state not clobbered */
-             switch (ret)
-             {
-                 case Z_NEED_DICT:
-                     ret = Z_DATA_ERROR;     /* and fall through */
-                 case Z_DATA_ERROR:
-                 case Z_MEM_ERROR:
-                     (void)inflateEnd(&strm);
-                     free(outData);
-                     return false;
-             }
-             have = CHUNK - strm.avail_out;
-
-             if (outDataLen + have > outBufferLen)
-             {
-                 /* grow to twice the size needed right now */
-                 outBufferLen = outDataLen + have;
-                 outBufferLen <<= 1;
-                 unsigned char* tmp = (unsigned char*)realloc(outData, outBufferLen);
-                 if (!tmp)
-                 {
-                     free(outData);
-                     (void)inflateEnd(&strm);
-                     return false;
-                 }
-
-                 outData = tmp;
-             }
-
-             memcpy(outData + outDataLen, out, have);
-             outDataLen += have;
-
-         }
-         while (strm.avail_out == 0);
-
-         /* done when inflate() says it's done */
-     }
-     while (ret != Z_STREAM_END);
-
-     /* clean up and return */
-     (void)inflateEnd(&strm);
-
-     if (ret == Z_STREAM_END)
-     {
-         *pOutLen = outDataLen;
-         *pOut = outData;
-
-         return true;
-     }
-     else
-     {
-         free(outData);
-
-         return false;
-     }
- }
-
- /** Parse a hexadecimal string with strtoull (base 16); the end position is not reported. */
- static unsigned long long hexstr2ull(const char* str)
- {
-     return strtoull(str, NULL, 16);
- }
-
- /** Parse a decimal string into a long long using atoll. */
- static long long str2ll(const char *str)
- {
-     const long long value = atoll(str);
-     return value;
- }
-
-
- /**
-  * Format a time_t as local time using a strftime format string.
-  * The result is produced in a 255-byte buffer.
-  */
- static std::string tm2str(const time_t& t, const std::string& sFormat)
- {
-     struct tm localTime;
-     localtime_r(&t, &localTime);
-
-     char buffer[255] = "\0";
-     strftime(buffer, sizeof(buffer), sFormat.c_str(), &localTime);
-
-     return std::string(buffer);
- }
-
- /** Format the current time (time(NULL)) as local time with the given strftime format. */
- static std::string now2str(const std::string& sFormat)
- {
-     const time_t now = time(NULL);
-     return tm2str(now, sFormat);
- }
-
- /** Format the current local time as "%Y-%m-%d %H:%M:%S". */
- static std::string now2str()
- {
-     const std::string defaultFormat("%Y-%m-%d %H:%M:%S");
-     return now2str(defaultFormat);
- }
-
- /** Current wall-clock time from gettimeofday(), expressed in milliseconds. */
- static int64_t now2ms()
- {
-     struct timeval now;
-     gettimeofday(&now, NULL);
-
-     const int64_t fromSeconds = now.tv_sec * (int64_t)1000;
-     const int64_t fromMicros = now.tv_usec / 1000;
-     return fromSeconds + fromMicros;
- }
-
- /** Current wall-clock time from gettimeofday(), expressed in microseconds. */
- static int64_t now2us()
- {
-     struct timeval now;
-     gettimeofday(&now, NULL);
-
-     const int64_t fromSeconds = now.tv_sec * (int64_t)1000 * 1000;
-     return fromSeconds + now.tv_usec;
- }
-
- /**
-  * Parse sString with strptime using sFormat, writing the fields into stTm.
-  * Returns 0 when strptime succeeds and -1 when it returns NULL.
-  */
- static int str2tm(const std::string &sString, const std::string &sFormat, struct tm &stTm)
- {
-     if (strptime(sString.c_str(), sFormat.c_str(), &stTm) == NULL)
-     {
-         return -1;
-     }
-     return 0;
- }
-
- /**
-  * Parse sString according to sFormat and convert the result to time_t
-  * with mktime. Returns -1 when the string cannot be parsed.
-  *
-  * strptime only fills the fields named by the format, so the struct is
-  * zeroed first and tm_isdst is set to -1 (let mktime decide). Otherwise
-  * mktime would read indeterminate values.
-  */
- static time_t str2tm(const std::string &sString, const std::string &sFormat)
- {
-     struct tm stTm;
-     memset(&stTm, 0, sizeof(stTm));
-     stTm.tm_isdst = -1;
-
-     if (str2tm(sString, sFormat, stTm) != 0)
-     {
-         return -1;
-     }
-
-     return mktime(&stTm);
- }
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/Validators.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/Validators.cpp b/rocketmq-client4cpp/src/common/Validators.cpp
deleted file mode 100755
index 29f36a0..0000000
--- a/rocketmq-client4cpp/src/common/Validators.cpp
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang, kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "Validators.h"
-
-#include <stdlib.h>
-#include <stdio.h>
-#include "MQClientException.h"
-#include "UtilAll.h"
-#include "MixAll.h"
-#include "Message.h"
-#include "MQProtos.h"
-#include "DefaultMQProducer.h"
-
-namespace rmq
-{
-
-const std::string Validators::validPatternStr = "^[a-zA-Z0-9_-]+$";
-const size_t Validators::CHARACTER_MAX_LENGTH = 255;
-
-/**
-* Decide whether origin is acceptable for patternStr.
-*
-* A blank origin is rejected. Everything else is accepted: no pattern
-* matching is performed here yet, so patternStr only matters in that a
-* blank pattern is explicitly treated as "accept".
-*/
-bool Validators::regularExpressionMatcher(const std::string& origin, const std::string& patternStr)
-{
-    const bool originIsBlank = UtilAll::isBlank(origin);
-    if (!originIsBlank && UtilAll::isBlank(patternStr))
-    {
-        // No pattern supplied: nothing to validate against.
-        return true;
-    }
-
-    // TODO: compile patternStr and match it against origin. Until that is
-    // implemented every non-blank origin passes.
-    return !originIsBlank;
-}
-
-/**
-* Placeholder for extracting the first match of patternStr from origin.
-* No matching is implemented; the function always returns an empty string.
-*/
-std::string Validators::getGroupWithRegularExpression(const std::string& origin, const std::string& patternStr)
-{
-    // TODO: return the first match of patternStr within origin.
-    return std::string();
-}
-
-/**
-* Validate a topic name. Each failed check is reported through
-* THROW_MQEXCEPTION(MQClientException, ..., -1); the checks run in this
-* order: not blank, not longer than CHARACTER_MAX_LENGTH, not equal to
-* MixAll::DEFAULT_TOPIC, accepted by regularExpressionMatcher.
-*/
-void Validators::checkTopic(const std::string& topic)
-{
-    if (UtilAll::isBlank(topic))
-    {
-        THROW_MQEXCEPTION(MQClientException, "the specified topic is blank", -1);
-    }
-
-    if (topic.size() > CHARACTER_MAX_LENGTH)
-    {
-        THROW_MQEXCEPTION(MQClientException, "the specified topic is longer than topic max length 255.", -1);
-    }
-
-    // The topic name must not collide with the reserved default topic.
-    if (MixAll::DEFAULT_TOPIC == topic)
-    {
-        THROW_MQEXCEPTION(MQClientException, "the topic[" + topic + "] is conflict with default topic.", -1);
-    }
-
-    const bool charactersAccepted = regularExpressionMatcher(topic, validPatternStr);
-    if (!charactersAccepted)
-    {
-        std::string reason;
-        reason = "the specified topic[" + topic + "] contains illegal characters, allowing only" + validPatternStr;
-
-        THROW_MQEXCEPTION(MQClientException, reason.c_str(), -1);
-    }
-}
-
-/**
-* Validate a group name. Each failed check is reported through
-* THROW_MQEXCEPTION(MQClientException, ..., -1); the checks run in this
-* order: not blank, accepted by regularExpressionMatcher, not longer than
-* CHARACTER_MAX_LENGTH.
-*/
-void Validators::checkGroup(const std::string& group)
-{
-    if (UtilAll::isBlank(group))
-    {
-        THROW_MQEXCEPTION(MQClientException, "the specified group is blank", -1);
-    }
-
-    const bool charactersAccepted = regularExpressionMatcher(group, validPatternStr);
-    if (!charactersAccepted)
-    {
-        std::string reason;
-        reason = "the specified group[" + group + "] contains illegal characters, allowing only" + validPatternStr;
-
-        THROW_MQEXCEPTION(MQClientException, reason.c_str(), -1);
-    }
-
-    if (group.size() > CHARACTER_MAX_LENGTH)
-    {
-        THROW_MQEXCEPTION(MQClientException, "the specified group is longer than group max length 255.", -1);
-    }
-}
-
-/**
-* Validate a message before it is sent: the topic must pass checkTopic(),
-* the body must be non-NULL and non-empty, and the body length must not
-* exceed the producer's getMaxMessageSize(). Body problems are reported
-* through THROW_MQEXCEPTION with MESSAGE_ILLEGAL_VALUE.
-*/
-void Validators::checkMessage(const Message& msg, DefaultMQProducer* pDefaultMQProducer)
-{
-    // Topic first: it raises its own exception on failure.
-    checkTopic(msg.getTopic());
-
-    // Body presence.
-    if (NULL == msg.getBody())
-    {
-        THROW_MQEXCEPTION(MQClientException, "the message body is null", MESSAGE_ILLEGAL_VALUE);
-    }
-
-    if (0 == msg.getBodyLen())
-    {
-        THROW_MQEXCEPTION(MQClientException, "the message body length is zero", MESSAGE_ILLEGAL_VALUE);
-    }
-
-    // Body size against the producer limit.
-    if (msg.getBodyLen() > pDefaultMQProducer->getMaxMessageSize())
-    {
-        char detail[256];
-        snprintf(detail, sizeof(detail), "the message body size over max value, MAX: %d", pDefaultMQProducer->getMaxMessageSize());
-        THROW_MQEXCEPTION(MQClientException, detail, MESSAGE_ILLEGAL_VALUE);
-    }
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/Validators.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/Validators.h b/rocketmq-client4cpp/src/common/Validators.h
deleted file mode 100755
index 36ab299..0000000
--- a/rocketmq-client4cpp/src/common/Validators.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang, kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __VALIDATORST_H__
-#define __VALIDATORST_H__
-
-#include <string>
-
-namespace rmq
-{
- class MQClientException;
- class DefaultMQProducer;
- class Message;
-
- /**
- * Validator class
- *
- * @author manhong.yqd<jo...@gmail.com>
- * @since 2013-8-28
- */
- class Validators
- {
- public:
-     /** Pattern string passed to regularExpressionMatcher by checkTopic and checkGroup. */
-     static const std::string validPatternStr;
-
-     /** Maximum accepted length of a topic or group name. */
-     static const size_t CHARACTER_MAX_LENGTH;
-
-     /** Report whether origin is acceptable for the pattern patternStr. */
-     static bool regularExpressionMatcher(const std::string& origin, const std::string& patternStr);
-
-     /** Extract the part of origin that matches patternStr. */
-     static std::string getGroupWithRegularExpression(const std::string& origin, const std::string& patternStr);
-
-     /** Validate a topic name. */
-     static void checkTopic(const std::string& topic);
-
-     /** Validate a group name. */
-     static void checkGroup(const std::string& group);
-
-     /** Validate a message against the limits of the given producer. */
-     static void checkMessage(const Message& msg, DefaultMQProducer* pDefaultMQProducer);
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp b/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp
deleted file mode 100755
index c68bfc8..0000000
--- a/rocketmq-client4cpp/src/common/VirtualEnvUtil.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang, kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "VirtualEnvUtil.h"
-
-#include <stdlib.h>
-#include <stdio.h>
-#include "UtilAll.h"
-
-namespace rmq
-{
-
-const char* VirtualEnvUtil::VIRTUAL_APPGROUP_PREFIX = "%%PROJECT_%s%%";
-
-/**
-* Append the project-group marker (VIRTUAL_APPGROUP_PREFIX formatted with
-* projectGroup) to origin, unless projectGroup is blank or origin already
-* ends with that marker.
-*
-* The "already ends with" test is a real suffix comparison. The previous
-* find_last_of() call looked for any single character of the marker, so
-* the marker was almost never appended.
-*/
-std::string VirtualEnvUtil::buildWithProjectGroup(const std::string& origin, const std::string& projectGroup)
-{
-    if (UtilAll::isBlank(projectGroup))
-    {
-        return origin;
-    }
-
-    char buffer[1024];
-    snprintf(buffer, sizeof(buffer), VIRTUAL_APPGROUP_PREFIX, projectGroup.c_str());
-    const std::string marker(buffer);
-
-    const bool alreadyMarked = origin.size() >= marker.size()
-        && origin.compare(origin.size() - marker.size(), marker.size(), marker) == 0;
-
-    return alreadyMarked ? origin : origin + marker;
-}
-
-
-/**
-* Remove the project-group marker (VIRTUAL_APPGROUP_PREFIX formatted with
-* projectGroup) from the end of origin. When origin does not end with the
-* marker it is returned unchanged.
-*
-* The test is a real suffix comparison. The previous find_last_of() call
-* matched any single character of the marker and cut origin at that
-* position, discarding unrelated text.
-*/
-std::string VirtualEnvUtil::clearProjectGroup(const std::string& origin, const std::string& projectGroup)
-{
-    char buffer[1024];
-    snprintf(buffer, sizeof(buffer), VIRTUAL_APPGROUP_PREFIX, projectGroup.c_str());
-    const std::string marker(buffer);
-
-    const bool endsWithMarker = origin.size() >= marker.size()
-        && origin.compare(origin.size() - marker.size(), marker.size(), marker) == 0;
-
-    if (!UtilAll::isBlank(marker) && endsWithMarker)
-    {
-        return origin.substr(0, origin.size() - marker.size());
-    }
-
-    return origin;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/common/VirtualEnvUtil.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/VirtualEnvUtil.h b/rocketmq-client4cpp/src/common/VirtualEnvUtil.h
deleted file mode 100755
index 10ca0cd..0000000
--- a/rocketmq-client4cpp/src/common/VirtualEnvUtil.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang, kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __VIRTUALENVUTIL_H__
-#define __VIRTUALENVUTIL_H__
-
-#include <string>
-
-namespace rmq
-{
- /**
- * VirtualEnv API
- *
- * @author manhong.yqd<jo...@gmail.com>
- * @since 2013-8-26
- */
- class VirtualEnvUtil
- {
- public:
-     /** printf-style format used to build the project-group marker. */
-     static const char* VIRTUAL_APPGROUP_PREFIX;
-
-     /** Combine origin with the marker built from projectGroup. */
-     static std::string buildWithProjectGroup(const std::string& origin, const std::string& projectGroup);
-
-     /** Strip the marker built from projectGroup from origin. */
-     static std::string clearProjectGroup(const std::string& origin, const std::string& projectGroup);
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h b/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h
deleted file mode 100755
index 49e1e7c..0000000
--- a/rocketmq-client4cpp/src/consumer/AllocateMessageQueueStrategyInner.h
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __ALLOCATEMESSAGEQUEUESTRATEGYINNER_H__
-#define __ALLOCATEMESSAGEQUEUESTRATEGYINNER_H__
-
-#include <algorithm>
-
-#include "AllocateMessageQueueStrategy.h"
-#include "MQClientException.h"
-#include "UtilAll.h"
-
-
-namespace rmq
-{
-
- /**
-  * Allocation strategy "AVG": the queue list is cut into contiguous slices,
-  * one per consumer id, in the order the ids appear in cidAll. When the
-  * queues do not divide evenly, the first (size % consumers) consumers each
-  * receive one extra queue.
-  */
- class AllocateMessageQueueAveragely : public AllocateMessageQueueStrategy
- {
- public:
-     virtual ~AllocateMessageQueueAveragely() {}
-
-     /**
-      * Compute the slice of mqAll owned by currentCID.
-      *
-      * Reports an MQClientException through THROW_MQEXCEPTION when
-      * currentCID, mqAll or cidAll is empty. Returns NULL (after logging)
-      * when currentCID is not in cidAll; otherwise returns a vector
-      * allocated with new.
-      */
-     virtual std::vector<MessageQueue>* allocate(
-         const std::string& consumerGroup,
-         const std::string& currentCID,
-         std::vector<MessageQueue>& mqAll,
-         std::list<std::string>& cidAll)
-     {
-         if (currentCID.empty())
-         {
-             THROW_MQEXCEPTION(MQClientException, "currentCID is empty", -1);
-         }
-
-         if (mqAll.empty())
-         {
-             THROW_MQEXCEPTION(MQClientException, "mqAll is empty", -1);
-         }
-
-         if (cidAll.empty())
-         {
-             THROW_MQEXCEPTION(MQClientException, "cidAll is empty", -1);
-         }
-
-         // Position of this consumer inside the id list.
-         int index = -1;
-         int position = 0;
-         for (std::list<std::string>::iterator cid = cidAll.begin(); cid != cidAll.end(); ++cid, ++position)
-         {
-             if (*cid == currentCID)
-             {
-                 index = position;
-                 break;
-             }
-         }
-
-         if (index == -1)
-         {
-             RMQ_ERROR("[BUG] ConsumerGroup: {%s} The consumerId: {%s} not in cidAll: {%s}",
-                       consumerGroup.c_str(),
-                       currentCID.c_str(),
-                       UtilAll::toString(cidAll).c_str());
-             return NULL;
-         }
-
-         const int cidAllSize = cidAll.size();
-         const int mqAllSize = mqAll.size();
-         const int mod = mqAllSize % cidAllSize;
-
-         // Consumers in front of the remainder boundary take one extra queue.
-         const bool takesExtra = (mod > 0 && index < mod);
-
-         int averageSize = 1;
-         if (mqAllSize > cidAllSize)
-         {
-             averageSize = mqAllSize / cidAllSize;
-             if (takesExtra)
-             {
-                 averageSize += 1;
-             }
-         }
-
-         int startIndex = index * averageSize;
-         if (!takesExtra)
-         {
-             startIndex += mod;
-         }
-
-         // Never run past the end of the queue list.
-         int range = mqAllSize - startIndex;
-         if (averageSize < range)
-         {
-             range = averageSize;
-         }
-
-         std::vector<MessageQueue>* result = new std::vector<MessageQueue>();
-         for (int offset = 0; offset < range; ++offset)
-         {
-             result->push_back(mqAll.at((startIndex + offset) % mqAllSize));
-         }
-
-         return result;
-     }
-
-     /** Short identifier of this strategy. */
-     virtual std::string getName()
-     {
-         return "AVG";
-     }
- };
-
-
- /**
-  * Allocation strategy "AVG_BY_CIRCLE": queues are dealt out round-robin,
-  * so the consumer at position p in cidAll receives queues p, p + n,
-  * p + 2n, ... where n is the number of consumer ids.
-  */
- class AllocateMessageQueueAveragelyByCircle : public AllocateMessageQueueStrategy
- {
- public:
-     virtual ~AllocateMessageQueueAveragelyByCircle() {}
-
-     /**
-      * Compute the queues of mqAll owned by currentCID.
-      *
-      * Reports an MQClientException through THROW_MQEXCEPTION when
-      * currentCID, mqAll or cidAll is empty. Returns NULL (after logging)
-      * when currentCID is not in cidAll; otherwise returns a vector
-      * allocated with new.
-      */
-     virtual std::vector<MessageQueue>* allocate(
-         const std::string& consumerGroup,
-         const std::string& currentCID,
-         std::vector<MessageQueue>& mqAll,
-         std::list<std::string>& cidAll)
-     {
-         if (currentCID.empty())
-         {
-             THROW_MQEXCEPTION(MQClientException, "currentCID is empty", -1);
-         }
-
-         if (mqAll.empty())
-         {
-             THROW_MQEXCEPTION(MQClientException, "mqAll is empty", -1);
-         }
-
-         if (cidAll.empty())
-         {
-             THROW_MQEXCEPTION(MQClientException, "cidAll is empty", -1);
-         }
-
-         // Position of this consumer inside the id list.
-         int index = -1;
-         int position = 0;
-         for (std::list<std::string>::iterator cid = cidAll.begin(); cid != cidAll.end(); ++cid, ++position)
-         {
-             if (*cid == currentCID)
-             {
-                 index = position;
-                 break;
-             }
-         }
-
-         if (index == -1)
-         {
-             RMQ_ERROR("[BUG] ConsumerGroup: {%s} The consumerId: {%s} not in cidAll: {%s}",
-                       consumerGroup.c_str(),
-                       currentCID.c_str(),
-                       UtilAll::toString(cidAll).c_str());
-             return NULL;
-         }
-
-         const int queueCount = (int)mqAll.size();
-         const int consumerCount = (int)cidAll.size();
-
-         // index < consumerCount, so stepping by consumerCount visits exactly
-         // the positions whose remainder equals index.
-         std::vector<MessageQueue>* result = new std::vector<MessageQueue>();
-         for (int slot = index; slot < queueCount; slot += consumerCount)
-         {
-             result->push_back(mqAll.at(slot));
-         }
-
-         return result;
-     }
-
-     /** Short identifier of this strategy. */
-     virtual std::string getName()
-     {
-         return "AVG_BY_CIRCLE";
-     }
- };
-
-
- /**
-  * Allocation strategy "CONFIG". The allocation itself is a stub:
-  * allocate() ignores its arguments and returns NULL.
-  */
- class AllocateMessageQueueByConfig : public AllocateMessageQueueStrategy
- {
- public:
-     virtual ~AllocateMessageQueueByConfig() {}
-
-     /** Stub: always returns NULL. */
-     virtual std::vector<MessageQueue>* allocate(
-         const std::string& consumerGroup,
-         const std::string& currentCID,
-         std::vector<MessageQueue>& mqAll,
-         std::list<std::string>& cidAll)
-     {
-         std::vector<MessageQueue>* nothing = NULL;
-         return nothing;
-     }
-
-     /** Short identifier of this strategy. */
-     virtual std::string getName()
-     {
-         return "CONFIG";
-     }
- };
-
-
- /**
-  * Allocation strategy "MACHINE_ROOM". The allocation itself is a stub:
-  * allocate() ignores its arguments and returns NULL.
-  */
- class AllocateMessageQueueByMachineRoom : public AllocateMessageQueueStrategy
- {
- public:
-     virtual ~AllocateMessageQueueByMachineRoom() {}
-
-     /** Stub: always returns NULL. */
-     virtual std::vector<MessageQueue>* allocate(
-         const std::string& consumerGroup,
-         const std::string& currentCID,
-         std::vector<MessageQueue>& mqAll,
-         std::list<std::string>& cidAll)
-     {
-         std::vector<MessageQueue>* nothing = NULL;
-         return nothing;
-     }
-
-     /** Short identifier of this strategy. */
-     virtual std::string getName()
-     {
-         return "MACHINE_ROOM";
-     }
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp b/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp
deleted file mode 100755
index 7550acb..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ /dev/null
@@ -1,476 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "ConsumeMessageConcurrentlyService.h"
-
-#include "DefaultMQPushConsumerImpl.h"
-#include "MessageListener.h"
-#include "MessageQueue.h"
-#include "RebalanceImpl.h"
-#include "DefaultMQPushConsumer.h"
-#include "MixAll.h"
-#include "KPRUtil.h"
-#include "UtilAll.h"
-#include "OffsetStore.h"
-
-namespace rmq
-{
-
-
-/**
-* One-shot timer handler that re-submits a batch of messages to the
-* consume service. It keeps its own copy of the message pointer list and
-* deletes itself at the end of OnTimeOut().
-*/
-class SubmitConsumeRequestLater : public kpr::TimerHandler
-{
-public:
-    SubmitConsumeRequestLater(std::list<MessageExt*>& msgs,
-                              ProcessQueue* pProcessQueue,
-                              MessageQueue messageQueue,
-                              ConsumeMessageConcurrentlyService* pService)
-        : m_msgs(msgs),
-          m_pProcessQueue(pProcessQueue),
-          m_messageQueue(messageQueue),
-          m_pService(pService)
-    {
-    }
-
-    /** Re-submit the stored batch; any exception is logged and swallowed. */
-    void OnTimeOut(unsigned int timerID)
-    {
-        try
-        {
-            m_pService->submitConsumeRequest(m_msgs, m_pProcessQueue, m_messageQueue, true);
-        }
-        catch (...)
-        {
-            RMQ_ERROR("SubmitConsumeRequestLater OnTimeOut exception");
-        }
-
-        // The handler owns itself: nothing may touch it after this line.
-        delete this;
-    }
-
-private:
-    std::list<MessageExt*> m_msgs;
-    ProcessQueue* m_pProcessQueue;
-    MessageQueue m_messageQueue;
-    ConsumeMessageConcurrentlyService* m_pService;
-};
-
-
-/**
-* Timer handler that asks the consume service to clean expired messages
-* each time it fires. Unlike SubmitConsumeRequestLater it does not delete
-* itself.
-*/
-class CleanExpireMsgTask : public kpr::TimerHandler
-{
-public:
-    CleanExpireMsgTask(ConsumeMessageConcurrentlyService* pService)
-        : m_pService(pService)
-    {
-    }
-
-    /** Run one clean-up pass; any exception is logged and swallowed. */
-    void OnTimeOut(unsigned int timerID)
-    {
-        try
-        {
-            m_pService->cleanExpireMsg();
-        }
-        catch (...)
-        {
-            RMQ_ERROR("CleanExpireMsgTask OnTimeOut exception");
-        }
-    }
-
-private:
-    ConsumeMessageConcurrentlyService* m_pService;
-};
-
-
-
-/**
-* Wire the service to its consumer implementation and listener, cache the
-* consumer group, and create the worker thread pool, the two timer threads
-* and the clean-up task. Nothing is started here; see start().
-*/
-ConsumeMessageConcurrentlyService::ConsumeMessageConcurrentlyService(
-    DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl,
-    MessageListenerConcurrently* pMessageListener)
-{
-    // Collaborators supplied by the caller.
-    m_pDefaultMQPushConsumerImpl = pDefaultMQPushConsumerImpl;
-    m_pMessageListener = pMessageListener;
-
-    // Consumer configuration and group name.
-    m_pDefaultMQPushConsumer = m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer();
-    m_consumerGroup = m_pDefaultMQPushConsumer->getConsumerGroup();
-
-    // Worker pool sized from the consumer's min/max thread settings.
-    m_pConsumeExecutor = new kpr::ThreadPool("ConsumeMessageThreadPool", 5,
-            m_pDefaultMQPushConsumer->getConsumeThreadMin(), m_pDefaultMQPushConsumer->getConsumeThreadMax());
-
-    // Timer threads: delayed re-submission and expired-message clean-up.
-    m_pScheduledExecutorService = new kpr::TimerThread("ConsumeMessageConcurrentlyService", 1000);
-    m_pCleanExpireMsgExecutors = new kpr::TimerThread("CleanExpireMsgService", 1000);
-    m_pCleanExpireMsgTask = new CleanExpireMsgTask(this);
-}
-
-/** Release the clean-up task created by the constructor. */
-ConsumeMessageConcurrentlyService::~ConsumeMessageConcurrentlyService()
-{
-    // Only the task is deleted here.
-    // NOTE(review): ownership of the thread pool and timer threads is not visible in this file - confirm.
-    delete m_pCleanExpireMsgTask;
-}
-
-
-/**
-* Register the clean-up task on its timer thread (60 * 1000 for both timing
-* arguments) and start both timer threads.
-*/
-void ConsumeMessageConcurrentlyService::start()
-{
-    // The task is registered before its timer thread starts running.
-    m_pCleanExpireMsgExecutors->RegisterTimer(60 * 1000, 60 * 1000, m_pCleanExpireMsgTask, true);
-
-    m_pScheduledExecutorService->Start();
-    m_pCleanExpireMsgExecutors->Start();
-}
-
-/**
-* Stop the service: destroy the worker pool, then stop and join the
-* re-submission timer thread, then stop and join the clean-up timer thread.
-*/
-void ConsumeMessageConcurrentlyService::shutdown()
-{
-    // Worker pool first.
-    m_pConsumeExecutor->Destroy();
-
-    // Delayed re-submission timer.
-    m_pScheduledExecutorService->Stop();
-    m_pScheduledExecutorService->Join();
-
-    // Expired-message clean-up timer.
-    m_pCleanExpireMsgExecutors->Stop();
-    m_pCleanExpireMsgExecutors->Join();
-}
-
-
-/**
-* Walk the rebalancer's process-queue table under its read lock and call
-* cleanExpiredMsg() on every process queue that is not dropped.
-*/
-void ConsumeMessageConcurrentlyService::cleanExpireMsg()
-{
-    // The read lock is held for the whole walk.
-    kpr::ScopedRLock<kpr::RWMutex> lock(m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->getProcessQueueTableLock());
-
-    std::map<MessageQueue, ProcessQueue*>& processQueueTable
-        = m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->getProcessQueueTable();
-
-    RMQ_FOR_EACH(processQueueTable, it)
-    {
-        ProcessQueue* processQueue = it->second;
-        const bool dropped = processQueue->isDropped();
-        if (!dropped)
-        {
-            processQueue->cleanExpiredMsg(m_pDefaultMQPushConsumer);
-        }
-    }
-}
-
-
-/** Return the statistics object held by the consumer implementation's stat manager. */
-ConsumerStat& ConsumeMessageConcurrentlyService::getConsumerStat()
-{
-    ConsumerStat& stat = m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat();
-    return stat;
-}
-
-/**
-* Hand a message back through the consumer implementation, using the delay
-* level and broker name from the consume context.
-* Returns true when the call completes, false when it throws (the failure
-* is logged, never propagated).
-*/
-bool ConsumeMessageConcurrentlyService::sendMessageBack(MessageExt& msg,
-        ConsumeConcurrentlyContext& context)
-{
-    bool sent = false;
-    try
-    {
-        m_pDefaultMQPushConsumerImpl->sendMessageBack(msg,
-                context.delayLevelWhenNextConsume, context.messageQueue.getBrokerName());
-        sent = true;
-    }
-    catch (...)
-    {
-        RMQ_ERROR("sendMessageBack exception, group: %s, msg: %s",
-                  m_consumerGroup.c_str(), msg.toString().c_str());
-    }
-
-    return sent;
-}
-
-/**
-* Schedule a batch for re-submission through the scheduled timer thread.
-* The handler created here deletes itself after it has fired.
-*/
-void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(std::list<MessageExt*>& msgs,
-        ProcessQueue* pProcessQueue,
-        const MessageQueue& messageQueue)
-{
-    SubmitConsumeRequestLater* retryTask =
-        new SubmitConsumeRequestLater(msgs, pProcessQueue, messageQueue, this);
-
-    m_pScheduledExecutorService->RegisterTimer(0, 5000, retryTask, false);
-}
-
-/**
-* Queue messages for consumption on the worker pool.
-*
-* When the list fits into one batch (the consumer's
-* getConsumeMessageBatchMaxSize()), a single request carrying all messages
-* is queued. Otherwise the list is cut into consecutive batches of at most
-* that size, one request each.
-*
-* A configured batch size of 0 is treated as 1. With 0 the splitting loop
-* could never advance and would queue empty requests forever.
-*/
-void ConsumeMessageConcurrentlyService::submitConsumeRequest(std::list<MessageExt*>& msgs,
-        ProcessQueue* pProcessQueue,
-        const MessageQueue& messageQueue,
-        bool dispathToConsume)
-{
-    size_t consumeBatchSize = m_pDefaultMQPushConsumer->getConsumeMessageBatchMaxSize();
-    if (consumeBatchSize == 0)
-    {
-        consumeBatchSize = 1;
-    }
-
-    RMQ_DEBUG("submitConsumeRequest begin, msgs.size=%d, messageQueue=%s, consumeBatchSize=%d, dispathToConsume=%d",
-              (int)msgs.size(), messageQueue.toString().c_str(), (int)consumeBatchSize, dispathToConsume
-             );
-
-    if (msgs.size() <= consumeBatchSize)
-    {
-        // Everything fits into one request.
-        kpr::ThreadPoolWorkPtr consumeRequest = new ConsumeConcurrentlyRequest(msgs, pProcessQueue, messageQueue, this);
-        m_pConsumeExecutor->AddWork(consumeRequest);
-    }
-    else
-    {
-        std::list<MessageExt*>::iterator it = msgs.begin();
-        while (it != msgs.end())
-        {
-            // Collect up to consumeBatchSize messages for this request.
-            std::list<MessageExt*> msgThis;
-            for (size_t taken = 0; taken < consumeBatchSize && it != msgs.end(); ++taken, ++it)
-            {
-                msgThis.push_back(*it);
-            }
-
-            kpr::ThreadPoolWorkPtr consumeRequest = new ConsumeConcurrentlyRequest(msgThis, pProcessQueue, messageQueue, this);
-            m_pConsumeExecutor->AddWork(consumeRequest);
-        }
-    }
-
-    RMQ_DEBUG("submitConsumeRequest end");
-}
-
-/** Not implemented: the requested core pool size is ignored. */
-void ConsumeMessageConcurrentlyService::updateCorePoolSize(int corePoolSize)
-{
-    // TODO: resize the consume thread pool.
-}
-
-/**
-* Post-process the outcome of one consume request.
-*
-* 1. Update the OK / failed counters. On CONSUME_SUCCESS the context's
-*    ackIndex is clamped to the last message; on RECONSUME_LATER it becomes
-*    -1, so every message counts as failed.
-* 2. Treat the messages after ackIndex as failed. In BROADCASTING mode they
-*    are only logged. In CLUSTERING mode each is passed to sendMessageBack();
-*    those for which it returns false get their reconsume count incremented,
-*    are removed from the request and are scheduled for a later local retry.
-* 3. Remove the messages still in the request from the process queue and,
-*    when that yields a non-negative offset and the queue is not dropped,
-*    update the offset store.
-*/
-void ConsumeMessageConcurrentlyService::processConsumeResult(ConsumeConcurrentlyStatus status,
-        ConsumeConcurrentlyContext& context,
-        ConsumeConcurrentlyRequest& consumeRequest)
-{
-    std::list<MessageExt*>& msgs = consumeRequest.getMsgs();
-    if (msgs.empty())
-    {
-        return;
-    }
-
-    int ackIndex = context.ackIndex;
-    const int msgsSize = msgs.size();
-
-    // Step 1: statistics and ackIndex normalisation.
-    if (status == CONSUME_SUCCESS)
-    {
-        if (ackIndex >= msgsSize)
-        {
-            ackIndex = msgsSize - 1;
-        }
-
-        const int ok = ackIndex + 1;
-        const int failed = msgsSize - ok;
-        getConsumerStat().consumeMsgOKTotal.fetchAndAdd(ok);
-        getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(failed);
-    }
-    else if (status == RECONSUME_LATER)
-    {
-        ackIndex = -1;
-        getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(msgsSize);
-    }
-
-    // Step 2: move past the acknowledged prefix; the rest failed.
-    std::list<MessageExt*>::iterator it = msgs.begin();
-    for (int i = 0; i < ackIndex + 1 && it != msgs.end(); i++)
-    {
-        ++it;
-    }
-
-    switch (m_pDefaultMQPushConsumer->getMessageModel())
-    {
-        case BROADCASTING:
-            while (it != msgs.end())
-            {
-                MessageExt* msg = *it;
-                RMQ_WARN("BROADCASTING, the message consume failed, drop it, %s", msg->toString().c_str());
-                ++it;
-            }
-            break;
-        case CLUSTERING:
-        {
-            std::list<MessageExt*> msgBackFailed;
-            while (it != msgs.end())
-            {
-                MessageExt* msg = *it;
-                if (!sendMessageBack(*msg, context))
-                {
-                    msg->setReconsumeTimes(msg->getReconsumeTimes() + 1);
-                    msgBackFailed.push_back(msg);
-                }
-                ++it;
-            }
-
-            if (!msgBackFailed.empty())
-            {
-                // Messages that could not be sent back stay out of the offset
-                // computation below and are retried locally later.
-                std::list<MessageExt*>::iterator itFailed = msgBackFailed.begin();
-                for (; itFailed != msgBackFailed.end(); ++itFailed)
-                {
-                    msgs.remove(*itFailed);
-                }
-
-                submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(),
-                                          consumeRequest.getMessageQueue());
-            }
-        }
-        break;
-        default:
-            break;
-    }
-
-    // Step 3: advance the consume offset.
-    ProcessQueue* processQueue = consumeRequest.getProcessQueue();
-    const long long offset = processQueue->removeMessage(msgs);
-    if (offset >= 0 && !(processQueue->isDropped()))
-    {
-        m_pDefaultMQPushConsumerImpl->getOffsetStore()->updateOffset(consumeRequest.getMessageQueue(),
-                offset, true);
-    }
-}
-
-/** Return a reference to the consumer group name cached at construction. */
-std::string& ConsumeMessageConcurrentlyService::getConsumerGroup()
-{
-    std::string& group = m_consumerGroup;
-    return group;
-}
-
-/** Return the listener pointer supplied to the constructor. */
-MessageListenerConcurrently* ConsumeMessageConcurrentlyService::getMessageListener()
-{
-    MessageListenerConcurrently* listener = m_pMessageListener;
-    return listener;
-}
-
-/** Return the consumer implementation pointer supplied to the constructor. */
-DefaultMQPushConsumerImpl* ConsumeMessageConcurrentlyService::getDefaultMQPushConsumerImpl()
-{
-    DefaultMQPushConsumerImpl* impl = m_pDefaultMQPushConsumerImpl;
-    return impl;
-}
-
-/**
-* Capture everything a worker needs to consume one batch: a copy of the
-* message pointer list, the process queue, a copy of the message queue
-* descriptor and the owning service.
-*/
-ConsumeConcurrentlyRequest::ConsumeConcurrentlyRequest(std::list<MessageExt*>& msgs,
-        ProcessQueue* pProcessQueue,
-        const MessageQueue& messageQueue,
-        ConsumeMessageConcurrentlyService* pService)
-{
-    // Plain pointers first, then the copied values.
-    m_pProcessQueue = pProcessQueue;
-    m_pService = pService;
-    m_messageQueue = messageQueue;
-    m_msgs = msgs;
-}
-
-/** Empty the pointer list; the messages it points to are not deleted here. */
-ConsumeConcurrentlyRequest::~ConsumeConcurrentlyRequest()
-{
-    // Only the list nodes are released.
-    m_msgs.clear();
-}
-
-/**
-* Worker entry point: consume the batch held by this request.
-*
-* Returns early when the process queue is already dropped. Otherwise it
-* runs the optional "before" hook, stamps each message with the consume
-* start time, calls the listener, runs the optional "after" hook, updates
-* the response-time statistics and passes the status to
-* processConsumeResult() unless the queue was dropped in the meantime.
-* An exception thrown by the listener leaves the status at RECONSUME_LATER.
-* No exception leaves this function.
-*/
-void ConsumeConcurrentlyRequest::Do()
-{
-    RMQ_DEBUG("consumeMessage begin, m_msgs.size=%d", (int)m_msgs.size());
-
-    if (m_pProcessQueue->isDropped())
-    {
-        RMQ_WARN("the message queue not be able to consume, because it's droped, {%s}",
-                 m_messageQueue.toString().c_str());
-        return;
-    }
-
-    try
-    {
-        MessageListenerConcurrently* listener = m_pService->getMessageListener();
-        ConsumeConcurrentlyContext context(m_messageQueue);
-
-        // Pessimistic default: used whenever the listener throws.
-        ConsumeConcurrentlyStatus status = RECONSUME_LATER;
-
-        ConsumeMessageContext consumeMessageContext;
-        if (m_pService->getDefaultMQPushConsumerImpl()->hasHook())
-        {
-            consumeMessageContext.consumerGroup = m_pService->getConsumerGroup();
-            consumeMessageContext.mq = m_messageQueue;
-            consumeMessageContext.msgList = m_msgs;
-            consumeMessageContext.success = false;
-            m_pService->getDefaultMQPushConsumerImpl()->executeHookBefore(consumeMessageContext);
-        }
-
-        const long long beginTimestamp = KPRUtil::GetCurrentTimeMillis();
-        try
-        {
-            resetRetryTopic(m_msgs);
-
-            // Record when consumption of each message started.
-            for (std::list<MessageExt*>::iterator it = m_msgs.begin(); it != m_msgs.end(); ++it)
-            {
-                (*it)->putProperty(Message::PROPERTY_CONSUME_START_TIMESTAMP,
-                                   UtilAll::toString(KPRUtil::GetCurrentTimeMillis()));
-            }
-
-            status = listener->consumeMessage(m_msgs, context);
-        }
-        catch (...)
-        {
-            RMQ_WARN("consumeMessage exception, Group: {%s} Msgs: {%d} MQ: {%s}",
-                     m_pService->getConsumerGroup().c_str(),
-                     (int)m_msgs.size(),
-                     m_messageQueue.toString().c_str()
-                    );
-        }
-
-        const long long consumeRT = KPRUtil::GetCurrentTimeMillis() - beginTimestamp;
-
-        if (m_pService->getDefaultMQPushConsumerImpl()->hasHook())
-        {
-            consumeMessageContext.success = (status == CONSUME_SUCCESS);
-            m_pService->getDefaultMQPushConsumerImpl()->executeHookAfter(consumeMessageContext);
-        }
-
-        // Response-time statistics: running total plus running maximum.
-        m_pService->getConsumerStat().consumeMsgRTTotal.fetchAndAdd(consumeRT);
-        if (MixAll::compareAndIncreaseOnly(m_pService->getConsumerStat().consumeMsgRTMax, consumeRT))
-        {
-            RMQ_WARN("consumeMessage RT new max: %lld, Group: %s, Msgs: %d, MQ: %s",
-                     consumeRT,
-                     m_pService->getConsumerGroup().c_str(),
-                     (int)m_msgs.size(),
-                     m_messageQueue.toString().c_str()
-                    );
-        }
-
-        if (m_pProcessQueue->isDropped())
-        {
-            RMQ_WARN("processQueue is dropped without process consume result, messageQueue={%s}, msgs.size={%d}",
-                     m_messageQueue.toString().c_str(), (int)m_msgs.size());
-        }
-        else
-        {
-            m_pService->processConsumeResult(status, context, *this);
-        }
-    }
-    catch (...)
-    {
-        RMQ_WARN("ConsumeConcurrentlyRequest exception");
-    }
-
-    RMQ_DEBUG("consumeMessage end, m_msgs.size=%d", (int)m_msgs.size());
-}
-
-/**
-* For every message whose topic equals this group's retry topic and that
-* carries a non-empty PROPERTY_RETRY_TOPIC property, set the message topic
-* to that property's value.
-*/
-void ConsumeConcurrentlyRequest::resetRetryTopic(std::list<MessageExt*>& msgs)
-{
-    const std::string groupTopic = MixAll::getRetryTopic(m_pService->getConsumerGroup());
-
-    for (std::list<MessageExt*>::iterator it = msgs.begin(); it != msgs.end(); ++it)
-    {
-        MessageExt* msg = *it;
-        const std::string retryTopic = msg->getProperty(Message::PROPERTY_RETRY_TOPIC);
-        if (retryTopic.empty())
-        {
-            continue;
-        }
-
-        if (groupTopic == msg->getTopic())
-        {
-            msg->setTopic(retryTopic);
-        }
-    }
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h b/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h
deleted file mode 100755
index acb7538..0000000
--- a/rocketmq-client4cpp/src/consumer/ConsumeMessageConcurrentlyService.h
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __CONSUMEMESSAGECONCURRENTLYSERVICE_H__
-#define __CONSUMEMESSAGECONCURRENTLYSERVICE_H__
-
-#include "ConsumeMessageService.h"
-
-#include <list>
-#include <string>
-#include "MessageQueueLock.h"
-#include "ConsumerStatManage.h"
-#include "MessageExt.h"
-#include "MessageListener.h"
-#include "ProcessQueue.h"
-#include "ThreadPool.h"
-#include "TimerThread.h"
-
-namespace rmq
-{
- class DefaultMQPushConsumerImpl;
- class DefaultMQPushConsumer;
- class MessageListenerConcurrently;
- class ConsumeMessageConcurrentlyService;
-
- class ConsumeConcurrentlyRequest: public kpr::ThreadPoolWork
- {
- public:
- ConsumeConcurrentlyRequest(std::list<MessageExt*>& msgs,
- ProcessQueue* pProcessQueue,
- const MessageQueue& messageQueue,
- ConsumeMessageConcurrentlyService* pService);
- ~ConsumeConcurrentlyRequest();
- virtual void Do();
-
- std::list<MessageExt*>& getMsgs()
- {
- return m_msgs;
- }
-
- ProcessQueue* getProcessQueue()
- {
- return m_pProcessQueue;
- }
-
- MessageQueue getMessageQueue()
- {
- return m_messageQueue;
- }
-
- private:
- void resetRetryTopic(std::list<MessageExt*>& msgs);
-
- private:
- std::list<MessageExt*> m_msgs;
- ProcessQueue* m_pProcessQueue;
- MessageQueue m_messageQueue;
- ConsumeMessageConcurrentlyService* m_pService;
- };
-
-
- class ConsumeMessageConcurrentlyService : public ConsumeMessageService
- {
- public:
- ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl,
- MessageListenerConcurrently* pMessageListener);
- ~ConsumeMessageConcurrentlyService();
-
- void start();
- void shutdown();
-
- void cleanExpireMsg();
- ConsumerStat& getConsumerStat();
-
- bool sendMessageBack(MessageExt& msg, ConsumeConcurrentlyContext& context);
- void processConsumeResult(ConsumeConcurrentlyStatus status,
- ConsumeConcurrentlyContext& context,
- ConsumeConcurrentlyRequest& consumeRequest);
-
- void submitConsumeRequestLater(std::list<MessageExt*>& pMsgs,
- ProcessQueue* pProcessQueue,
- const MessageQueue& messageQueue);
-
- void submitConsumeRequest(std::list<MessageExt*>& pMsgs,
- ProcessQueue* pProcessQueue,
- const MessageQueue& messageQueue,
- bool dispathToConsume);
-
- void updateCorePoolSize(int corePoolSize);
-
- std::string& getConsumerGroup();
- MessageListenerConcurrently* getMessageListener();
- DefaultMQPushConsumerImpl* getDefaultMQPushConsumerImpl();
-
- private:
- DefaultMQPushConsumerImpl* m_pDefaultMQPushConsumerImpl;
- DefaultMQPushConsumer* m_pDefaultMQPushConsumer;
- MessageListenerConcurrently* m_pMessageListener;
- std::string m_consumerGroup;
- kpr::ThreadPoolPtr m_pConsumeExecutor;
- kpr::TimerThreadPtr m_pScheduledExecutorService;
- kpr::TimerThreadPtr m_pCleanExpireMsgExecutors;
- kpr::TimerHandler* m_pCleanExpireMsgTask;
- };
-}
-
-#endif