You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:50 UTC
[04/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp
code
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/producer/DefaultMQProducer.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/producer/DefaultMQProducer.cpp b/rocketmq-cpp/src/producer/DefaultMQProducer.cpp
new file mode 100755
index 0000000..9c53930
--- /dev/null
+++ b/rocketmq-cpp/src/producer/DefaultMQProducer.cpp
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefaultMQProducer.h"
+#include <assert.h>
+#include "CommandHeader.h"
+#include "CommunicationMode.h"
+#include "Logging.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientException.h"
+#include "MQClientFactory.h"
+#include "MQClientManager.h"
+#include "MQDecoder.h"
+#include "MQProtos.h"
+#include "MessageSysFlag.h"
+#include "TopicPublishInfo.h"
+#include "Validators.h"
+
+namespace rocketmq {
+
+//<!************************************************************************
+DefaultMQProducer::DefaultMQProducer(const string& groupname)
+ : m_sendMsgTimeout(3000),
+ m_compressMsgBodyOverHowmuch(4 * 1024),
+ m_maxMessageSize(1024 * 128),
+ m_retryAnotherBrokerWhenNotStoreOK(false),
+ m_compressLevel(5),
+ m_retryTimes(5) {
+ //<!set default group name;
+ string gname = groupname.empty() ? DEFAULT_PRODUCER_GROUP : groupname;
+ setGroupName(gname);
+}
+
+DefaultMQProducer::~DefaultMQProducer() {}
+
+void DefaultMQProducer::start() {
+ /* Ignore the SIGPIPE */
+ struct sigaction sa;
+ sa.sa_handler = SIG_IGN;
+ sa.sa_flags = 0;
+ sigaction(SIGPIPE, &sa, 0);
+
+ switch (m_serviceState) {
+ case CREATE_JUST: {
+ m_serviceState = START_FAILED;
+ MQClient::start();
+ LOG_INFO("DefaultMQProducer:%s start", m_GroupName.c_str());
+
+ bool registerOK = getFactory()->registerProducer(this);
+ if (!registerOK) {
+ m_serviceState = CREATE_JUST;
+ THROW_MQEXCEPTION(
+ MQClientException,
+ "The producer group[" + getGroupName() +
+ "] has been created before, specify another name please.",
+ -1);
+ }
+
+ getFactory()->start();
+ getFactory()->sendHeartbeatToAllBroker();
+ m_serviceState = RUNNING;
+ break;
+ }
+ case RUNNING:
+ case START_FAILED:
+ case SHUTDOWN_ALREADY:
+ break;
+ default:
+ break;
+ }
+}
+
+void DefaultMQProducer::shutdown() {
+ switch (m_serviceState) {
+ case RUNNING: {
+ LOG_INFO("DefaultMQProducer shutdown");
+ getFactory()->unregisterProducer(this);
+ getFactory()->shutdown();
+ m_serviceState = SHUTDOWN_ALREADY;
+ break;
+ }
+ case SHUTDOWN_ALREADY:
+ case CREATE_JUST:
+ break;
+ default:
+ break;
+ }
+}
+
+SendResult DefaultMQProducer::send(MQMessage& msg, bool bSelectActiveBroker) {
+ Validators::checkMessage(msg, getMaxMessageSize());
+ try {
+ return sendDefaultImpl(msg, ComMode_SYNC, NULL, bSelectActiveBroker);
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ throw e;
+ }
+ return SendResult();
+}
+
+void DefaultMQProducer::send(MQMessage& msg, SendCallback* pSendCallback,
+ bool bSelectActiveBroker) {
+ Validators::checkMessage(msg, getMaxMessageSize());
+ try {
+ sendDefaultImpl(msg, ComMode_ASYNC, pSendCallback, bSelectActiveBroker);
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ throw e;
+ }
+}
+
+SendResult DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue& mq) {
+ Validators::checkMessage(msg, getMaxMessageSize());
+ if (msg.getTopic() != mq.getTopic()) {
+ LOG_WARN("message's topic not equal mq's topic");
+ }
+ try {
+ return sendKernelImpl(msg, mq, ComMode_SYNC, NULL);
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ throw e;
+ }
+ return SendResult();
+}
+
+void DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue& mq,
+ SendCallback* pSendCallback) {
+ Validators::checkMessage(msg, getMaxMessageSize());
+ if (msg.getTopic() != mq.getTopic()) {
+ LOG_WARN("message's topic not equal mq's topic");
+ }
+ try {
+ sendKernelImpl(msg, mq, ComMode_ASYNC, pSendCallback);
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ throw e;
+ }
+}
+
+void DefaultMQProducer::sendOneway(MQMessage& msg, bool bSelectActiveBroker) {
+ Validators::checkMessage(msg, getMaxMessageSize());
+ try {
+ sendDefaultImpl(msg, ComMode_ONEWAY, NULL, bSelectActiveBroker);
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ throw e;
+ }
+}
+
+void DefaultMQProducer::sendOneway(MQMessage& msg, const MQMessageQueue& mq) {
+ Validators::checkMessage(msg, getMaxMessageSize());
+ if (msg.getTopic() != mq.getTopic()) {
+ LOG_WARN("message's topic not equal mq's topic");
+ }
+ try {
+ sendKernelImpl(msg, mq, ComMode_ONEWAY, NULL);
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ throw e;
+ }
+}
+
+SendResult DefaultMQProducer::send(MQMessage& msg,
+ MessageQueueSelector* pSelector, void* arg) {
+ try {
+ return sendSelectImpl(msg, pSelector, arg, ComMode_SYNC, NULL);
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ throw e;
+ }
+ return SendResult();
+}
+
+SendResult DefaultMQProducer::send(MQMessage& msg,
+ MessageQueueSelector* pSelector, void* arg,
+ int autoRetryTimes, bool bActiveBroker) {
+ try {
+ return sendAutoRetrySelectImpl(msg, pSelector, arg, ComMode_SYNC, NULL,
+ autoRetryTimes, bActiveBroker);
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ throw e;
+ }
+ return SendResult();
+}
+
+void DefaultMQProducer::send(MQMessage& msg, MessageQueueSelector* pSelector,
+ void* arg, SendCallback* pSendCallback) {
+ try {
+ sendSelectImpl(msg, pSelector, arg, ComMode_ASYNC, pSendCallback);
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ throw e;
+ }
+}
+
+void DefaultMQProducer::sendOneway(MQMessage& msg,
+ MessageQueueSelector* pSelector, void* arg) {
+ try {
+ sendSelectImpl(msg, pSelector, arg, ComMode_ONEWAY, NULL);
+ } catch (MQException& e) {
+ LOG_ERROR(e.what());
+ throw e;
+ }
+}
+
+int DefaultMQProducer::getSendMsgTimeout() const { return m_sendMsgTimeout; }
+
+void DefaultMQProducer::setSendMsgTimeout(int sendMsgTimeout) {
+ m_sendMsgTimeout = sendMsgTimeout;
+}
+
+int DefaultMQProducer::getCompressMsgBodyOverHowmuch() const {
+ return m_compressMsgBodyOverHowmuch;
+}
+
+void DefaultMQProducer::setCompressMsgBodyOverHowmuch(
+ int compressMsgBodyOverHowmuch) {
+ m_compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
+}
+
+int DefaultMQProducer::getMaxMessageSize() const { return m_maxMessageSize; }
+
+void DefaultMQProducer::setMaxMessageSize(int maxMessageSize) {
+ m_maxMessageSize = maxMessageSize;
+}
+
+int DefaultMQProducer::getCompressLevel() const { return m_compressLevel; }
+
+void DefaultMQProducer::setCompressLevel(int compressLevel) {
+ assert(compressLevel >= 0 && compressLevel <= 9 || compressLevel == -1);
+
+ m_compressLevel = compressLevel;
+}
+
+//<!************************************************************************
+SendResult DefaultMQProducer::sendDefaultImpl(MQMessage& msg,
+ int communicationMode,
+ SendCallback* pSendCallback,
+ bool bActiveMQ) {
+ MQMessageQueue lastmq;
+ int mq_index = 0;
+ for (int times = 1; times <= m_retryTimes; times++) {
+ boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo(
+ getFactory()->tryToFindTopicPublishInfo(msg.getTopic(),
+ getSessionCredentials()));
+ boost::shared_ptr<TopicPublishInfo> topicPublishInfo(
+ weak_topicPublishInfo.lock());
+ if (topicPublishInfo) {
+ if (times == 1) {
+ mq_index = topicPublishInfo->getWhichQueue();
+ } else {
+ mq_index++;
+ }
+
+ SendResult sendResult;
+ MQMessageQueue mq;
+ if (bActiveMQ)
+ mq = topicPublishInfo->selectOneActiveMessageQueue(lastmq, mq_index);
+ else
+ mq = topicPublishInfo->selectOneMessageQueue(lastmq, mq_index);
+
+ lastmq = mq;
+ if (mq.getQueueId() == -1) {
+ // THROW_MQEXCEPTION(MQClientException, "the MQMessageQueue is
+ // invalide", -1);
+ continue;
+ }
+
+ try {
+ LOG_DEBUG("send to brokerName:%s", mq.getBrokerName().c_str());
+ sendResult = sendKernelImpl(msg, mq, communicationMode, pSendCallback);
+ switch (communicationMode) {
+ case ComMode_ASYNC:
+ return sendResult;
+ case ComMode_ONEWAY:
+ return sendResult;
+ case ComMode_SYNC:
+ if (sendResult.getSendStatus() != SEND_OK) {
+ if (bActiveMQ) {
+ topicPublishInfo->updateNonServiceMessageQueue(
+ mq, getSendMsgTimeout());
+ }
+ continue;
+ }
+ return sendResult;
+ default:
+ break;
+ }
+ } catch (...) {
+ LOG_ERROR("send failed of times:%d,brokerName:%s", times,
+ mq.getBrokerName().c_str());
+ if (bActiveMQ) {
+ topicPublishInfo->updateNonServiceMessageQueue(mq,
+ getSendMsgTimeout());
+ }
+ continue;
+ }
+ } // end of for
+ LOG_WARN("Retry many times, still failed");
+ }
+ THROW_MQEXCEPTION(MQClientException, "No route info of this topic, ", -1);
+}
+
+SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg,
+ const MQMessageQueue& mq,
+ int communicationMode,
+ SendCallback* sendCallback) {
+ string brokerAddr =
+ getFactory()->findBrokerAddressInPublish(mq.getBrokerName());
+
+ if (brokerAddr.empty()) {
+ getFactory()->tryToFindTopicPublishInfo(mq.getTopic(),
+ getSessionCredentials());
+ brokerAddr = getFactory()->findBrokerAddressInPublish(mq.getBrokerName());
+ }
+
+ if (!brokerAddr.empty()) {
+ try {
+ LOG_DEBUG("produce before:%s to %s", msg.toString().c_str(),
+ mq.toString().c_str());
+ int sysFlag = 0;
+ if (tryToCompressMessage(msg)) {
+ sysFlag |= MessageSysFlag::CompressedFlag;
+ }
+
+ string tranMsg =
+ msg.getProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED);
+ if (!tranMsg.empty() && tranMsg == "true") {
+ sysFlag |= MessageSysFlag::TransactionPreparedType;
+ }
+
+ SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader();
+ requestHeader->producerGroup = getGroupName();
+ requestHeader->topic = (msg.getTopic());
+ requestHeader->defaultTopic = DEFAULT_TOPIC;
+ requestHeader->defaultTopicQueueNums = 4;
+ requestHeader->queueId = (mq.getQueueId());
+ requestHeader->sysFlag = (sysFlag);
+ requestHeader->bornTimestamp = UtilAll::currentTimeMillis();
+ requestHeader->flag = (msg.getFlag());
+ requestHeader->properties =
+ (MQDecoder::messageProperties2String(msg.getProperties()));
+
+ return getFactory()->getMQClientAPIImpl()->sendMessage(
+ brokerAddr, mq.getBrokerName(), msg, requestHeader,
+ getSendMsgTimeout(), communicationMode, sendCallback,
+ getSessionCredentials());
+ } catch (MQException& e) {
+ throw e;
+ }
+ }
+ THROW_MQEXCEPTION(MQClientException,
+ "The broker[" + mq.getBrokerName() + "] not exist", -1);
+}
+
+SendResult DefaultMQProducer::sendSelectImpl(MQMessage& msg,
+ MessageQueueSelector* pSelector,
+ void* pArg, int communicationMode,
+ SendCallback* sendCallback) {
+ Validators::checkMessage(msg, getMaxMessageSize());
+
+ boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo(
+ getFactory()->tryToFindTopicPublishInfo(msg.getTopic(),
+ getSessionCredentials()));
+ boost::shared_ptr<TopicPublishInfo> topicPublishInfo(
+ weak_topicPublishInfo.lock());
+ if (topicPublishInfo) //&& topicPublishInfo->ok())
+ {
+ MQMessageQueue mq =
+ pSelector->select(topicPublishInfo->getMessageQueueList(), msg, pArg);
+ return sendKernelImpl(msg, mq, communicationMode, sendCallback);
+ }
+ THROW_MQEXCEPTION(MQClientException, "No route info for this topic", -1);
+}
+
+SendResult DefaultMQProducer::sendAutoRetrySelectImpl(
+ MQMessage& msg, MessageQueueSelector* pSelector, void* pArg,
+ int communicationMode, SendCallback* pSendCallback, int autoRetryTimes,
+ bool bActiveMQ) {
+ Validators::checkMessage(msg, getMaxMessageSize());
+
+ MQMessageQueue lastmq;
+ MQMessageQueue mq;
+ int mq_index = 0;
+ for (int times = 1; times <= autoRetryTimes + 1; times++) {
+ boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo(
+ getFactory()->tryToFindTopicPublishInfo(msg.getTopic(),
+ getSessionCredentials()));
+ boost::shared_ptr<TopicPublishInfo> topicPublishInfo(
+ weak_topicPublishInfo.lock());
+ if (topicPublishInfo) {
+ SendResult sendResult;
+ if (times == 1) { // always send to selected MQ firstly, evenif bActiveMQ
+ // was setted to true
+ mq = pSelector->select(topicPublishInfo->getMessageQueueList(), msg,
+ pArg);
+ lastmq = mq;
+ } else {
+ LOG_INFO("sendAutoRetrySelectImpl with times:%d", times);
+ vector<MQMessageQueue> mqs(topicPublishInfo->getMessageQueueList());
+ for (size_t i = 0; i < mqs.size(); i++) {
+ if (mqs[i] == lastmq) mq_index = i;
+ }
+ if (bActiveMQ)
+ mq = topicPublishInfo->selectOneActiveMessageQueue(lastmq, mq_index);
+ else
+ mq = topicPublishInfo->selectOneMessageQueue(lastmq, mq_index);
+ lastmq = mq;
+ if (mq.getQueueId() == -1) {
+ // THROW_MQEXCEPTION(MQClientException, "the MQMessageQueue is
+ // invalide", -1);
+ continue;
+ }
+ }
+
+ try {
+ LOG_DEBUG("send to broker:%s", mq.toString().c_str());
+ sendResult = sendKernelImpl(msg, mq, communicationMode, pSendCallback);
+ switch (communicationMode) {
+ case ComMode_ASYNC:
+ return sendResult;
+ case ComMode_ONEWAY:
+ return sendResult;
+ case ComMode_SYNC:
+ if (sendResult.getSendStatus() != SEND_OK) {
+ if (bActiveMQ) {
+ topicPublishInfo->updateNonServiceMessageQueue(
+ mq, getSendMsgTimeout());
+ }
+ continue;
+ }
+ return sendResult;
+ default:
+ break;
+ }
+ } catch (...) {
+ LOG_ERROR("send failed of times:%d,mq:%s", times,
+ mq.toString().c_str());
+ if (bActiveMQ) {
+ topicPublishInfo->updateNonServiceMessageQueue(mq,
+ getSendMsgTimeout());
+ }
+ continue;
+ }
+ } // end of for
+ LOG_WARN("Retry many times, still failed");
+ }
+ THROW_MQEXCEPTION(MQClientException, "No route info of this topic, ", -1);
+}
+
+bool DefaultMQProducer::tryToCompressMessage(MQMessage& msg) {
+ string body = msg.getBody();
+ if ((int)body.length() >= getCompressMsgBodyOverHowmuch()) {
+ string outBody;
+ if (UtilAll::deflate(body, outBody, getCompressLevel())) {
+ msg.setBody(outBody);
+ return true;
+ }
+ }
+
+ return false;
+}
+int DefaultMQProducer::getRetryTimes() const { return m_retryTimes; }
+void DefaultMQProducer::setRetryTimes(int times) {
+ if (times <= 0) {
+ LOG_WARN("set retry times illegal, use default value:5");
+ return;
+ }
+
+ if (times > 15) {
+ LOG_WARN("set retry times illegal, use max value:15");
+ m_retryTimes = 15;
+ return;
+ }
+ LOG_WARN("set retry times to:%d", times);
+ m_retryTimes = times;
+}
+//<!***************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/producer/SendResult.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/producer/SendResult.cpp b/rocketmq-cpp/src/producer/SendResult.cpp
new file mode 100755
index 0000000..7fd844e
--- /dev/null
+++ b/rocketmq-cpp/src/producer/SendResult.cpp
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SendResult.h"
+#include "UtilAll.h"
+#include "VirtualEnvUtil.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+SendResult::SendResult() : m_sendStatus(SEND_OK), m_queueOffset(0) {}
+
+SendResult::SendResult(const SendStatus& sendStatus, const string& msgId,
+ const MQMessageQueue& messageQueue, int64 queueOffset)
+ : m_sendStatus(sendStatus),
+ m_msgId(msgId),
+ m_messageQueue(messageQueue),
+ m_queueOffset(queueOffset) {}
+
+SendResult::SendResult(const SendResult& other) {
+ m_sendStatus = other.m_sendStatus;
+ m_msgId = other.m_msgId;
+ m_messageQueue = other.m_messageQueue;
+ m_queueOffset = other.m_queueOffset;
+}
+
+SendResult& SendResult::operator=(const SendResult& other) {
+ if (this != &other) {
+ m_sendStatus = other.m_sendStatus;
+ m_msgId = other.m_msgId;
+ m_messageQueue = other.m_messageQueue;
+ m_queueOffset = other.m_queueOffset;
+ }
+ return *this;
+}
+
+SendResult::~SendResult() {}
+
+const string& SendResult::getMsgId() const { return m_msgId; }
+
+SendStatus SendResult::getSendStatus() const { return m_sendStatus; }
+
+MQMessageQueue SendResult::getMessageQueue() const { return m_messageQueue; }
+
+int64 SendResult::getQueueOffset() const { return m_queueOffset; }
+
+//<!************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/producer/TopicPublishInfo.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/producer/TopicPublishInfo.h b/rocketmq-cpp/src/producer/TopicPublishInfo.h
new file mode 100755
index 0000000..726b231
--- /dev/null
+++ b/rocketmq-cpp/src/producer/TopicPublishInfo.h
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TOPICPUBLISHINFO_H__
+#define __TOPICPUBLISHINFO_H__
+
+#include <boost/asio.hpp>
+#include <boost/asio/io_service.hpp>
+#include <boost/atomic.hpp>
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/thread.hpp>
+#include "Logging.h"
+#include "MQMessageQueue.h"
+
+namespace rocketmq {
+//<!************************************************************************/
+class TopicPublishInfo {
+ public:
+ TopicPublishInfo() : m_sendWhichQueue(0) {
+ m_async_service_thread.reset(new boost::thread(
+ boost::bind(&TopicPublishInfo::boost_asio_work, this)));
+ }
+
+ void boost_asio_work() {
+ boost::asio::io_service::work work(m_async_ioService); // avoid async io
+ // service stops
+ // after first timer
+ // timeout callback
+ boost::system::error_code e;
+ boost::asio::deadline_timer t(m_async_ioService,
+ boost::posix_time::seconds(60));
+ t.async_wait(boost::bind(
+ &TopicPublishInfo::op_resumeNonServiceMessageQueueList, this, e, &t));
+ boost::system::error_code ec;
+ m_async_ioService.run(ec);
+ }
+
+ virtual ~TopicPublishInfo() {
+ m_async_ioService.stop();
+ m_async_service_thread->interrupt();
+ m_async_service_thread->join();
+
+ m_nonSerivceQueues.clear();
+ m_onSerivceQueues.clear();
+ m_brokerTimerMap.clear();
+ m_queues.clear();
+ }
+
+ bool ok() {
+ boost::lock_guard<boost::mutex> lock(m_queuelock);
+ return !m_queues.empty();
+ }
+
+ void updateMessageQueueList(const MQMessageQueue& mq) {
+ boost::lock_guard<boost::mutex> lock(m_queuelock);
+ m_queues.push_back(mq);
+ string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId());
+ m_onSerivceQueues[key] = mq;
+ if (m_nonSerivceQueues.find(key) != m_nonSerivceQueues.end()) {
+ m_nonSerivceQueues.erase(key); // if topicPublishInfo changed, erase this
+ // mq from m_nonSerivceQueues to avoid 2
+ // copies both in m_onSerivceQueues and
+ // m_nonSerivceQueues
+ }
+ }
+
+ void op_resumeNonServiceMessageQueueList(boost::system::error_code& ec,
+ boost::asio::deadline_timer* t) {
+ resumeNonServiceMessageQueueList();
+ boost::system::error_code e;
+ t->expires_at(t->expires_at() + boost::posix_time::seconds(60), e);
+ t->async_wait(boost::bind(
+ &TopicPublishInfo::op_resumeNonServiceMessageQueueList, this, e, t));
+ }
+
+ void resumeNonServiceMessageQueueList() {
+ boost::lock_guard<boost::mutex> lock(m_queuelock);
+ for (map<MQMessageQueue, int64>::iterator it = m_brokerTimerMap.begin();
+ it != m_brokerTimerMap.end(); ++it) {
+ if (UtilAll::currentTimeMillis() - it->second >= 1000 * 60 * 5) {
+ string key = it->first.getBrokerName() +
+ UtilAll::to_string(it->first.getQueueId());
+ if (m_nonSerivceQueues.find(key) != m_nonSerivceQueues.end()) {
+ m_nonSerivceQueues.erase(key);
+ }
+ m_onSerivceQueues[key] = it->first;
+ }
+ }
+ }
+
+ void updateNonServiceMessageQueue(const MQMessageQueue& mq,
+ int timeoutMilliseconds) {
+ boost::lock_guard<boost::mutex> lock(m_queuelock);
+
+ string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId());
+ if (m_nonSerivceQueues.find(key) != m_nonSerivceQueues.end()) {
+ return;
+ }
+ LOG_INFO("updateNonServiceMessageQueue of mq:%s", mq.toString().c_str());
+ m_brokerTimerMap[mq] = UtilAll::currentTimeMillis();
+ m_nonSerivceQueues[key] = mq;
+ if (m_onSerivceQueues.find(key) != m_onSerivceQueues.end()) {
+ m_onSerivceQueues.erase(key);
+ }
+ }
+
+ vector<MQMessageQueue>& getMessageQueueList() {
+ boost::lock_guard<boost::mutex> lock(m_queuelock);
+ return m_queues;
+ }
+
+ int getWhichQueue() {
+ return m_sendWhichQueue.load(boost::memory_order_acquire);
+ }
+
+ MQMessageQueue selectOneMessageQueue(const MQMessageQueue& lastmq,
+ int& mq_index) {
+ boost::lock_guard<boost::mutex> lock(m_queuelock);
+
+ if (m_queues.size() > 0) {
+ LOG_DEBUG("selectOneMessageQueue Enter, queue size:%zu", m_queues.size());
+ unsigned int pos = 0;
+ if (mq_index >= 0) {
+ pos = mq_index % m_queues.size();
+ } else {
+ LOG_ERROR("mq_index is negative");
+ return MQMessageQueue();
+ }
+ if (!lastmq.getBrokerName().empty()) {
+ for (size_t i = 0; i < m_queues.size(); i++) {
+ if (m_sendWhichQueue.load(boost::memory_order_acquire) ==
+ numeric_limits<int>::max()) {
+ m_sendWhichQueue.store(0, boost::memory_order_release);
+ }
+
+ if (pos >= m_queues.size()) pos = pos % m_queues.size();
+
+ ++m_sendWhichQueue;
+ MQMessageQueue mq = m_queues.at(pos);
+ LOG_DEBUG("lastmq broker not empty, m_sendWhichQueue:%d, pos:%d",
+ m_sendWhichQueue.load(boost::memory_order_acquire), pos);
+ if (mq.getBrokerName().compare(lastmq.getBrokerName()) != 0) {
+ mq_index = pos;
+ return mq;
+ }
+ ++pos;
+ }
+ LOG_ERROR("could not find property mq");
+ return MQMessageQueue();
+ } else {
+ if (m_sendWhichQueue.load(boost::memory_order_acquire) ==
+ numeric_limits<int>::max()) {
+ m_sendWhichQueue.store(0, boost::memory_order_release);
+ }
+
+ ++m_sendWhichQueue;
+ LOG_DEBUG("lastmq broker empty, m_sendWhichQueue:%d, pos:%d",
+ m_sendWhichQueue.load(boost::memory_order_acquire), pos);
+ mq_index = pos;
+ return m_queues.at(pos);
+ }
+ } else {
+ LOG_ERROR("m_queues empty");
+ return MQMessageQueue();
+ }
+ }
+
+ MQMessageQueue selectOneActiveMessageQueue(const MQMessageQueue& lastmq,
+ int& mq_index) {
+ boost::lock_guard<boost::mutex> lock(m_queuelock);
+
+ if (m_queues.size() > 0) {
+ unsigned int pos = 0;
+ if (mq_index >= 0) {
+ pos = mq_index % m_queues.size();
+ } else {
+ LOG_ERROR("mq_index is negative");
+ return MQMessageQueue();
+ }
+ if (!lastmq.getBrokerName().empty()) {
+ for (size_t i = 0; i < m_queues.size(); i++) {
+ if (m_sendWhichQueue.load(boost::memory_order_acquire) ==
+ numeric_limits<int>::max()) {
+ m_sendWhichQueue.store(0, boost::memory_order_release);
+ }
+
+ if (pos >= m_queues.size()) pos = pos % m_queues.size();
+
+ ++m_sendWhichQueue;
+ MQMessageQueue mq = m_queues.at(pos);
+ string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId());
+ if ((mq.getBrokerName().compare(lastmq.getBrokerName()) != 0) &&
+ (m_onSerivceQueues.find(key) != m_onSerivceQueues.end())) {
+ mq_index = pos;
+ return mq;
+ }
+ ++pos;
+ }
+
+ for (MQMAP::iterator it = m_nonSerivceQueues.begin();
+ it != m_nonSerivceQueues.end();
+ ++it) { // if no MQMessageQueue(except lastmq) in
+ // m_onSerivceQueues, search m_nonSerivceQueues
+ if (it->second.getBrokerName().compare(lastmq.getBrokerName()) != 0)
+ return it->second;
+ }
+ LOG_ERROR("can not find property mq");
+ return MQMessageQueue();
+ } else {
+ for (size_t i = 0; i < m_queues.size(); i++) {
+ if (m_sendWhichQueue.load(boost::memory_order_acquire) ==
+ numeric_limits<int>::max()) {
+ m_sendWhichQueue.store(0, boost::memory_order_release);
+ }
+ if (pos >= m_queues.size()) pos = pos % m_queues.size();
+
+ ++m_sendWhichQueue;
+ LOG_DEBUG("lastmq broker empty, m_sendWhichQueue:%d, pos:%d",
+ m_sendWhichQueue.load(boost::memory_order_acquire), pos);
+ mq_index = pos;
+ MQMessageQueue mq = m_queues.at(pos);
+ string key = mq.getBrokerName() + UtilAll::to_string(mq.getQueueId());
+ if (m_onSerivceQueues.find(key) != m_onSerivceQueues.end()) {
+ return mq;
+ } else {
+ ++pos;
+ }
+ }
+
+ for (MQMAP::iterator it = m_nonSerivceQueues.begin();
+ it != m_nonSerivceQueues.end();
+ ++it) { // if no MQMessageQueue(except lastmq) in
+ // m_onSerivceQueues, search m_nonSerivceQueues
+ if (it->second.getBrokerName().compare(lastmq.getBrokerName()) != 0)
+ return it->second;
+ }
+ LOG_ERROR("can not find property mq");
+ return MQMessageQueue();
+ }
+ } else {
+ LOG_ERROR("m_queues empty");
+ return MQMessageQueue();
+ }
+ }
+
+ private:
+ boost::mutex m_queuelock;
+ typedef vector<MQMessageQueue> QueuesVec;
+ QueuesVec m_queues;
+ typedef map<string, MQMessageQueue> MQMAP;
+ MQMAP m_onSerivceQueues;
+ MQMAP m_nonSerivceQueues;
+ boost::atomic<int> m_sendWhichQueue;
+ map<MQMessageQueue, int64> m_brokerTimerMap;
+ boost::asio::io_service m_async_ioService;
+ boost::scoped_ptr<boost::thread> m_async_service_thread;
+};
+
+//<!***************************************************************************
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/CommandHeader.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/CommandHeader.cpp b/rocketmq-cpp/src/protocol/CommandHeader.cpp
new file mode 100644
index 0000000..366ac2e
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/CommandHeader.cpp
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CommandHeader.h"
+#include <cstdlib>
+#include <sstream>
+#include "Logging.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!************************************************************************
+void GetRouteInfoRequestHeader::Encode(Json::Value& outData) {
+ outData["topic"] = topic;
+}
+
+void GetRouteInfoRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("topic", topic));
+}
+//<!***************************************************************************
+void UnregisterClientRequestHeader::Encode(Json::Value& outData) {
+ outData["clientID"] = clientID;
+ outData["producerGroup"] = producerGroup;
+ outData["consumerGroup"] = consumerGroup;
+}
+
+void UnregisterClientRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("clientID", clientID));
+ requestMap.insert(pair<string, string>("producerGroup", producerGroup));
+ requestMap.insert(pair<string, string>("consumerGroup", consumerGroup));
+}
+//<!************************************************************************
+void CreateTopicRequestHeader::Encode(Json::Value& outData) {
+ outData["topic"] = topic;
+ outData["defaultTopic"] = defaultTopic;
+ outData["readQueueNums"] = readQueueNums;
+ outData["writeQueueNums"] = writeQueueNums;
+ outData["perm"] = perm;
+ outData["topicFilterType"] = topicFilterType;
+}
+void CreateTopicRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("topic", topic));
+ requestMap.insert(pair<string, string>("defaultTopic", defaultTopic));
+ requestMap.insert(
+ pair<string, string>("readQueueNums", UtilAll::to_string(readQueueNums)));
+ requestMap.insert(pair<string, string>("writeQueueNums",
+ UtilAll::to_string(writeQueueNums)));
+ requestMap.insert(pair<string, string>("perm", UtilAll::to_string(perm)));
+ requestMap.insert(pair<string, string>("topicFilterType", topicFilterType));
+}
+
+//<!************************************************************************
+void SendMessageRequestHeader::Encode(Json::Value& outData) {
+ outData["producerGroup"] = producerGroup;
+ outData["topic"] = topic;
+ outData["defaultTopic"] = defaultTopic;
+ outData["defaultTopicQueueNums"] = defaultTopicQueueNums;
+ outData["queueId"] = queueId;
+ outData["sysFlag"] = sysFlag;
+ outData["bornTimestamp"] = UtilAll::to_string(bornTimestamp);
+ outData["flag"] = flag;
+ outData["properties"] = properties;
+#ifdef ONS
+ outData["reconsumeTimes"] = UtilAll::to_string(reconsumeTimes);
+ outData["unitMode"] = UtilAll::to_string(unitMode);
+#endif
+}
+
+int SendMessageRequestHeader::getReconsumeTimes() { return reconsumeTimes; }
+
+void SendMessageRequestHeader::setReconsumeTimes(int input_reconsumeTimes) {
+ reconsumeTimes = input_reconsumeTimes;
+}
+
+void SendMessageRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ LOG_DEBUG(
+ "SendMessageRequestHeader producerGroup is:%s,topic is:%s, defaulttopic "
+ "is:%s, properties is:%s,UtilAll::to_string( defaultTopicQueueNums) "
+ "is:%s,UtilAll::to_string( queueId):%s, UtilAll::to_string( sysFlag) "
+ "is:%s, UtilAll::to_string( bornTimestamp) is:%s,UtilAll::to_string( "
+ "flag) is:%s",
+ producerGroup.c_str(), topic.c_str(), defaultTopic.c_str(),
+ properties.c_str(), UtilAll::to_string(defaultTopicQueueNums).c_str(),
+ UtilAll::to_string(queueId).c_str(), UtilAll::to_string(sysFlag).c_str(),
+ UtilAll::to_string(bornTimestamp).c_str(),
+ UtilAll::to_string(flag).c_str());
+
+ requestMap.insert(pair<string, string>("producerGroup", producerGroup));
+ requestMap.insert(pair<string, string>("topic", topic));
+ requestMap.insert(pair<string, string>("defaultTopic", defaultTopic));
+ requestMap.insert(pair<string, string>(
+ "defaultTopicQueueNums", UtilAll::to_string(defaultTopicQueueNums)));
+ requestMap.insert(
+ pair<string, string>("queueId", UtilAll::to_string(queueId)));
+ requestMap.insert(
+ pair<string, string>("sysFlag", UtilAll::to_string(sysFlag)));
+ requestMap.insert(
+ pair<string, string>("bornTimestamp", UtilAll::to_string(bornTimestamp)));
+ requestMap.insert(pair<string, string>("flag", UtilAll::to_string(flag)));
+ requestMap.insert(pair<string, string>("properties", properties));
+#ifdef ONS
+ requestMap.insert(pair<string, string>("reconsumeTimes",
+ UtilAll::to_string(reconsumeTimes)));
+ requestMap.insert(
+ pair<string, string>("unitMode", UtilAll::to_string(unitMode)));
+#endif
+}
+
+//<!************************************************************************
+CommandHeader* SendMessageResponseHeader::Decode(Json::Value& ext) {
+ SendMessageResponseHeader* h = new SendMessageResponseHeader();
+
+ Json::Value& tempValue = ext["msgId"];
+ if (tempValue.isString()) {
+ h->msgId = tempValue.asString();
+ }
+
+ tempValue = ext["queueId"];
+ if (tempValue.isString()) {
+ h->queueId = atoi(tempValue.asCString());
+ }
+
+ tempValue = ext["queueOffset"];
+ if (tempValue.isString()) {
+ h->queueOffset = UtilAll::str2ll(tempValue.asCString());
+ }
+ return h;
+}
+
+void SendMessageResponseHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("msgId", msgId));
+ requestMap.insert(
+ pair<string, string>("queueId", UtilAll::to_string(queueId)));
+ requestMap.insert(
+ pair<string, string>("queueOffset", UtilAll::to_string(queueOffset)));
+}
+//<!************************************************************************
+void PullMessageRequestHeader::Encode(Json::Value& outData) {
+ outData["consumerGroup"] = consumerGroup;
+ outData["topic"] = topic;
+ outData["queueId"] = queueId;
+ outData["queueOffset"] = UtilAll::to_string(queueOffset);
+ ;
+ outData["maxMsgNums"] = maxMsgNums;
+ outData["sysFlag"] = sysFlag;
+ outData["commitOffset"] = UtilAll::to_string(commitOffset);
+ ;
+ outData["subVersion"] = UtilAll::to_string(subVersion);
+ ;
+ outData["suspendTimeoutMillis"] = UtilAll::to_string(suspendTimeoutMillis);
+ ;
+ outData["subscription"] = subscription;
+}
+
+void PullMessageRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("consumerGroup", consumerGroup));
+ requestMap.insert(pair<string, string>("topic", topic));
+ requestMap.insert(
+ pair<string, string>("queueId", UtilAll::to_string(queueId)));
+ requestMap.insert(
+ pair<string, string>("queueOffset", UtilAll::to_string(queueOffset)));
+ requestMap.insert(
+ pair<string, string>("maxMsgNums", UtilAll::to_string(maxMsgNums)));
+ requestMap.insert(
+ pair<string, string>("sysFlag", UtilAll::to_string(sysFlag)));
+ requestMap.insert(
+ pair<string, string>("commitOffset", UtilAll::to_string(commitOffset)));
+ requestMap.insert(
+ pair<string, string>("subVersion", UtilAll::to_string(subVersion)));
+ requestMap.insert(pair<string, string>(
+ "suspendTimeoutMillis", UtilAll::to_string(suspendTimeoutMillis)));
+ requestMap.insert(pair<string, string>("subscription", subscription));
+}
+//<!************************************************************************
+CommandHeader* PullMessageResponseHeader::Decode(Json::Value& ext) {
+ PullMessageResponseHeader* h = new PullMessageResponseHeader();
+
+ Json::Value& tempValue = ext["suggestWhichBrokerId"];
+ if (tempValue.isString()) {
+ h->suggestWhichBrokerId = UtilAll::str2ll(tempValue.asCString());
+ }
+
+ tempValue = ext["nextBeginOffset"];
+ if (tempValue.isString()) {
+ h->nextBeginOffset = UtilAll::str2ll(tempValue.asCString());
+ }
+
+ tempValue = ext["minOffset"];
+ if (tempValue.isString()) {
+ h->minOffset = UtilAll::str2ll(tempValue.asCString());
+ }
+
+ tempValue = ext["maxOffset"];
+ if (tempValue.isString()) {
+ h->maxOffset = UtilAll::str2ll(tempValue.asCString());
+ }
+
+ return h;
+}
+
+void PullMessageResponseHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>(
+ "suggestWhichBrokerId", UtilAll::to_string(suggestWhichBrokerId)));
+ requestMap.insert(pair<string, string>("nextBeginOffset",
+ UtilAll::to_string(nextBeginOffset)));
+ requestMap.insert(
+ pair<string, string>("minOffset", UtilAll::to_string(minOffset)));
+ requestMap.insert(
+ pair<string, string>("maxOffset", UtilAll::to_string(maxOffset)));
+}
+//<!************************************************************************
+void GetConsumerListByGroupResponseHeader::Encode(Json::Value& outData) {
+ // outData = "{}";
+}
+
+void GetConsumerListByGroupResponseHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {}
+//<!***************************************************************************
+void GetMinOffsetRequestHeader::Encode(Json::Value& outData) {
+ outData["topic"] = topic;
+ outData["queueId"] = queueId;
+}
+
+void GetMinOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("topic", topic));
+ requestMap.insert(
+ pair<string, string>("queueId", UtilAll::to_string(queueId)));
+}
+//<!***************************************************************************
+CommandHeader* GetMinOffsetResponseHeader::Decode(Json::Value& ext) {
+ GetMinOffsetResponseHeader* h = new GetMinOffsetResponseHeader();
+
+ Json::Value& tempValue = ext["offset"];
+ if (tempValue.isString()) {
+ h->offset = UtilAll::str2ll(tempValue.asCString());
+ }
+ return h;
+}
+
+void GetMinOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset)));
+}
+//<!***************************************************************************
+void GetMaxOffsetRequestHeader::Encode(Json::Value& outData) {
+ outData["topic"] = topic;
+ outData["queueId"] = queueId;
+}
+
+void GetMaxOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("topic", topic));
+ requestMap.insert(
+ pair<string, string>("queueId", UtilAll::to_string(queueId)));
+}
+//<!***************************************************************************
+CommandHeader* GetMaxOffsetResponseHeader::Decode(Json::Value& ext) {
+ GetMaxOffsetResponseHeader* h = new GetMaxOffsetResponseHeader();
+
+ Json::Value& tempValue = ext["offset"];
+ if (tempValue.isString()) {
+ h->offset = UtilAll::str2ll(tempValue.asCString());
+ }
+ return h;
+}
+
+void GetMaxOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset)));
+}
+//<!***************************************************************************
+void SearchOffsetRequestHeader::Encode(Json::Value& outData) {
+ outData["topic"] = topic;
+ outData["queueId"] = queueId;
+ outData["timestamp"] = UtilAll::to_string(timestamp);
+}
+
+void SearchOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("topic", topic));
+ requestMap.insert(
+ pair<string, string>("queueId", UtilAll::to_string(queueId)));
+ requestMap.insert(
+ pair<string, string>("timestamp", UtilAll::to_string(timestamp)));
+}
+//<!***************************************************************************
+CommandHeader* SearchOffsetResponseHeader::Decode(Json::Value& ext) {
+ SearchOffsetResponseHeader* h = new SearchOffsetResponseHeader();
+
+ Json::Value& tempValue = ext["offset"];
+ if (tempValue.isString()) {
+ h->offset = UtilAll::str2ll(tempValue.asCString());
+ }
+ return h;
+}
+
+void SearchOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset)));
+}
+//<!***************************************************************************
+void ViewMessageRequestHeader::Encode(Json::Value& outData) {
+ outData["offset"] = UtilAll::to_string(offset);
+}
+
+void ViewMessageRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset)));
+}
+//<!***************************************************************************
+void GetEarliestMsgStoretimeRequestHeader::Encode(Json::Value& outData) {
+ outData["topic"] = topic;
+ outData["queueId"] = queueId;
+}
+
+void GetEarliestMsgStoretimeRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("topic", topic));
+ requestMap.insert(
+ pair<string, string>("queueId", UtilAll::to_string(queueId)));
+}
+//<!***************************************************************************
+CommandHeader* GetEarliestMsgStoretimeResponseHeader::Decode(
+ Json::Value& ext) {
+ GetEarliestMsgStoretimeResponseHeader* h =
+ new GetEarliestMsgStoretimeResponseHeader();
+
+ Json::Value& tempValue = ext["timestamp"];
+ if (tempValue.isString()) {
+ h->timestamp = UtilAll::str2ll(tempValue.asCString());
+ }
+ return h;
+}
+
+void GetEarliestMsgStoretimeResponseHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(
+ pair<string, string>("timestamp", UtilAll::to_string(timestamp)));
+}
+//<!***************************************************************************
+void GetConsumerListByGroupRequestHeader::Encode(Json::Value& outData) {
+ outData["consumerGroup"] = consumerGroup;
+}
+
+void GetConsumerListByGroupRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("consumerGroup", consumerGroup));
+}
+//<!***************************************************************************
+void QueryConsumerOffsetRequestHeader::Encode(Json::Value& outData) {
+ outData["consumerGroup"] = consumerGroup;
+ outData["topic"] = topic;
+ outData["queueId"] = queueId;
+}
+
+void QueryConsumerOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("consumerGroup", consumerGroup));
+ requestMap.insert(pair<string, string>("topic", topic));
+ requestMap.insert(
+ pair<string, string>("queueId", UtilAll::to_string(queueId)));
+}
+//<!***************************************************************************
+CommandHeader* QueryConsumerOffsetResponseHeader::Decode(
+ Json::Value& ext) {
+ QueryConsumerOffsetResponseHeader* h =
+ new QueryConsumerOffsetResponseHeader();
+ Json::Value& tempValue = ext["offset"];
+ if (tempValue.isString()) {
+ h->offset = UtilAll::str2ll(tempValue.asCString());
+ }
+ return h;
+}
+
+void QueryConsumerOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset)));
+}
+//<!***************************************************************************
+void UpdateConsumerOffsetRequestHeader::Encode(Json::Value& outData) {
+ outData["consumerGroup"] = consumerGroup;
+ outData["topic"] = topic;
+ outData["queueId"] = queueId;
+ outData["commitOffset"] = UtilAll::to_string(commitOffset);
+}
+
+void UpdateConsumerOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("consumerGroup", consumerGroup));
+ requestMap.insert(pair<string, string>("topic", topic));
+ requestMap.insert(
+ pair<string, string>("queueId", UtilAll::to_string(queueId)));
+ requestMap.insert(
+ pair<string, string>("commitOffset", UtilAll::to_string(commitOffset)));
+}
+//<!***************************************************************************
+void ConsumerSendMsgBackRequestHeader::Encode(Json::Value& outData) {
+ outData["group"] = group;
+ outData["delayLevel"] = delayLevel;
+ outData["offset"] = UtilAll::to_string(offset);
+#ifdef ONS
+ outData["originMsgId"] = originMsgId;
+ outData["originTopic"] = originTopic;
+#endif
+}
+
+void ConsumerSendMsgBackRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("group", group));
+ requestMap.insert(
+ pair<string, string>("delayLevel", UtilAll::to_string(delayLevel)));
+ requestMap.insert(pair<string, string>("offset", UtilAll::to_string(offset)));
+}
+//<!***************************************************************************
+void GetConsumerListByGroupResponseBody::Decode(const MemoryBlock* mem,
+ vector<string>& cids) {
+ cids.clear();
+ //<! decode;
+ const char* const pData = static_cast<const char*>(mem->getData());
+
+ Json::Reader reader;
+ Json::Value root;
+ if (!reader.parse(pData, root)) {
+ LOG_ERROR("GetConsumerListByGroupResponse error");
+ return;
+ }
+
+ Json::Value ids = root["consumerIdList"];
+ for (unsigned int i = 0; i < ids.size(); i++) {
+ if (ids[i].isString()) {
+ cids.push_back(ids[i].asString());
+ }
+ }
+}
+
+void GetConsumerListByGroupResponseBody::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {}
+
+void ResetOffsetRequestHeader::setTopic(const string& tmp) { topic = tmp; }
+
+void ResetOffsetRequestHeader::setGroup(const string& tmp) { group = tmp; }
+
+void ResetOffsetRequestHeader::setTimeStamp(const int64& tmp) {
+ timestamp = tmp;
+}
+
+void ResetOffsetRequestHeader::setForceFlag(const bool& tmp) { isForce = tmp; }
+
+const string ResetOffsetRequestHeader::getTopic() const { return topic; }
+
+const string ResetOffsetRequestHeader::getGroup() const { return group; }
+
+const int64 ResetOffsetRequestHeader::getTimeStamp() const { return timestamp; }
+
+const bool ResetOffsetRequestHeader::getForceFlag() const { return isForce; }
+
+CommandHeader* ResetOffsetRequestHeader::Decode(Json::Value& ext) {
+ ResetOffsetRequestHeader* h = new ResetOffsetRequestHeader();
+
+ Json::Value& tempValue = ext["topic"];
+ if (tempValue.isString()) {
+ h->topic = tempValue.asString();
+ }
+
+ tempValue = ext["group"];
+ if (tempValue.isString()) {
+ h->group = tempValue.asString();
+ }
+
+ tempValue = ext["timestamp"];
+ if (tempValue.isString()) {
+ h->timestamp = UtilAll::str2ll(tempValue.asCString());
+ }
+
+ tempValue = ext["isForce"];
+ if (tempValue.isString()) {
+ h->isForce = UtilAll::to_bool(tempValue.asCString());
+ }
+ LOG_INFO("topic:%s, group:%s, timestamp:%lld, isForce:%d,isForce:%s",
+ h->topic.c_str(), h->group.c_str(), h->timestamp, h->isForce,
+ tempValue.asCString());
+ return h;
+}
+
+CommandHeader* GetConsumerRunningInfoRequestHeader::Decode(
+ Json::Value& ext) {
+ GetConsumerRunningInfoRequestHeader* h =
+ new GetConsumerRunningInfoRequestHeader();
+
+ Json::Value& tempValue = ext["consumerGroup"];
+ if (tempValue.isString()) {
+ h->consumerGroup = tempValue.asString();
+ }
+
+ tempValue = ext["clientId"];
+ if (tempValue.isString()) {
+ h->clientId = tempValue.asString();
+ }
+
+ tempValue = ext["jstackEnable"];
+ if (tempValue.isString()) {
+ h->jstackEnable = UtilAll::to_bool(tempValue.asCString());
+ }
+ LOG_INFO("consumerGroup:%s, clientId:%s, jstackEnable:%d",
+ h->consumerGroup.c_str(), h->clientId.c_str(), h->jstackEnable);
+ return h;
+}
+
+void GetConsumerRunningInfoRequestHeader::Encode(Json::Value& outData) {
+ outData["consumerGroup"] = consumerGroup;
+ outData["clientId"] = clientId;
+ outData["jstackEnable"] = jstackEnable;
+}
+
+void GetConsumerRunningInfoRequestHeader::SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {
+ requestMap.insert(pair<string, string>("consumerGroup", consumerGroup));
+ requestMap.insert(pair<string, string>("clientId", clientId));
+ requestMap.insert(
+ pair<string, string>("jstackEnable", UtilAll::to_string(jstackEnable)));
+}
+
+const string GetConsumerRunningInfoRequestHeader::getConsumerGroup() const {
+ return consumerGroup;
+}
+
+void GetConsumerRunningInfoRequestHeader::setConsumerGroup(
+ const string& Group) {
+ consumerGroup = Group;
+}
+
+const string GetConsumerRunningInfoRequestHeader::getClientId() const {
+ return clientId;
+}
+
+void GetConsumerRunningInfoRequestHeader::setClientId(
+ const string& input_clientId) {
+ clientId = input_clientId;
+}
+
+const bool GetConsumerRunningInfoRequestHeader::isJstackEnable() const {
+ return jstackEnable;
+}
+
+void GetConsumerRunningInfoRequestHeader::setJstackEnable(
+ const bool& input_jstackEnable) {
+ jstackEnable = input_jstackEnable;
+}
+
+CommandHeader* NotifyConsumerIdsChangedRequestHeader::Decode(
+ Json::Value& ext) {
+ NotifyConsumerIdsChangedRequestHeader* h =
+ new NotifyConsumerIdsChangedRequestHeader();
+
+ Json::Value& tempValue = ext["consumerGroup"];
+ if (tempValue.isString()) {
+ h->consumerGroup = tempValue.asString();
+ }
+
+ return h;
+}
+
+void NotifyConsumerIdsChangedRequestHeader::setGroup(const string& tmp) {
+ consumerGroup = tmp;
+}
+const string NotifyConsumerIdsChangedRequestHeader::getGroup() const {
+ return consumerGroup;
+}
+
+//<!************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/CommandHeader.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/CommandHeader.h b/rocketmq-cpp/src/protocol/CommandHeader.h
new file mode 100644
index 0000000..5a55c55
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/CommandHeader.h
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __COMMANDCUSTOMHEADER_H__
+#define __COMMANDCUSTOMHEADER_H__
+
+#include <string>
+#include "MQClientException.h"
+#include "MessageSysFlag.h"
+#include "UtilAll.h"
+#include "dataBlock.h"
+#include "json/json.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+
+class CommandHeader {
+ public:
+ virtual ~CommandHeader() {}
+ virtual void Encode(Json::Value& outData) {}
+ virtual void SetDeclaredFieldOfCommandHeader(
+ map<string, string>& requestMap) {}
+};
+
+//<!************************************************************************
+class GetRouteInfoRequestHeader : public CommandHeader {
+ public:
+ GetRouteInfoRequestHeader(const string& top) : topic(top) {}
+ virtual ~GetRouteInfoRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ private:
+ string topic;
+};
+
+//<!************************************************************************
+class UnregisterClientRequestHeader : public CommandHeader {
+ public:
+ UnregisterClientRequestHeader(string cID, string proGroup, string conGroup)
+ : clientID(cID), producerGroup(proGroup), consumerGroup(conGroup) {}
+ virtual ~UnregisterClientRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ private:
+ string clientID;
+ string producerGroup;
+ string consumerGroup;
+};
+
+//<!************************************************************************
+class CreateTopicRequestHeader : public CommandHeader {
+ public:
+ CreateTopicRequestHeader() : readQueueNums(0), writeQueueNums(0), perm(0) {}
+ virtual ~CreateTopicRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ string topic;
+ string defaultTopic;
+ int readQueueNums;
+ int writeQueueNums;
+ int perm;
+ string topicFilterType;
+};
+
+//<!************************************************************************
+class SendMessageRequestHeader : public CommandHeader {
+ public:
+ SendMessageRequestHeader()
+ : defaultTopicQueueNums(0),
+ queueId(0),
+ sysFlag(0),
+ bornTimestamp(0),
+ flag(0),
+ reconsumeTimes(0),
+ unitMode(false) {}
+ virtual ~SendMessageRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+ int getReconsumeTimes();
+ void setReconsumeTimes(int input_reconsumeTimes);
+
+ public:
+ string producerGroup;
+ string topic;
+ string defaultTopic;
+ int defaultTopicQueueNums;
+ int queueId;
+ int sysFlag;
+ int64 bornTimestamp;
+ int flag;
+ string properties;
+ int reconsumeTimes;
+ bool unitMode;
+};
+
+//<!************************************************************************
+class SendMessageResponseHeader : public CommandHeader {
+ public:
+ SendMessageResponseHeader() : queueId(0), queueOffset(0) { msgId.clear(); }
+ virtual ~SendMessageResponseHeader() {}
+ static CommandHeader* Decode(Json::Value& ext);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ string msgId;
+ int queueId;
+ int64 queueOffset;
+};
+
+//<!************************************************************************
+class PullMessageRequestHeader : public CommandHeader {
+ public:
+ PullMessageRequestHeader()
+ : queueId(0),
+ maxMsgNums(0),
+ sysFlag(0),
+ queueOffset(0),
+ commitOffset(0),
+ suspendTimeoutMillis(0),
+ subVersion(0) {}
+ virtual ~PullMessageRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ string consumerGroup;
+ string topic;
+ int queueId;
+ int maxMsgNums;
+ int sysFlag;
+ string subscription;
+ int64 queueOffset;
+ int64 commitOffset;
+ int64 suspendTimeoutMillis;
+ int64 subVersion;
+};
+
+//<!************************************************************************
+class PullMessageResponseHeader : public CommandHeader {
+ public:
+ PullMessageResponseHeader()
+ : suggestWhichBrokerId(0),
+ nextBeginOffset(0),
+ minOffset(0),
+ maxOffset(0) {}
+ virtual ~PullMessageResponseHeader() {}
+ static CommandHeader* Decode(Json::Value& ext);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ int64 suggestWhichBrokerId;
+ int64 nextBeginOffset;
+ int64 minOffset;
+ int64 maxOffset;
+};
+
+//<!************************************************************************
+class GetConsumerListByGroupResponseHeader : public CommandHeader {
+ public:
+ GetConsumerListByGroupResponseHeader() {}
+ virtual ~GetConsumerListByGroupResponseHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+};
+
+//<!***************************************************************************
+class GetMinOffsetRequestHeader : public CommandHeader {
+ public:
+ GetMinOffsetRequestHeader() : queueId(0){};
+ virtual ~GetMinOffsetRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ string topic;
+ int queueId;
+};
+
+//<!***************************************************************************
+class GetMinOffsetResponseHeader : public CommandHeader {
+ public:
+ GetMinOffsetResponseHeader() : offset(0){};
+ virtual ~GetMinOffsetResponseHeader() {}
+ static CommandHeader* Decode(Json::Value& ext);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ int64 offset;
+};
+
+//<!***************************************************************************
+class GetMaxOffsetRequestHeader : public CommandHeader {
+ public:
+ GetMaxOffsetRequestHeader() : queueId(0){};
+ virtual ~GetMaxOffsetRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ string topic;
+ int queueId;
+};
+
+//<!***************************************************************************
+class GetMaxOffsetResponseHeader : public CommandHeader {
+ public:
+ GetMaxOffsetResponseHeader() : offset(0){};
+ virtual ~GetMaxOffsetResponseHeader() {}
+ static CommandHeader* Decode(Json::Value& ext);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ int64 offset;
+};
+
+//<!***************************************************************************
+class SearchOffsetRequestHeader : public CommandHeader {
+ public:
+ SearchOffsetRequestHeader() : queueId(0), timestamp(0){};
+ virtual ~SearchOffsetRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ string topic;
+ int queueId;
+ int64 timestamp;
+};
+
+//<!***************************************************************************
+class SearchOffsetResponseHeader : public CommandHeader {
+ public:
+ SearchOffsetResponseHeader() : offset(0){};
+ virtual ~SearchOffsetResponseHeader() {}
+ static CommandHeader* Decode(Json::Value& ext);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ int64 offset;
+};
+
+//<!***************************************************************************
+class ViewMessageRequestHeader : public CommandHeader {
+ public:
+ ViewMessageRequestHeader() : offset(0){};
+ virtual ~ViewMessageRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ int64 offset;
+};
+
+//<!***************************************************************************
+class GetEarliestMsgStoretimeRequestHeader : public CommandHeader {
+ public:
+ GetEarliestMsgStoretimeRequestHeader() : queueId(0){};
+ virtual ~GetEarliestMsgStoretimeRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ string topic;
+ int queueId;
+};
+
+//<!***************************************************************************
+class GetEarliestMsgStoretimeResponseHeader : public CommandHeader {
+ public:
+ GetEarliestMsgStoretimeResponseHeader() : timestamp(0){};
+ virtual ~GetEarliestMsgStoretimeResponseHeader() {}
+ static CommandHeader* Decode(Json::Value& ext);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ int64 timestamp;
+};
+
+//<!***************************************************************************
+class GetConsumerListByGroupRequestHeader : public CommandHeader {
+ public:
+ GetConsumerListByGroupRequestHeader(){};
+ virtual ~GetConsumerListByGroupRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ string consumerGroup;
+};
+
+//<!************************************************************************
+class QueryConsumerOffsetRequestHeader : public CommandHeader {
+ public:
+ QueryConsumerOffsetRequestHeader() : queueId(0){};
+ virtual ~QueryConsumerOffsetRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ string consumerGroup;
+ string topic;
+ int queueId;
+};
+
+//<!************************************************************************
+class QueryConsumerOffsetResponseHeader : public CommandHeader {
+ public:
+ QueryConsumerOffsetResponseHeader() : offset(0){};
+ virtual ~QueryConsumerOffsetResponseHeader() {}
+ static CommandHeader* Decode(Json::Value& ext);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ int64 offset;
+};
+
+//<!************************************************************************
+class UpdateConsumerOffsetRequestHeader : public CommandHeader {
+ public:
+ UpdateConsumerOffsetRequestHeader() : queueId(0), commitOffset(0){};
+ virtual ~UpdateConsumerOffsetRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ string consumerGroup;
+ string topic;
+ int queueId;
+ int64 commitOffset;
+};
+
+//<!***************************************************************************
+class ConsumerSendMsgBackRequestHeader : public CommandHeader {
+ public:
+ ConsumerSendMsgBackRequestHeader() : delayLevel(0), offset(0){};
+ virtual ~ConsumerSendMsgBackRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ string group;
+ int delayLevel;
+ int64 offset;
+};
+
+//<!***************************************************************************
+class GetConsumerListByGroupResponseBody {
+ public:
+ GetConsumerListByGroupResponseBody(){};
+ virtual ~GetConsumerListByGroupResponseBody() {}
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+
+ public:
+ static void Decode(const MemoryBlock* mem, vector<string>& cids);
+};
+
+class ResetOffsetRequestHeader : public CommandHeader {
+ public:
+ ResetOffsetRequestHeader() {}
+ ~ResetOffsetRequestHeader() {}
+ static CommandHeader* Decode(Json::Value& ext);
+ void setTopic(const string& tmp);
+ void setGroup(const string& tmp);
+ void setTimeStamp(const int64& tmp);
+ void setForceFlag(const bool& tmp);
+ const string getTopic() const;
+ const string getGroup() const;
+ const int64 getTimeStamp() const;
+ const bool getForceFlag() const;
+
+ private:
+ string topic;
+ string group;
+ int64 timestamp;
+ bool isForce;
+};
+
+class GetConsumerRunningInfoRequestHeader : public CommandHeader {
+ public:
+ GetConsumerRunningInfoRequestHeader() {}
+ virtual ~GetConsumerRunningInfoRequestHeader() {}
+ virtual void Encode(Json::Value& outData);
+ virtual void SetDeclaredFieldOfCommandHeader(map<string, string>& requestMap);
+ static CommandHeader* Decode(Json::Value& ext);
+ const string getConsumerGroup() const;
+ void setConsumerGroup(const string& consumerGroup);
+ const string getClientId() const;
+ void setClientId(const string& clientId);
+ const bool isJstackEnable() const;
+ void setJstackEnable(const bool& jstackEnable);
+
+ private:
+ string consumerGroup;
+ string clientId;
+ bool jstackEnable;
+};
+
+class NotifyConsumerIdsChangedRequestHeader : public CommandHeader {
+ public:
+ NotifyConsumerIdsChangedRequestHeader() {}
+ virtual ~NotifyConsumerIdsChangedRequestHeader() {}
+ static CommandHeader* Decode(Json::Value& ext);
+ void setGroup(const string& tmp);
+ const string getGroup() const;
+
+ private:
+ string consumerGroup;
+};
+
+//<!***************************************************************************
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/ConsumerRunningInfo.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/ConsumerRunningInfo.cpp b/rocketmq-cpp/src/protocol/ConsumerRunningInfo.cpp
new file mode 100644
index 0000000..10ac0aa
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/ConsumerRunningInfo.cpp
@@ -0,0 +1,109 @@
+#include "ConsumerRunningInfo.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+const string ConsumerRunningInfo::PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR";
+const string ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE =
+ "PROP_THREADPOOL_CORE_SIZE";
+const string ConsumerRunningInfo::PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY";
+const string ConsumerRunningInfo::PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE";
+const string ConsumerRunningInfo::PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION";
+const string ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP =
+ "PROP_CONSUMER_START_TIMESTAMP";
+
+const map<string, string> ConsumerRunningInfo::getProperties() const {
+ return properties;
+}
+
+void ConsumerRunningInfo::setProperties(
+ const map<string, string>& input_properties) {
+ properties = input_properties;
+}
+
+void ConsumerRunningInfo::setProperty(const string& key, const string& value) {
+ properties[key] = value;
+}
+
+const map<MessageQueue, ProcessQueueInfo> ConsumerRunningInfo::getMqTable()
+ const {
+ return mqTable;
+}
+
+void ConsumerRunningInfo::setMqTable(MessageQueue queue,
+ ProcessQueueInfo queueInfo) {
+ mqTable[queue] = queueInfo;
+}
+
+/*const map<string, ConsumeStatus> ConsumerRunningInfo::getStatusTable() const
+{
+return statusTable;
+}
+
+
+void ConsumerRunningInfo::setStatusTable(const map<string, ConsumeStatus>&
+input_statusTable)
+{
+statusTable = input_statusTable;
+} */
+
+const vector<SubscriptionData> ConsumerRunningInfo::getSubscriptionSet() const {
+ return subscriptionSet;
+}
+
+void ConsumerRunningInfo::setSubscriptionSet(
+ const vector<SubscriptionData>& input_subscriptionSet) {
+ subscriptionSet = input_subscriptionSet;
+}
+
+const string ConsumerRunningInfo::getJstack() const { return jstack; }
+
+void ConsumerRunningInfo::setJstack(const string& input_jstack) {
+ jstack = input_jstack;
+}
+
+string ConsumerRunningInfo::encode() {
+ Json::Value outData;
+
+ outData[PROP_NAMESERVER_ADDR] = properties[PROP_NAMESERVER_ADDR];
+ outData[PROP_CONSUME_TYPE] = properties[PROP_CONSUME_TYPE];
+ outData[PROP_CLIENT_VERSION] = properties[PROP_CLIENT_VERSION];
+ outData[PROP_CONSUMER_START_TIMESTAMP] =
+ properties[PROP_CONSUMER_START_TIMESTAMP];
+ outData[PROP_CONSUME_ORDERLY] = properties[PROP_CONSUME_ORDERLY];
+ outData[PROP_THREADPOOL_CORE_SIZE] = properties[PROP_THREADPOOL_CORE_SIZE];
+
+ Json::Value root;
+ root["jstack"] = jstack;
+ root["properties"] = outData;
+
+ {
+ vector<SubscriptionData>::const_iterator it = subscriptionSet.begin();
+ for (; it != subscriptionSet.end(); it++) {
+ root["subscriptionSet"].append(it->toJson());
+ }
+ }
+
+ Json::FastWriter fastwrite;
+ string finals = fastwrite.write(root);
+
+ Json::Value mq;
+ string key = "\"mqTable\":";
+ key.append("{");
+ for (map<MessageQueue, ProcessQueueInfo>::iterator it = mqTable.begin();
+ it != mqTable.end(); ++it) {
+ key.append((it->first).toJson().toStyledString());
+ key.erase(key.end() - 1);
+ key.append(":");
+ key.append((it->second).toJson().toStyledString());
+ key.append(",");
+ }
+ key.erase(key.end() - 1);
+ key.append("}");
+
+ // insert mqTable to final string
+ key.append(",");
+ finals.insert(1, key);
+
+ return finals;
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/ConsumerRunningInfo.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/ConsumerRunningInfo.h b/rocketmq-cpp/src/protocol/ConsumerRunningInfo.h
new file mode 100644
index 0000000..6467ad5
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/ConsumerRunningInfo.h
@@ -0,0 +1,50 @@
+#ifndef __CONSUMERRUNNINGINFO_H__
+#define __CONSUMERRUNNINGINFO_H__
+
+#include "MessageQueue.h"
+#include "ProcessQueueInfo.h"
+#include "SubscriptionData.h"
+
+namespace rocketmq {
+
+class ConsumerRunningInfo {
+ public:
+ ConsumerRunningInfo() {}
+ virtual ~ConsumerRunningInfo() {
+ properties.clear();
+ mqTable.clear();
+ subscriptionSet.clear();
+ }
+
+ public:
+ static const string PROP_NAMESERVER_ADDR;
+ static const string PROP_THREADPOOL_CORE_SIZE;
+ static const string PROP_CONSUME_ORDERLY;
+ static const string PROP_CONSUME_TYPE;
+ static const string PROP_CLIENT_VERSION;
+ static const string PROP_CONSUMER_START_TIMESTAMP;
+
+ public:
+ const map<string, string> getProperties() const;
+ void setProperties(const map<string, string>& input_properties);
+ void setProperty(const string& key, const string& value);
+ const map<MessageQueue, ProcessQueueInfo> getMqTable() const;
+ void setMqTable(MessageQueue queue, ProcessQueueInfo queueInfo);
+ // const map<string, ConsumeStatus> getStatusTable() const;
+ // void setStatusTable(const map<string, ConsumeStatus>& input_statusTable) ;
+ const vector<SubscriptionData> getSubscriptionSet() const;
+ void setSubscriptionSet(
+ const vector<SubscriptionData>& input_subscriptionSet);
+ const string getJstack() const;
+ void setJstack(const string& input_jstack);
+ string encode();
+
+ private:
+ map<string, string> properties;
+ vector<SubscriptionData> subscriptionSet;
+ map<MessageQueue, ProcessQueueInfo> mqTable;
+ // map<string, ConsumeStatus> statusTable;
+ string jstack;
+};
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/HeartbeatData.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/HeartbeatData.h b/rocketmq-cpp/src/protocol/HeartbeatData.h
new file mode 100755
index 0000000..9b74280
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/HeartbeatData.h
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __HEARTBEATDATA_H__
+#define __HEARTBEATDATA_H__
+#include <boost/thread/thread.hpp>
+#include <cstdlib>
+#include <string>
+#include <vector>
+#include "ConsumeType.h"
+#include "SubscriptionData.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+class ProducerData {
+ public:
+ ProducerData(){};
+ bool operator<(const ProducerData& pd) const {
+ return groupName < pd.groupName;
+ }
+ Json::Value toJson() const {
+ Json::Value outJson;
+ outJson["groupName"] = groupName;
+ return outJson;
+ }
+
+ public:
+ string groupName;
+};
+
+//<!***************************************************************************
+class ConsumerData {
+ public:
+ ConsumerData(){};
+ virtual ~ConsumerData() { subscriptionDataSet.clear(); }
+ bool operator<(const ConsumerData& cd) const {
+ return groupName < cd.groupName;
+ }
+
+ Json::Value toJson() const {
+ Json::Value outJson;
+ outJson["groupName"] = groupName;
+ outJson["consumeFromWhere"] = consumeFromWhere;
+ outJson["consumeType"] = consumeType;
+ outJson["messageModel"] = messageModel;
+
+ vector<SubscriptionData>::const_iterator it = subscriptionDataSet.begin();
+ for (; it != subscriptionDataSet.end(); it++) {
+ outJson["subscriptionDataSet"].append((*it).toJson());
+ }
+
+ return outJson;
+ }
+
+ public:
+ string groupName;
+ ConsumeType consumeType;
+ MessageModel messageModel;
+ ConsumeFromWhere consumeFromWhere;
+ vector<SubscriptionData> subscriptionDataSet;
+};
+
+//<!***************************************************************************
+class HeartbeatData {
+ public:
+ virtual ~HeartbeatData() {
+ m_producerDataSet.clear();
+ m_consumerDataSet.clear();
+ }
+ void Encode(string& outData) {
+ Json::Value root;
+
+ //<!id;
+ root["clientID"] = m_clientID;
+
+ //<!consumer;
+ {
+ boost::lock_guard<boost::mutex> lock(m_consumerDataMutex);
+ vector<ConsumerData>::iterator itc = m_consumerDataSet.begin();
+ for (; itc != m_consumerDataSet.end(); itc++) {
+ root["consumerDataSet"].append((*itc).toJson());
+ }
+ }
+
+ //<!producer;
+ {
+ boost::lock_guard<boost::mutex> lock(m_producerDataMutex);
+ vector<ProducerData>::iterator itp = m_producerDataSet.begin();
+ for (; itp != m_producerDataSet.end(); itp++) {
+ root["producerDataSet"].append((*itp).toJson());
+ }
+ }
+ //<!output;
+ Json::FastWriter fastwrite;
+ outData = fastwrite.write(root);
+ }
+
+ void setClientID(const string& clientID) { m_clientID = clientID; }
+
+ bool isProducerDataSetEmpty() {
+ boost::lock_guard<boost::mutex> lock(m_producerDataMutex);
+ return m_producerDataSet.empty();
+ }
+
+ void insertDataToProducerDataSet(ProducerData& producerData) {
+ boost::lock_guard<boost::mutex> lock(m_producerDataMutex);
+ m_producerDataSet.push_back(producerData);
+ }
+
+ bool isConsumerDataSetEmpty() {
+ boost::lock_guard<boost::mutex> lock(m_consumerDataMutex);
+ return m_consumerDataSet.empty();
+ }
+
+ void insertDataToConsumerDataSet(ConsumerData& consumerData) {
+ boost::lock_guard<boost::mutex> lock(m_consumerDataMutex);
+ m_consumerDataSet.push_back(consumerData);
+ }
+
+ private:
+ string m_clientID;
+ vector<ProducerData> m_producerDataSet;
+ vector<ConsumerData> m_consumerDataSet;
+ boost::mutex m_producerDataMutex;
+ boost::mutex m_consumerDataMutex;
+};
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/KVTable.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/KVTable.h b/rocketmq-cpp/src/protocol/KVTable.h
new file mode 100755
index 0000000..69191b7
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/KVTable.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __KVTABLE_H__
+#define __KVTABLE_H__
+#include <map>
+#include <string>
+#include "RemotingSerializable.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+class KVTable : public RemotingSerializable {
+ public:
+ virtual ~KVTable() { m_table.clear(); }
+
+ void Encode(string& outData) {}
+
+ const map<string, string>& getTable() { return m_table; }
+
+ void setTable(const map<string, string>& table) { m_table = table; }
+
+ private:
+ map<string, string> m_table;
+};
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/LockBatchBody.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/LockBatchBody.cpp b/rocketmq-cpp/src/protocol/LockBatchBody.cpp
new file mode 100755
index 0000000..c56c17f
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/LockBatchBody.cpp
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "LockBatchBody.h"
+#include "Logging.h"
+namespace rocketmq { //<!end namespace;
+
+string LockBatchRequestBody::getConsumerGroup() { return consumerGroup; }
+void LockBatchRequestBody::setConsumerGroup(string in_consumerGroup) {
+ consumerGroup = in_consumerGroup;
+}
+string LockBatchRequestBody::getClientId() { return clientId; }
+void LockBatchRequestBody::setClientId(string in_clientId) {
+ clientId = in_clientId;
+}
+vector<MQMessageQueue> LockBatchRequestBody::getMqSet() { return mqSet; }
+void LockBatchRequestBody::setMqSet(vector<MQMessageQueue> in_mqSet) {
+ mqSet.swap(in_mqSet);
+}
+void LockBatchRequestBody::Encode(string& outData) {
+ Json::Value root;
+ root["consumerGroup"] = consumerGroup;
+ root["clientId"] = clientId;
+
+ vector<MQMessageQueue>::const_iterator it = mqSet.begin();
+ for (; it != mqSet.end(); it++) {
+ root["mqSet"].append(toJson(*it));
+ }
+
+ Json::FastWriter fastwrite;
+ outData = fastwrite.write(root);
+}
+
+Json::Value LockBatchRequestBody::toJson(const MQMessageQueue& mq) const {
+ Json::Value outJson;
+ outJson["topic"] = mq.getTopic();
+ outJson["brokerName"] = mq.getBrokerName();
+ outJson["queueId"] = mq.getQueueId();
+ return outJson;
+}
+
+vector<MQMessageQueue> LockBatchResponseBody::getLockOKMQSet() {
+ return lockOKMQSet;
+}
+void LockBatchResponseBody::setLockOKMQSet(
+ vector<MQMessageQueue> in_lockOKMQSet) {
+ lockOKMQSet.swap(in_lockOKMQSet);
+}
+
+void LockBatchResponseBody::Decode(const MemoryBlock* mem,
+ vector<MQMessageQueue>& messageQueues) {
+ messageQueues.clear();
+ //<! decode;
+ const char* const pData = static_cast<const char*>(mem->getData());
+
+ Json::Reader reader;
+ Json::Value root;
+ if (!reader.parse(pData, root)) {
+ LOG_WARN("decode LockBatchResponseBody error");
+ return;
+ }
+
+ Json::Value mqs = root["lockOKMQSet"];
+ LOG_DEBUG("LockBatchResponseBody mqs size:%d", mqs.size());
+ for (unsigned int i = 0; i < mqs.size(); i++) {
+ MQMessageQueue mq;
+ Json::Value qd = mqs[i];
+ mq.setTopic(qd["topic"].asString());
+ mq.setBrokerName(qd["brokerName"].asString());
+ mq.setQueueId(qd["queueId"].asInt());
+ LOG_INFO("LockBatchResponseBody MQ:%s", mq.toString().c_str());
+ messageQueues.push_back(mq);
+ }
+}
+
+string UnlockBatchRequestBody::getConsumerGroup() { return consumerGroup; }
+void UnlockBatchRequestBody::setConsumerGroup(string in_consumerGroup) {
+ consumerGroup = in_consumerGroup;
+}
+string UnlockBatchRequestBody::getClientId() { return clientId; }
+void UnlockBatchRequestBody::setClientId(string in_clientId) {
+ clientId = in_clientId;
+}
+vector<MQMessageQueue> UnlockBatchRequestBody::getMqSet() { return mqSet; }
+void UnlockBatchRequestBody::setMqSet(vector<MQMessageQueue> in_mqSet) {
+ mqSet.swap(in_mqSet);
+}
+void UnlockBatchRequestBody::Encode(string& outData) {
+ Json::Value root;
+ root["consumerGroup"] = consumerGroup;
+ root["clientId"] = clientId;
+
+ vector<MQMessageQueue>::const_iterator it = mqSet.begin();
+ for (; it != mqSet.end(); it++) {
+ root["mqSet"].append(toJson(*it));
+ }
+
+ Json::FastWriter fastwrite;
+ outData = fastwrite.write(root);
+}
+
+Json::Value UnlockBatchRequestBody::toJson(
+ const MQMessageQueue& mq) const {
+ Json::Value outJson;
+ outJson["topic"] = mq.getTopic();
+ outJson["brokerName"] = mq.getBrokerName();
+ outJson["queueId"] = mq.getQueueId();
+ return outJson;
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/LockBatchBody.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/LockBatchBody.h b/rocketmq-cpp/src/protocol/LockBatchBody.h
new file mode 100755
index 0000000..c1d7155
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/LockBatchBody.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __LOCKBATCHBODY_H__
+#define __LOCKBATCHBODY_H__
+#include <set>
+#include <string>
+#include "MQMessageQueue.h"
+#include "RemotingSerializable.h"
+#include "dataBlock.h"
+#include "json/json.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+
+class LockBatchRequestBody {
+ public:
+ virtual ~LockBatchRequestBody() { mqSet.clear(); }
+ string getConsumerGroup();
+ void setConsumerGroup(string consumerGroup);
+ string getClientId();
+ void setClientId(string clientId);
+ vector<MQMessageQueue> getMqSet();
+ void setMqSet(vector<MQMessageQueue> mqSet);
+ void Encode(string& outData);
+ Json::Value toJson(const MQMessageQueue& mq) const;
+
+ private:
+ string consumerGroup;
+ string clientId;
+ vector<MQMessageQueue> mqSet;
+};
+
+class LockBatchResponseBody {
+ public:
+ virtual ~LockBatchResponseBody() { lockOKMQSet.clear(); }
+ vector<MQMessageQueue> getLockOKMQSet();
+ void setLockOKMQSet(vector<MQMessageQueue> lockOKMQSet);
+ static void Decode(const MemoryBlock* mem,
+ vector<MQMessageQueue>& messageQueues);
+
+ private:
+ vector<MQMessageQueue> lockOKMQSet;
+};
+
+class UnlockBatchRequestBody {
+ public:
+ virtual ~UnlockBatchRequestBody() { mqSet.clear(); }
+ string getConsumerGroup();
+ void setConsumerGroup(string consumerGroup);
+ string getClientId();
+ void setClientId(string clientId);
+ vector<MQMessageQueue> getMqSet();
+ void setMqSet(vector<MQMessageQueue> mqSet);
+ void Encode(string& outData);
+ Json::Value toJson(const MQMessageQueue& mq) const;
+
+ private:
+ string consumerGroup;
+ string clientId;
+ vector<MQMessageQueue> mqSet;
+};
+
+} //<!end namespace;
+#endif