You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/09/04 06:44:38 UTC
[04/17] incubator-rocketmq-externals git commit: Polish cpp module
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/message/MessageQueue.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/MessageQueue.cpp b/rocketmq-client4cpp/src/message/MessageQueue.cpp
deleted file mode 100755
index d632550..0000000
--- a/rocketmq-client4cpp/src/message/MessageQueue.cpp
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include "MessageQueue.h"
-
-#include <string>
-#include <sstream>
-#include <UtilAll.h>
-
-namespace rmq
-{
-
-/**
- * Default constructor: topic and broker name are left default-constructed,
- * the queue id starts at 0.
- */
-MessageQueue::MessageQueue()
-    : m_queueId(0)
-{
-}
-
-/**
- * Builds a queue descriptor from its three identifying fields.
- *
- * @param topic      topic the queue belongs to
- * @param brokerName name of the broker hosting the queue
- * @param queueId    id of the queue
- */
-MessageQueue::MessageQueue(const std::string& topic, const std::string& brokerName, int queueId)
-    : m_topic(topic),
-      m_brokerName(brokerName),
-      m_queueId(queueId)
-{
-}
-
-/** @return a copy of the topic name. */
-std::string MessageQueue::getTopic()const
-{
-    return m_topic;
-}
-
-/** Replaces the topic name. */
-void MessageQueue::setTopic(const std::string& topic)
-{
-    m_topic = topic;
-}
-
-/** @return a copy of the broker name. */
-std::string MessageQueue::getBrokerName()const
-{
-    return m_brokerName;
-}
-
-/** Replaces the broker name. */
-void MessageQueue::setBrokerName(const std::string& brokerName)
-{
-    m_brokerName = brokerName;
-}
-
-/** @return the queue id. */
-int MessageQueue::getQueueId()const
-{
-    return m_queueId;
-}
-
-/** Replaces the queue id. */
-void MessageQueue::setQueueId(int queueId)
-{
-    m_queueId = queueId;
-}
-
-/**
- * Hash of this queue descriptor.
- *
- * The Java client combines the three fields with a 31-based polynomial
- * (brokerName, queueId, topic). This port instead writes topic, broker name
- * and queue id back to back into one string, with no separators, and hands
- * that string to UtilAll::hashCode.
- *
- * @return the value produced by UtilAll::hashCode for the concatenated text
- */
-int MessageQueue::hashCode()
-{
-    std::ostringstream key;
-    key << m_topic;
-    key << m_brokerName;
-    key << m_queueId;
-    return UtilAll::hashCode(key.str());
-}
-
-/**
- * Human-readable form: {topic=<topic>,brokerName=<broker>,queueId=<id>}.
- */
-std::string MessageQueue::toString() const
-{
-    std::ostringstream out;
-    out << "{topic=" << m_topic;
-    out << ",brokerName=" << m_brokerName;
-    out << ",queueId=" << m_queueId << "}";
-    return out.str();
-}
-
-
-/**
- * JSON-shaped form: {"topic":"...","brokerName":"...","queueId":N}.
- *
- * The string fields are written verbatim; no JSON escaping is applied.
- */
-std::string MessageQueue::toJsonString() const
-{
-    std::ostringstream out;
-    out << "{\"topic\":\"" << m_topic;
-    out << "\",\"brokerName\":\"" << m_brokerName;
-    out << "\",\"queueId\":" << m_queueId << "}";
-    return out.str();
-}
-
-
-/**
- * Two descriptors are equal when broker name, queue id and topic all match.
- */
-bool MessageQueue::operator==(const MessageQueue& mq)const
-{
-    // Same object: nothing to compare.
-    if (this == &mq)
-    {
-        return true;
-    }
-
-    return !(m_brokerName != mq.m_brokerName)
-           && !(m_queueId != mq.m_queueId)
-           && !(m_topic != mq.m_topic);
-}
-
-/**
- * Three-way comparison used for ordering: topic first, then broker name,
- * then queue id.
- *
- * The strings are compared with std::string::compare rather than strcmp, so
- * the whole string takes part in the comparison (strcmp stops at the first
- * NUL byte and could therefore disagree with operator==), and no <cstring>
- * include is needed. The queue ids are compared explicitly instead of being
- * subtracted, because the subtraction can overflow for ids of opposite sign.
- *
- * @param mq the descriptor to compare against
- * @return a negative value, zero or a positive value when this descriptor
- *         orders before, equal to or after mq; only the sign is meaningful
- */
-int MessageQueue::compareTo(const MessageQueue& mq)const
-{
-    int result = m_topic.compare(mq.m_topic);
-    if (result != 0)
-    {
-        return result;
-    }
-
-    result = m_brokerName.compare(mq.m_brokerName);
-    if (result != 0)
-    {
-        return result;
-    }
-
-    if (m_queueId < mq.m_queueId)
-    {
-        return -1;
-    }
-
-    return (m_queueId > mq.m_queueId) ? 1 : 0;
-}
-
-/** Strict ordering derived from compareTo (topic, broker name, queue id). */
-bool MessageQueue::operator<(const MessageQueue& mq)const
-{
-    const int order = compareTo(mq);
-    return order < 0;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp b/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp
deleted file mode 100755
index dcad654..0000000
--- a/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "DefaultMQProducer.h"
-
-#include <assert.h>
-#include "MessageExt.h"
-#include "QueryResult.h"
-#include "DefaultMQProducerImpl.h"
-#include "MixAll.h"
-#include "MQClientException.h"
-
-namespace rmq
-{
-
-/**
- * Creates a producer in the default producer group
- * (MixAll::DEFAULT_PRODUCER_GROUP).
- *
- * Defaults: 4 queues per auto-created topic, send timeout 3000 (presumably
- * milliseconds - confirm), compression threshold 4 KiB, 2 retries on send
- * failure, no retry on another broker when the store result is not OK,
- * maximum message size 128 KiB, compression level 5.
- *
- * The implementation object is allocated here and released by the destructor.
- */
-DefaultMQProducer::DefaultMQProducer()
-    : m_producerGroup(MixAll::DEFAULT_PRODUCER_GROUP),
-      m_createTopicKey(MixAll::DEFAULT_TOPIC),
-      m_defaultTopicQueueNums(4),
-      m_sendMsgTimeout(3000),
-      m_compressMsgBodyOverHowmuch(4 * 1024),
-      m_retryTimesWhenSendFailed(2),
-      m_retryAnotherBrokerWhenNotStoreOK(false),
-      m_maxMessageSize(128 * 1024),
-      m_compressLevel(5)
-{
-    m_pDefaultMQProducerImpl = new DefaultMQProducerImpl(this);
-}
-
-/**
- * Creates a producer in the given group; every other setting uses the same
- * defaults as the default constructor.
- *
- * @param producerGroup name of the producer group
- */
-DefaultMQProducer::DefaultMQProducer(const std::string& producerGroup)
-    : m_producerGroup(producerGroup),
-      m_createTopicKey(MixAll::DEFAULT_TOPIC),
-      m_defaultTopicQueueNums(4),
-      m_sendMsgTimeout(3000),
-      m_compressMsgBodyOverHowmuch(4 * 1024),
-      m_retryTimesWhenSendFailed(2),
-      m_retryAnotherBrokerWhenNotStoreOK(false),
-      m_maxMessageSize(128 * 1024),
-      m_compressLevel(5)
-{
-    m_pDefaultMQProducerImpl = new DefaultMQProducerImpl(this);
-}
-
-/**
- * Releases the implementation object allocated by the constructors.
- *
- * NOTE(review): the original author flagged this delete as a possible crash
- * source ("memleak: maybe core") - confirm nothing else still holds the
- * implementation pointer when the producer is destroyed.
- */
-DefaultMQProducer::~DefaultMQProducer()
-{
-    delete m_pDefaultMQProducerImpl;
-}
-
-
-/** Starts the producer by delegating to the implementation object. */
-void DefaultMQProducer::start()
-{
-    m_pDefaultMQProducerImpl->start();
-}
-
-/** Shuts the producer down by delegating to the implementation object. */
-void DefaultMQProducer::shutdown()
-{
-    m_pDefaultMQProducerImpl->shutdown();
-}
-
-/**
- * Returns the publish queues of a topic, as produced by the implementation.
- * NOTE(review): ownership of the returned vector is not visible here - confirm.
- */
-std::vector<MessageQueue>* DefaultMQProducer::fetchPublishMessageQueues(const std::string& topic)
-{
-    return m_pDefaultMQProducerImpl->fetchPublishMessageQueues(topic);
-}
-
-/** Synchronous send; the queue is chosen by the implementation. */
-SendResult DefaultMQProducer::send(Message& msg)
-{
-    return m_pDefaultMQProducerImpl->send(msg);
-}
-
-/** Asynchronous send; the outcome is reported through pSendCallback. */
-void DefaultMQProducer::send(Message& msg, SendCallback* pSendCallback)
-{
-    m_pDefaultMQProducerImpl->send(msg, pSendCallback);
-}
-
-/** One-way send; no result is returned to the caller. */
-void DefaultMQProducer::sendOneway(Message& msg)
-{
-    m_pDefaultMQProducerImpl->sendOneway(msg);
-}
-
-/** Synchronous send to an explicitly chosen queue. */
-SendResult DefaultMQProducer::send(Message& msg, MessageQueue& mq)
-{
-    return m_pDefaultMQProducerImpl->send(msg, mq);
-}
-
-/** Asynchronous send to an explicitly chosen queue. */
-void DefaultMQProducer::send(Message& msg, MessageQueue& mq, SendCallback* pSendCallback)
-{
-    m_pDefaultMQProducerImpl->send(msg, mq, pSendCallback);
-}
-
-/** One-way send to an explicitly chosen queue. */
-void DefaultMQProducer::sendOneway(Message& msg, MessageQueue& mq)
-{
-    m_pDefaultMQProducerImpl->sendOneway(msg, mq);
-}
-
-/** Synchronous send; pSelector picks the queue and receives arg unchanged. */
-SendResult DefaultMQProducer::send(Message& msg, MessageQueueSelector* pSelector, void* arg)
-{
-    return m_pDefaultMQProducerImpl->send(msg, pSelector, arg);
-}
-
-/** Asynchronous send; pSelector picks the queue and receives arg unchanged. */
-void DefaultMQProducer::send(Message& msg,
-                             MessageQueueSelector* pSelector,
-                             void* arg,
-                             SendCallback* pSendCallback)
-{
-    m_pDefaultMQProducerImpl->send(msg, pSelector, arg, pSendCallback);
-}
-
-/** One-way send; pSelector picks the queue and receives arg unchanged. */
-void DefaultMQProducer::sendOneway(Message& msg, MessageQueueSelector* pSelector, void* arg)
-{
-    m_pDefaultMQProducerImpl->sendOneway(msg, pSelector, arg);
-}
-
-/**
- * Transactional send is not supported by this class: the call always raises
- * MQClientException directing callers to TransactionMQProducer.
- *
- * The statements after the throw only give the function a return value on
- * every path.
- */
-TransactionSendResult DefaultMQProducer::sendMessageInTransaction(Message& msg,
-        LocalTransactionExecuter* tranExecuter, void* arg)
-{
-    THROW_MQEXCEPTION(MQClientException,
-                      "sendMessageInTransaction not implement, please use TransactionMQProducer class", -1);
-
-    TransactionSendResult unused;
-    return unused;
-}
-
-/** Creates newTopic with queueNum queues; forwarded to the implementation. */
-void DefaultMQProducer::createTopic(const std::string& key, const std::string& newTopic, int queueNum)
-{
-    m_pDefaultMQProducerImpl->createTopic(key, newTopic, queueNum);
-}
-
-/** Looks up the offset in mq for the given timestamp; forwarded to the implementation. */
-long long DefaultMQProducer::searchOffset(const MessageQueue& mq, long long timestamp)
-{
-    return m_pDefaultMQProducerImpl->searchOffset(mq, timestamp);
-}
-
-/** Returns the maximum offset of mq; forwarded to the implementation. */
-long long DefaultMQProducer::maxOffset(const MessageQueue& mq)
-{
-    return m_pDefaultMQProducerImpl->maxOffset(mq);
-}
-
-/** Returns the minimum offset of mq; forwarded to the implementation. */
-long long DefaultMQProducer::minOffset(const MessageQueue& mq)
-{
-    return m_pDefaultMQProducerImpl->minOffset(mq);
-}
-
-/** Returns the earliest message store time of mq; forwarded to the implementation. */
-long long DefaultMQProducer::earliestMsgStoreTime(const MessageQueue& mq)
-{
-    return m_pDefaultMQProducerImpl->earliestMsgStoreTime(mq);
-}
-
-/**
- * Fetches a message by id; forwarded to the implementation.
- * NOTE(review): ownership of the returned pointer is not visible here - confirm.
- */
-MessageExt* DefaultMQProducer::viewMessage(const std::string& msgId)
-{
-    return m_pDefaultMQProducerImpl->viewMessage(msgId);
-}
-
-/** Queries messages of topic by key; all arguments are forwarded unchanged. */
-QueryResult DefaultMQProducer::queryMessage(const std::string& topic,
-        const std::string& key,
-        int maxNum,
-        long long begin,
-        long long end)
-{
-    return m_pDefaultMQProducerImpl->queryMessage(topic, key, maxNum, begin, end);
-}
-
-/** @return the producer group name. */
-std::string DefaultMQProducer::getProducerGroup()
-{
-    return m_producerGroup;
-}
-
-/** Sets the producer group name. */
-void DefaultMQProducer::setProducerGroup(const std::string& producerGroup)
-{
-    m_producerGroup = producerGroup;
-}
-
-/** @return the topic key used when creating topics. */
-std::string DefaultMQProducer::getCreateTopicKey()
-{
-    return m_createTopicKey;
-}
-
-/** Sets the topic key used when creating topics. */
-void DefaultMQProducer::setCreateTopicKey(const std::string& createTopicKey)
-{
-    m_createTopicKey = createTopicKey;
-}
-
-/** @return the send timeout. */
-int DefaultMQProducer::getSendMsgTimeout()
-{
-    return m_sendMsgTimeout;
-}
-
-/** Sets the send timeout. */
-void DefaultMQProducer::setSendMsgTimeout(int sendMsgTimeout)
-{
-    m_sendMsgTimeout = sendMsgTimeout;
-}
-
-/** @return the body length from which compression is attempted. */
-int DefaultMQProducer::getCompressMsgBodyOverHowmuch()
-{
-    return m_compressMsgBodyOverHowmuch;
-}
-
-/** Sets the body length from which compression is attempted. */
-void DefaultMQProducer::setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch)
-{
-    m_compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
-}
-
-/** @return the implementation object owned by this producer. */
-DefaultMQProducerImpl* DefaultMQProducer::getDefaultMQProducerImpl()
-{
-    return m_pDefaultMQProducerImpl;
-}
-
-/** @return whether a non-OK store result triggers a retry. */
-bool DefaultMQProducer::isRetryAnotherBrokerWhenNotStoreOK()
-{
-    return m_retryAnotherBrokerWhenNotStoreOK;
-}
-
-/** Sets whether a non-OK store result triggers a retry. */
-void DefaultMQProducer::setRetryAnotherBrokerWhenNotStoreOK(bool retryAnotherBrokerWhenNotStoreOK)
-{
-    m_retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
-}
-
-/** @return the maximum message size. */
-int DefaultMQProducer::getMaxMessageSize()
-{
-    return m_maxMessageSize;
-}
-
-/** Sets the maximum message size. */
-void DefaultMQProducer::setMaxMessageSize(int maxMessageSize)
-{
-    m_maxMessageSize = maxMessageSize;
-}
-
-/** @return the queue count used for auto-created topics. */
-int DefaultMQProducer::getDefaultTopicQueueNums()
-{
-    return m_defaultTopicQueueNums;
-}
-
-/** Sets the queue count used for auto-created topics. */
-void DefaultMQProducer::setDefaultTopicQueueNums(int defaultTopicQueueNums)
-{
-    m_defaultTopicQueueNums = defaultTopicQueueNums;
-}
-
-/** @return the number of retries after a failed send. */
-int DefaultMQProducer::getRetryTimesWhenSendFailed()
-{
-    return m_retryTimesWhenSendFailed;
-}
-
-/** Sets the number of retries after a failed send. */
-void DefaultMQProducer::setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed)
-{
-    m_retryTimesWhenSendFailed = retryTimesWhenSendFailed;
-}
-
-/** @return the compression level. */
-int DefaultMQProducer::getCompressLevel()
-{
-    return m_compressLevel;
-}
-
-/**
- * Sets the compression level.
- *
- * Accepted values are 0..9 or -1; this is enforced only by assert, so a build
- * with NDEBUG stores any value.
- */
-void DefaultMQProducer::setCompressLevel(int compressLevel)
-{
-    // Parentheses make the existing precedence (&& before ||) explicit.
-    assert((compressLevel >= 0 && compressLevel <= 9) || compressLevel == -1);
-
-    m_compressLevel = compressLevel;
-}
-
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp
deleted file mode 100755
index 26b3f0b..0000000
--- a/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp
+++ /dev/null
@@ -1,932 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "DefaultMQProducerImpl.h"
-#include "DefaultMQProducer.h"
-#include "MessageExt.h"
-#include "QueryResult.h"
-#include "TopicPublishInfo.h"
-#include "MQClientException.h"
-#include "LocalTransactionExecuter.h"
-#include "SendMessageHook.h"
-#include "MQClientManager.h"
-#include "MQClientFactory.h"
-#include "Validators.h"
-#include "MQAdminImpl.h"
-#include "MQClientAPIImpl.h"
-#include "MessageSysFlag.h"
-#include "CommandCustomHeader.h"
-#include "KPRUtil.h"
-#include "MessageDecoder.h"
-#include "MessageQueueSelector.h"
-#include "MQProtos.h"
-#include "RemotingCommand.h"
-#include "UtilAll.h"
-
-
-
-namespace rmq
-{
-
-/**
- * Binds the implementation to its owning producer. The service starts in
- * state CREATE_JUST with no client factory attached.
- *
- * @param pDefaultMQProducer the public producer object whose configuration
- *                           this implementation reads
- */
-DefaultMQProducerImpl::DefaultMQProducerImpl(DefaultMQProducer
-        *pDefaultMQProducer)
-    : m_pDefaultMQProducer(pDefaultMQProducer),
-      m_serviceState(CREATE_JUST),
-      m_pMQClientFactory(NULL)
-{
-    // getZipCompressLevel() would otherwise return an indeterminate value
-    // until setZipCompressLevel() is called. 5 matches the compression level
-    // DefaultMQProducer's constructors use.
-    m_zipCompressLevel = 5;
-}
-
-/**
- * The client factory is deliberately not deleted here (the original delete
- * is commented out).
- * NOTE(review): presumably the factory is owned by MQClientManager - confirm.
- */
-DefaultMQProducerImpl::~DefaultMQProducerImpl()
-{
-    //delete m_pMQClientFactory;
-}
-
-
-/** Starts the producer and its client factory. */
-void DefaultMQProducerImpl::start()
-{
-    start(true);
-}
-
-/**
- * Starts the producer.
- *
- * Only a producer in state CREATE_JUST can be started: the configuration is
- * checked, the client factory is obtained from MQClientManager, the producer
- * group is registered with it and the state becomes RUNNING. Finally a
- * heartbeat is sent to all brokers.
- *
- * For RUNNING, START_FAILED and SHUTDOWN_ALREADY exactly one error line
- * describing the actual state is logged before the exception is raised.
- * The previous switch fell through its cases and logged unrelated states too.
- *
- * @param startFactory when true the client factory is started as well
- * @throws MQClientException if the configuration is invalid, the producer
- *         group is already registered, or the state does not allow starting
- */
-void DefaultMQProducerImpl::start(bool startFactory)
-{
-    RMQ_DEBUG("DefaultMQProducerImpl::start()");
-
-    // Non-NULL when the current state forbids starting.
-    const char* stateError = NULL;
-
-    switch (m_serviceState)
-    {
-        case CREATE_JUST:
-        {
-            RMQ_INFO("the producer [{%s}] start beginning.",
-                     m_pDefaultMQProducer->getProducerGroup().c_str());
-
-            // Pessimistic: stays START_FAILED if anything below throws.
-            m_serviceState = START_FAILED;
-            checkConfig();
-
-            if (m_pDefaultMQProducer->getProducerGroup() !=
-                MixAll::CLIENT_INNER_PRODUCER_GROUP)
-            {
-                m_pDefaultMQProducer->changeInstanceNameToPID();
-            }
-
-            m_pMQClientFactory =
-                MQClientManager::getInstance()->getAndCreateMQClientFactory(
-                    *m_pDefaultMQProducer);
-            bool registerOK = m_pMQClientFactory->registerProducer(
-                                  m_pDefaultMQProducer->getProducerGroup(), this);
-
-            if (!registerOK)
-            {
-                // Registration failure leaves the producer startable again.
-                m_serviceState = CREATE_JUST;
-                THROW_MQEXCEPTION(MQClientException,
-                                  "The producer group[" + m_pDefaultMQProducer->getProducerGroup()
-                                  + "] has been created before, specify another name please.", -1);
-            }
-
-            m_topicPublishInfoTable[m_pDefaultMQProducer->getCreateTopicKey()] =
-                TopicPublishInfo();
-
-            if (startFactory)
-            {
-                m_pMQClientFactory->start();
-            }
-
-            RMQ_INFO("the producer [%s] start OK", m_pDefaultMQProducer->getProducerGroup().c_str());
-            m_serviceState = RUNNING;
-        }
-        break;
-
-        case RUNNING:
-            stateError = "This client is already running.";
-            break;
-
-        case START_FAILED:
-            stateError = "This client failed to start previously.";
-            break;
-
-        case SHUTDOWN_ALREADY:
-            stateError = "This client has been shutted down.";
-            break;
-
-        default:
-            break;
-    }
-
-    if (stateError != NULL)
-    {
-        RMQ_ERROR("%s", stateError);
-        THROW_MQEXCEPTION(MQClientException,
-                          "The producer service state not OK, maybe started once, ", -1);
-    }
-
-    m_pMQClientFactory->sendHeartbeatToAllBrokerWithLock();
-}
-
-/** Shuts the producer and its client factory down. */
-void DefaultMQProducerImpl::shutdown()
-{
-    shutdown(true);
-}
-
-/**
- * Shuts the producer down.
- *
- * Only a RUNNING producer has anything to release: it is unregistered from
- * the client factory and moved to SHUTDOWN_ALREADY. Every other state is a
- * no-op.
- *
- * @param shutdownFactory when true the client factory is shut down as well
- */
-void DefaultMQProducerImpl::shutdown(bool shutdownFactory)
-{
-    RMQ_DEBUG("DefaultMQProducerImpl::shutdown()");
-
-    if (m_serviceState != RUNNING)
-    {
-        return;
-    }
-
-    m_pMQClientFactory->unregisterProducer(
-        m_pDefaultMQProducer->getProducerGroup());
-
-    if (shutdownFactory)
-    {
-        m_pMQClientFactory->shutdown();
-    }
-
-    RMQ_INFO("the producer [%s] shutdown OK", m_pDefaultMQProducer->getProducerGroup().c_str());
-    m_serviceState = SHUTDOWN_ALREADY;
-}
-
-
-/** Placeholder: transaction support is not implemented. */
-void DefaultMQProducerImpl::initTransactionEnv()
-{
-    //TODO
-}
-
-/** Placeholder: transaction support is not implemented. */
-void DefaultMQProducerImpl::destroyTransactionEnv()
-{
-    //TODO
-}
-
-/** @return true when at least one send hook has been registered. */
-bool DefaultMQProducerImpl::hasHook()
-{
-    const bool noHooks = m_hookList.empty();
-    return !noHooks;
-}
-
-/**
- * Appends a send hook to the hook list. The pointer is stored as given;
- * it is not checked for NULL.
- */
-void DefaultMQProducerImpl::registerHook(SendMessageHook* pHook)
-{
-    m_hookList.push_back(pHook);
-}
-
-/**
- * Invokes sendMessageBefore on every registered hook, in registration order.
- * Whatever a hook throws is logged as a warning and swallowed, so one
- * failing hook does not stop the remaining hooks or the send itself.
- */
-void DefaultMQProducerImpl::executeHookBefore(const SendMessageContext& context)
-{
-    for (std::list<SendMessageHook*>::iterator hook = m_hookList.begin();
-         hook != m_hookList.end();
-         ++hook)
-    {
-        try
-        {
-            (*hook)->sendMessageBefore(context);
-        }
-        catch (...)
-        {
-            RMQ_WARN("sendMessageBefore exception");
-        }
-    }
-}
-
-/**
- * Invokes sendMessageAfter on every registered hook, in registration order.
- * Whatever a hook throws is logged as a warning and swallowed.
- */
-void DefaultMQProducerImpl::executeHookAfter(const SendMessageContext& context)
-{
-    for (std::list<SendMessageHook*>::iterator hook = m_hookList.begin();
-         hook != m_hookList.end();
-         ++hook)
-    {
-        try
-        {
-            (*hook)->sendMessageAfter(context);
-        }
-        catch (...)
-        {
-            RMQ_WARN("sendMessageAfter exception");
-        }
-    }
-}
-
-
-/**
- * Returns the names of all topics present in the publish-info table.
- * The table is read under the read lock.
- */
-std::set<std::string> DefaultMQProducerImpl::getPublishTopicList()
-{
-    kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
-
-    std::set<std::string> topics;
-    for (std::map<std::string, TopicPublishInfo>::iterator entry = m_topicPublishInfoTable.begin();
-         entry != m_topicPublishInfoTable.end();
-         ++entry)
-    {
-        topics.insert(entry->first);
-    }
-
-    return topics;
-}
-
-/**
- * Tells whether the publish info of a topic has to be refreshed: true when
- * the topic is unknown or its stored info reports !ok().
- */
-bool DefaultMQProducerImpl::isPublishTopicNeedUpdate(const std::string& topic)
-{
-    kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
-
-    std::map<std::string, TopicPublishInfo>::iterator entry =
-        m_topicPublishInfoTable.find(topic);
-    if (entry == m_topicPublishInfoTable.end())
-    {
-        return true;
-    }
-
-    return !entry->second.ok();
-}
-
-/** Placeholder: transaction state checking is not implemented; all arguments are ignored. */
-void DefaultMQProducerImpl::checkTransactionState(const std::string& addr,
-        const MessageExt& msg,
-        const CheckTransactionStateRequestHeader& checkRequestHeader)
-{
-    //TODO
-}
-
-/**
- * Stores new publish info for a topic, under the write lock.
- *
- * When the topic already has an entry, its send-which-queue value is copied
- * into info before the entry is replaced, so that value survives the update.
- *
- * @param topic topic whose entry is replaced
- * @param info  new publish info; modified in place when an entry exists
- */
-void DefaultMQProducerImpl::updateTopicPublishInfo(const std::string& topic,
-        TopicPublishInfo& info)
-{
-    kpr::ScopedWLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
-
-    std::map<std::string, TopicPublishInfo>::iterator previous =
-        m_topicPublishInfoTable.find(topic);
-    if (previous != m_topicPublishInfoTable.end())
-    {
-        info.getSendWhichQueue() = previous->second.getSendWhichQueue();
-        RMQ_INFO("updateTopicPublishInfo prev is not null, %s", previous->second.toString().c_str());
-    }
-
-    m_topicPublishInfoTable[topic] = info;
-}
-
-/**
- * Creates a topic through the admin implementation.
- * Requires a RUNNING producer; newTopic is checked by Validators::checkTopic.
- */
-void DefaultMQProducerImpl::createTopic(const std::string& key,
-                                        const std::string& newTopic, int queueNum)
-{
-    makeSureStateOK();
-    Validators::checkTopic(newTopic);
-
-    MQAdminImpl* admin = m_pMQClientFactory->getMQAdminImpl();
-    admin->createTopic(key, newTopic, queueNum);
-}
-
-/** Returns the publish queues of a topic; requires a RUNNING producer. */
-std::vector<MessageQueue>* DefaultMQProducerImpl::fetchPublishMessageQueues(
-    const std::string& topic)
-{
-    makeSureStateOK();
-    MQAdminImpl* admin = m_pMQClientFactory->getMQAdminImpl();
-    return admin->fetchPublishMessageQueues(topic);
-}
-
-/** Looks up the offset in mq for timestamp; requires a RUNNING producer. */
-long long DefaultMQProducerImpl::searchOffset(const MessageQueue& mq,
-        long long timestamp)
-{
-    makeSureStateOK();
-    MQAdminImpl* admin = m_pMQClientFactory->getMQAdminImpl();
-    return admin->searchOffset(mq, timestamp);
-}
-
-/** Returns the maximum offset of mq; requires a RUNNING producer. */
-long long DefaultMQProducerImpl::maxOffset(const MessageQueue& mq)
-{
-    makeSureStateOK();
-    MQAdminImpl* admin = m_pMQClientFactory->getMQAdminImpl();
-    return admin->maxOffset(mq);
-}
-
-/** Returns the minimum offset of mq; requires a RUNNING producer. */
-long long DefaultMQProducerImpl::minOffset(const MessageQueue& mq)
-{
-    makeSureStateOK();
-    MQAdminImpl* admin = m_pMQClientFactory->getMQAdminImpl();
-    return admin->minOffset(mq);
-}
-
-/** Returns the earliest message store time of mq; requires a RUNNING producer. */
-long long DefaultMQProducerImpl::earliestMsgStoreTime(const MessageQueue& mq)
-{
-    makeSureStateOK();
-    MQAdminImpl* admin = m_pMQClientFactory->getMQAdminImpl();
-    return admin->earliestMsgStoreTime(mq);
-}
-
-/** Fetches a message by id; requires a RUNNING producer. */
-MessageExt* DefaultMQProducerImpl::viewMessage(const std::string& msgId)
-{
-    makeSureStateOK();
-    MQAdminImpl* admin = m_pMQClientFactory->getMQAdminImpl();
-    return admin->viewMessage(msgId);
-}
-
-/** Queries messages of topic by key; requires a RUNNING producer. */
-QueryResult DefaultMQProducerImpl::queryMessage(const std::string& topic,
-        const std::string& key, int maxNum, long long begin, long long end)
-{
-    makeSureStateOK();
-    MQAdminImpl* admin = m_pMQClientFactory->getMQAdminImpl();
-    return admin->queryMessage(topic, key, maxNum, begin, end);
-}
-
-
-/**
- * DEFAULT ASYNC -------------------------------------------------------
- */
-/** Default async send using the producer's configured send timeout. */
-void DefaultMQProducerImpl::send(Message& msg, SendCallback* pSendCallback)
-{
-    const int timeout = m_pDefaultMQProducer->getSendMsgTimeout();
-    send(msg, pSendCallback, timeout);
-}
-
-/**
- * Default async send. A MQBrokerException raised by the send path is
- * re-raised as MQClientException.
- */
-void DefaultMQProducerImpl::send(Message& msg, SendCallback* pSendCallback, int timeout)
-{
-    try
-    {
-        sendDefaultImpl(msg, ASYNC, pSendCallback, timeout);
-    }
-    catch (MQBrokerException& brokerError)
-    {
-        THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + brokerError.what(), -1);
-    }
-}
-
-
-/**
- * DEFAULT ONEWAY -------------------------------------------------------
- *
- * Default one-way send with the configured timeout; MQBrokerException is
- * re-raised as MQClientException.
- */
-void DefaultMQProducerImpl::sendOneway(Message& msg)
-{
-    try
-    {
-        const int timeout = m_pDefaultMQProducer->getSendMsgTimeout();
-        sendDefaultImpl(msg, ONEWAY, NULL, timeout);
-    }
-    catch (MQBrokerException& brokerError)
-    {
-        THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + brokerError.what(), -1);
-    }
-}
-
-
-/**
- * KERNEL SYNC -------------------------------------------------------
- *
- * Sync send to an explicit queue using the configured timeout.
- */
-SendResult DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq)
-{
-    const int timeout = m_pDefaultMQProducer->getSendMsgTimeout();
-    return send(msg, mq, timeout);
-}
-
-/**
- * Sync send to an explicit queue. Requires a RUNNING producer, a message
- * accepted by Validators::checkMessage and a queue of the message's topic.
- */
-SendResult DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq, int timeout)
-{
-    makeSureStateOK();
-    Validators::checkMessage(msg, m_pDefaultMQProducer);
-
-    const bool sameTopic = !(msg.getTopic() != mq.getTopic());
-    if (!sameTopic)
-    {
-        THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1);
-    }
-
-    return sendKernelImpl(msg, mq, SYNC, NULL, timeout);
-}
-
-
-/**
- * KERNEL ASYNC -------------------------------------------------------
- *
- * Async send to an explicit queue using the configured timeout.
- */
-void DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq,
-                                 SendCallback* pSendCallback)
-{
-    const int timeout = m_pDefaultMQProducer->getSendMsgTimeout();
-    send(msg, mq, pSendCallback, timeout);
-}
-
-/**
- * Async send to an explicit queue. Same preconditions as the sync variant;
- * MQBrokerException is re-raised as MQClientException.
- */
-void DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq,
-                                 SendCallback* pSendCallback, int timeout)
-{
-    makeSureStateOK();
-    Validators::checkMessage(msg, m_pDefaultMQProducer);
-
-    const bool sameTopic = !(msg.getTopic() != mq.getTopic());
-    if (!sameTopic)
-    {
-        THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1);
-    }
-
-    try
-    {
-        sendKernelImpl(msg, mq, ASYNC, pSendCallback, timeout);
-    }
-    catch (MQBrokerException& brokerError)
-    {
-        THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + brokerError.what(), -1);
-    }
-}
-
-/**
- * KERNEL ONEWAY -------------------------------------------------------
- *
- * One-way send to an explicit queue with the configured timeout. Same
- * preconditions as the sync variant; MQBrokerException is re-raised as
- * MQClientException.
- */
-void DefaultMQProducerImpl::sendOneway(Message& msg, MessageQueue& mq)
-{
-    makeSureStateOK();
-    Validators::checkMessage(msg, m_pDefaultMQProducer);
-
-    const bool sameTopic = !(msg.getTopic() != mq.getTopic());
-    if (!sameTopic)
-    {
-        THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1);
-    }
-
-    try
-    {
-        const int timeout = m_pDefaultMQProducer->getSendMsgTimeout();
-        sendKernelImpl(msg, mq, ONEWAY, NULL, timeout);
-    }
-    catch (MQBrokerException& brokerError)
-    {
-        THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + brokerError.what(), -1);
-    }
-}
-
-
-/**
- * SELECT SYNC -------------------------------------------------------
- *
- * Sync send; the queue is chosen by pSelector. Uses the configured timeout.
- */
-SendResult DefaultMQProducerImpl::send(Message& msg,
-                                       MessageQueueSelector* pSelector, void* arg)
-{
-    const int timeout = m_pDefaultMQProducer->getSendMsgTimeout();
-    return send(msg, pSelector, arg, timeout);
-}
-
-/** Sync send; the queue is chosen by pSelector. */
-SendResult DefaultMQProducerImpl::send(Message& msg,
-                                       MessageQueueSelector* pSelector, void* arg, int timeout)
-{
-    return sendSelectImpl(msg, pSelector, arg, SYNC, NULL, timeout);
-}
-
-
-/**
- * SELECT ASYNC -------------------------------------------------------
- *
- * Async send; the queue is chosen by pSelector. Uses the configured timeout.
- */
-void DefaultMQProducerImpl::send(Message& msg,
-                                 MessageQueueSelector* pSelector,
-                                 void* arg,
-                                 SendCallback* pSendCallback)
-{
-    const int timeout = m_pDefaultMQProducer->getSendMsgTimeout();
-    send(msg, pSelector, arg, pSendCallback, timeout);
-}
-
-/**
- * Async send; the queue is chosen by pSelector. MQBrokerException is
- * re-raised as MQClientException.
- */
-void DefaultMQProducerImpl::send(Message& msg,
-                                 MessageQueueSelector* pSelector,
-                                 void* arg,
-                                 SendCallback* pSendCallback,
-                                 int timeout)
-{
-    try
-    {
-        sendSelectImpl(msg, pSelector, arg, ASYNC, pSendCallback, timeout);
-    }
-    catch (MQBrokerException& brokerError)
-    {
-        THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + brokerError.what(), -1);
-    }
-}
-
-
-/**
- * SELECT ONEWAY -------------------------------------------------------
- *
- * One-way send; the queue is chosen by pSelector. Uses the configured
- * timeout; MQBrokerException is re-raised as MQClientException.
- */
-void DefaultMQProducerImpl::sendOneway(Message& msg,
-                                       MessageQueueSelector* pSelector, void* arg)
-{
-    try
-    {
-        const int timeout = m_pDefaultMQProducer->getSendMsgTimeout();
-        sendSelectImpl(msg, pSelector, arg, ONEWAY, NULL, timeout);
-    }
-    catch (MQBrokerException& brokerError)
-    {
-        THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + brokerError.what(), -1);
-    }
-}
-
-
-/*
- * Send with Transaction
- */
-/**
- * Placeholder: transactional send is not implemented. All arguments are
- * ignored and a default-constructed TransactionSendResult is returned.
- */
-TransactionSendResult DefaultMQProducerImpl::sendMessageInTransaction(
-    Message& msg,
-    LocalTransactionExecuter* tranExecuter, void* arg)
-{
-    //TODO
-    TransactionSendResult emptyResult;
-    return emptyResult;
-}
-
-/** Placeholder: ending a transaction is not implemented; all arguments are ignored. */
-void DefaultMQProducerImpl::endTransaction(
-    SendResult sendResult,
-    LocalTransactionState localTransactionState,
-    MQClientException localException)
-{
-    //TODO
-}
-
-/**
- * DEFAULT SYNC -------------------------------------------------------
- */
-/** Default sync send using the producer's configured send timeout. */
-SendResult DefaultMQProducerImpl::send(Message& msg)
-{
-    const int timeout = m_pDefaultMQProducer->getSendMsgTimeout();
-    return send(msg, timeout);
-}
-
-/** Default sync send; the queue is chosen by sendDefaultImpl. */
-SendResult DefaultMQProducerImpl::send(Message& msg, int timeout)
-{
-    return sendDefaultImpl(msg, SYNC, NULL, timeout);
-}
-
-
-/**
- * Returns a snapshot (copy) of the topic publish-info table.
- *
- * The copy is taken under the read lock, like every other reader of the
- * table in this class, so it cannot observe the map while
- * updateTopicPublishInfo or tryToFindTopicPublishInfo is modifying it.
- */
-std::map<std::string, TopicPublishInfo> DefaultMQProducerImpl::getTopicPublishInfoTable()
-{
-    kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
-    return m_topicPublishInfoTable;
-}
-
-/** @return the client factory; NULL until start() has obtained one. */
-MQClientFactory* DefaultMQProducerImpl::getMQClientFactory()
-{
-    return m_pMQClientFactory;
-}
-
-/** @return the stored zip compression level. */
-int DefaultMQProducerImpl::getZipCompressLevel()
-{
-    return m_zipCompressLevel;
-}
-
-/** Stores the zip compression level; no range check is applied. */
-void DefaultMQProducerImpl::setZipCompressLevel(int zipCompressLevel)
-{
-    m_zipCompressLevel = zipCompressLevel;
-}
-
-/** @return the current service state. */
-ServiceState DefaultMQProducerImpl::getServiceState()
-{
-    return m_serviceState;
-}
-
-
-/** Overrides the current service state. */
-void DefaultMQProducerImpl::setServiceState(ServiceState serviceState)
-{
-    m_serviceState = serviceState;
-}
-
-
-/**
- * Sends a message to a queue picked from the topic's publish info, retrying
- * on failure.
- *
- * At most 1 + getRetryTimesWhenSendFailed() attempts are made, and attempts
- * stop once the elapsed time reaches the configured send timeout + 1000.
- * ASYNC and ONEWAY return after the first attempt that does not throw. SYNC
- * returns the first result, unless its status is not SEND_OK and
- * isRetryAnotherBrokerWhenNotStoreOK() is set, in which case it retries.
- *
- * @param msg               message to send
- * @param communicationMode SYNC, ASYNC or ONEWAY
- * @param pSendCallback     callback handed to sendKernelImpl
- * @param timeout           per-attempt timeout handed to sendKernelImpl
- * @throws MQClientException when all attempts fail, no name server address
- *         is configured, or the topic has no route info
- */
-SendResult DefaultMQProducerImpl::sendDefaultImpl(Message& msg,
- CommunicationMode communicationMode,
- SendCallback* pSendCallback,
- int timeout)
-{
- makeSureStateOK();
- Validators::checkMessage(msg, m_pDefaultMQProducer);
-
- // The overall budget is derived from the configured timeout, not from the
- // 'timeout' argument.
- long long maxTimeout = m_pDefaultMQProducer->getSendMsgTimeout() + 1000;
- long long beginTimestamp = KPRUtil::GetCurrentTimeMillis();
- long long endTimestamp = beginTimestamp;
- TopicPublishInfo& topicPublishInfo = tryToFindTopicPublishInfo(msg.getTopic());
- SendResult sendResult;
-
- if (topicPublishInfo.ok())
- {
- MessageQueue* mq = NULL;
-
- int times = 0;
- int timesTotal = 1 + m_pDefaultMQProducer->getRetryTimesWhenSendFailed();
- // Broker names tried so far; only used for the failure message below.
- std::vector<std::string> brokersSent;
- for (; times < timesTotal && int(endTimestamp - beginTimestamp) < maxTimeout; times++)
- {
- // The previous broker name is passed to the selector (empty on the first attempt).
- std::string lastBrokerName = (NULL == mq) ? "" : mq->getBrokerName();
- MessageQueue* tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
-
- if (tmpmq != NULL)
- {
- mq = tmpmq;
- brokersSent.push_back(mq->getBrokerName());
-
- try
- {
- sendResult = sendKernelImpl(msg, *mq, communicationMode, pSendCallback, timeout);
- endTimestamp = KPRUtil::GetCurrentTimeMillis();
-
- switch (communicationMode)
- {
- case ASYNC:
- return sendResult;
-
- case ONEWAY:
- return sendResult;
-
- case SYNC:
- if (sendResult.getSendStatus() != SEND_OK)
- {
- if (m_pDefaultMQProducer->isRetryAnotherBrokerWhenNotStoreOK())
- {
- continue;
- }
- }
-
- return sendResult;
-
- default:
- break;
- }
- }
- // Remoting and client errors are retried on the next iteration.
- catch (RemotingException& e)
- {
- endTimestamp = KPRUtil::GetCurrentTimeMillis();
- continue;
- }
- catch (MQClientException& e)
- {
- endTimestamp = KPRUtil::GetCurrentTimeMillis();
- continue;
- }
- catch (MQBrokerException& e)
- {
- endTimestamp = KPRUtil::GetCurrentTimeMillis();
-
- // Only the broker error codes listed here are retried; any other
- // code returns an existing result or rethrows.
- switch (e.GetError())
- {
- case TOPIC_NOT_EXIST_VALUE:
- case SERVICE_NOT_AVAILABLE_VALUE:
- case SYSTEM_ERROR_VALUE:
- case NO_PERMISSION_VALUE:
- case NO_BUYER_ID_VALUE:
- case NOT_IN_CURRENT_UNIT_VALUE:
- continue;
- default:
- if (sendResult.hasResult())
- {
- return sendResult;
- }
- throw;
- }
- }
- // Interruption is never retried.
- catch (InterruptedException& e)
- {
- endTimestamp = KPRUtil::GetCurrentTimeMillis();
- throw;
- }
- }
- else
- {
- // No queue could be selected: give up and report the failure below.
- break;
- }
- } // end of for
-
- std::string info = RocketMQUtil::str2fmt("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
- times, int(endTimestamp - beginTimestamp), msg.getTopic().c_str(), UtilAll::toString(brokersSent).c_str());
- RMQ_WARN("%s", info.c_str());
- THROW_MQEXCEPTION(MQClientException, info, -1);
- // NOTE(review): presumably unreachable, assuming THROW_MQEXCEPTION always throws.
- return sendResult;
- }
-
- // No usable publish info: tell a missing name server apart from a missing route.
- std::vector<std::string> nsList =
- getMQClientFactory()->getMQClientAPIImpl()->getNameServerAddressList();
- if (nsList.empty())
- {
- THROW_MQEXCEPTION(MQClientException, "No name server address, please set it", -1);
- }
-
- THROW_MQEXCEPTION(MQClientException, std::string("No route info of this topic, ") + msg.getTopic(), -1);
-}
-
-/**
- * Sends one message to one specific queue.
- *
- * Resolves the broker address (refreshing the topic's publish info once if
- * it is unknown), optionally compresses the body, builds the request header,
- * runs the registered hooks around the call and hands the message to
- * MQClientAPIImpl::sendMessage.
- *
- * @param msg               message to send; may be compressed and have its
- *                          reconsume-time property cleared
- * @param mq                target queue
- * @param communicationMode SYNC, ASYNC or ONEWAY
- * @param sendCallback      callback forwarded to sendMessage
- * @param timeout           timeout forwarded to sendMessage
- * @throws MQClientException when the broker address cannot be resolved
- * @throws RemotingException, MQBrokerException, InterruptedException
- *         rethrown after the after-hooks have run
- */
-SendResult DefaultMQProducerImpl::sendKernelImpl(Message& msg,
- const MessageQueue& mq,
- CommunicationMode communicationMode,
- SendCallback* sendCallback,
- int timeout)
-{
- std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
- if (brokerAddr.empty())
- {
- // Unknown broker: refresh the topic's publish info, then look it up once more.
- tryToFindTopicPublishInfo(mq.getTopic());
- brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
- }
-
- SendMessageContext context;
- if (!brokerAddr.empty())
- {
- try
- {
- int sysFlag = 0;
-
- if (tryToCompressMessage(msg))
- {
- sysFlag |= MessageSysFlag::CompressedFlag;
- }
-
- std::string tranMsg = msg.getProperty(Message::PROPERTY_TRANSACTION_PREPARED);
- if (!tranMsg.empty() && tranMsg == "true")
- {
- sysFlag |= MessageSysFlag::TransactionPreparedType;
- }
-
- // Execute the registered send hooks (before-send phase).
- if (hasHook())
- {
- context.producerGroup = (m_pDefaultMQProducer->getProducerGroup());
- context.communicationMode = (communicationMode);
- context.brokerAddr = (brokerAddr);
- context.msg = (msg);
- context.mq = (mq);
- executeHookBefore(context);
- }
-
- // NOTE(review): the header is allocated here and never deleted in this
- // function; presumably sendMessage takes ownership - confirm.
- SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader();
- requestHeader->producerGroup = (m_pDefaultMQProducer->getProducerGroup());
- requestHeader->topic = (msg.getTopic());
- requestHeader->defaultTopic = (m_pDefaultMQProducer->getCreateTopicKey());
- requestHeader->defaultTopicQueueNums = (m_pDefaultMQProducer->getDefaultTopicQueueNums());
- requestHeader->queueId = (mq.getQueueId());
- requestHeader->sysFlag = (sysFlag);
- requestHeader->bornTimestamp = (KPRUtil::GetCurrentTimeMillis());
- requestHeader->flag = (msg.getFlag());
- requestHeader->properties = (MessageDecoder::messageProperties2String(msg.getProperties()));
- requestHeader->reconsumeTimes = 0;
-
- // For topics starting with the retry-group prefix, the reconsume count is
- // moved from the message property into the header.
- if (requestHeader->topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) == 0)
- {
- std::string reconsumeTimes = msg.getProperty(Message::PROPERTY_RECONSUME_TIME);
- if (!reconsumeTimes.empty())
- {
- requestHeader->reconsumeTimes = int(UtilAll::str2ll(reconsumeTimes.c_str()));
- msg.clearProperty(Message::PROPERTY_RECONSUME_TIME);
- }
-
- /*
- 3.5.8 new features
- std::string maxReconsumeTimes = msg.getProperty(Message::PROPERTY_MAX_RECONSUME_TIMES);
- if (!maxReconsumeTimes.empty())
- {
- requestHeader->maxReconsumeTimes = int(UtilAll::str2ll(maxReconsumeTimes.c_str()));
- msg.clearProperty(Message::PROPERTY_MAX_RECONSUME_TIMES);
- }
- */
- }
-
- SendResult sendResult = m_pMQClientFactory->getMQClientAPIImpl()->sendMessage(
- brokerAddr,
- mq.getBrokerName(),
- msg,
- requestHeader,
- timeout,
- communicationMode,
- sendCallback
- );
-
- if (hasHook())
- {
- context.sendResult = (sendResult);
- executeHookAfter(context);
- }
-
- return sendResult;
- }
- // Each handler below records the exception in the hook context, runs the
- // after-hooks, logs a warning and rethrows the original exception.
- catch (RemotingException& e)
- {
- if (hasHook())
- {
- context.pException = (&e);
- executeHookAfter(context);
- }
- RMQ_WARN("sendKernelImpl exception: %s, msg: %s", e.what(), msg.toString().c_str());
- throw;
- }
- catch (MQBrokerException& e)
- {
- if (hasHook())
- {
- context.pException = (&e);
- executeHookAfter(context);
- }
- RMQ_WARN("sendKernelImpl exception: %s, msg: %s", e.what(), msg.toString().c_str());
- throw;
- }
- catch (InterruptedException& e)
- {
- if (hasHook())
- {
- context.pException = (&e);
- executeHookAfter(context);
- }
- RMQ_WARN("sendKernelImpl exception: %s, msg: %s", e.what(), msg.toString().c_str());
- throw;
- }
- }
-
- // The broker address is still unknown after the refresh.
- THROW_MQEXCEPTION(MQClientException, std::string("The broker[") + mq.getBrokerName() + "] not exist", -1);
-}
-
-/**
- * Sends a message to the queue chosen by a caller-supplied selector.
- *
- * Changes from the previous version: a NULL selector is rejected with
- * MQClientException instead of being dereferenced, and two unused SendResult
- * locals are gone.
- *
- * @param msg               message to send
- * @param selector          picks the target queue from the topic's queue list
- * @param pArg              opaque argument passed through to the selector
- * @param communicationMode SYNC, ASYNC or ONEWAY
- * @param sendCallback      callback forwarded to sendKernelImpl
- * @param timeout           timeout forwarded to sendKernelImpl
- * @throws MQClientException when the producer is not running, the selector
- *         is NULL, throws or returns NULL, or the topic has no route info
- */
-SendResult DefaultMQProducerImpl::sendSelectImpl(Message& msg,
-        MessageQueueSelector* selector,
-        void* pArg,
-        CommunicationMode communicationMode,
-        SendCallback* sendCallback,
-        int timeout)
-{
-    makeSureStateOK();
-    Validators::checkMessage(msg, m_pDefaultMQProducer);
-
-    if (selector == NULL)
-    {
-        THROW_MQEXCEPTION(MQClientException, "message queue selector is null", -1);
-    }
-
-    TopicPublishInfo& topicPublishInfo = tryToFindTopicPublishInfo(msg.getTopic());
-
-    if (topicPublishInfo.ok())
-    {
-        MessageQueue* mq = NULL;
-
-        // The selector is user code: anything it throws is reported as a
-        // client exception.
-        try
-        {
-            mq = selector->select(topicPublishInfo.getMessageQueueList(), msg, pArg);
-        }
-        catch (std::exception& e)
-        {
-            THROW_MQEXCEPTION(MQClientException,
-                              std::string("select message queue throwed exception, ") + e.what(), -1);
-        }
-        catch (...)
-        {
-            THROW_MQEXCEPTION(MQClientException, "select message queue throwed exception, ", -1);
-        }
-
-        if (mq != NULL)
-        {
-            return sendKernelImpl(msg, *mq, communicationMode, sendCallback, timeout);
-        }
-        else
-        {
-            THROW_MQEXCEPTION(MQClientException, "select message queue return null", -1);
-        }
-    }
-
-    THROW_MQEXCEPTION(MQClientException, std::string("No route info of this topic, ") + msg.getTopic(), -1);
-}
-
-/**
- * Guard used by the public operations.
- * @throws MQClientException unless the service state is RUNNING
- */
-void DefaultMQProducerImpl::makeSureStateOK()
-{
-    if (m_serviceState == RUNNING)
-    {
-        return;
-    }
-
-    THROW_MQEXCEPTION(MQClientException, "The producer service state not OK, ", -1);
-}
-
-/**
- * Validates the producer group before start-up.
- *
- * The group must pass Validators::checkGroup, must not be empty and must
- * not be the reserved MixAll::DEFAULT_PRODUCER_GROUP.
- *
- * @throws MQClientException when the group is empty or equals the default group
- */
-void DefaultMQProducerImpl::checkConfig()
-{
-    const std::string group = m_pDefaultMQProducer->getProducerGroup();
-
-    Validators::checkGroup(group);
-
-    if (group.empty())
-    {
-        THROW_MQEXCEPTION(MQClientException, "producerGroup is null", -1);
-    }
-
-    if (group == MixAll::DEFAULT_PRODUCER_GROUP)
-    {
-        THROW_MQEXCEPTION(MQClientException,
-                          std::string("producerGroup can not equal [") + MixAll::DEFAULT_PRODUCER_GROUP + "], please specify another one",
-                          -1);
-    }
-}
-
-/**
- * Returns the publish info of a topic, fetching route data from the name
- * server when the cached entry is missing or not ok().
- *
- * Steps:
- *  1. Look the topic up. If it is missing or not ok(), reset its entry and
- *     ask the client factory to update the route info for the topic.
- *  2. If the entry is now ok() or has router info, return it.
- *  3. Otherwise update once more with the producer's default-topic settings
- *     and return whatever entry exists afterwards.
- *
- * Changes from the previous version: every iterator comparison and
- * dereference now happens while the table lock is held. The final step no
- * longer dereferences a possibly-end() iterator; it uses operator[] under
- * the write lock, so an entry always exists to return.
- *
- * The returned reference points into the table and is used by callers after
- * the lock is released; that is inherent to this signature.
- *
- * @param topic topic to resolve
- * @return reference to the table entry for topic
- */
-TopicPublishInfo& DefaultMQProducerImpl::tryToFindTopicPublishInfo(
-    const std::string& topic)
-{
-    typedef std::map<std::string, TopicPublishInfo>::iterator TableIter;
-
-    bool needRouteUpdate = false;
-    {
-        kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
-        TableIter it = m_topicPublishInfoTable.find(topic);
-        needRouteUpdate = (it == m_topicPublishInfoTable.end() || !it->second.ok());
-    }
-
-    if (needRouteUpdate)
-    {
-        {
-            kpr::ScopedWLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
-            m_topicPublishInfoTable[topic] = TopicPublishInfo();
-        }
-
-        // Called without holding the table lock: the update is expected to
-        // write the table itself.
-        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(topic);
-    }
-
-    {
-        kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
-        TableIter it = m_topicPublishInfoTable.find(topic);
-        if (it != m_topicPublishInfoTable.end()
-            && (it->second.ok() || it->second.isHaveTopicRouterInfo()))
-        {
-            return (it->second);
-        }
-    }
-
-    m_pMQClientFactory->updateTopicRouteInfoFromNameServer(topic, true,
-            m_pDefaultMQProducer);
-
-    // operator[] creates a default entry if the topic is still absent.
-    kpr::ScopedWLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
-    return m_topicPublishInfoTable[topic];
-}
-
-/**
- * Compresses the message body when its length reaches the producer's
- * compression threshold.
- *
- * @return true only when compression was attempted and tryToCompress
- *         reported success
- */
-bool DefaultMQProducerImpl::tryToCompressMessage(Message& msg)
-{
-    if (msg.getBodyLen() < m_pDefaultMQProducer->getCompressMsgBodyOverHowmuch())
-    {
-        return false;
-    }
-
-    if (msg.tryToCompress(m_pDefaultMQProducer->getCompressLevel()))
-    {
-        return true;
-    }
-
-    return false;
-}
-
-/** No transaction check listener is available in this producer: always NULL. */
-TransactionCheckListener* DefaultMQProducerImpl::checkListener()
-{
-    return NULL;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h
deleted file mode 100755
index 3df914c..0000000
--- a/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __DEFAULTMQPRODUCERIMPL_H__
-#define __DEFAULTMQPRODUCERIMPL_H__
-
-#include <list>
-#include <vector>
-#include "MQProducerInner.h"
-#include "QueryResult.h"
-#include "ServiceState.h"
-#include "CommunicationMode.h"
-#include "SendResult.h"
-#include "MQClientException.h"
-#include "Mutex.h"
-#include "ScopedLock.h"
-
-
-namespace rmq
-{
- class DefaultMQProducer;
- class SendMessageHook;
- class SendMessageContext;
- class MessageQueue;
- class MessageExt;
- class SendCallback;
- class MessageQueueSelector;
- class MQClientFactory;
- class MQClientException;
- class RemotingException;
- class MQBrokerException;
- class InterruptedException;
- class LocalTransactionExecuter;
-
-
- class DefaultMQProducerImpl : public MQProducerInner // producer implementation constructed from a DefaultMQProducer; also provides the MQProducerInner callbacks
- {
- public:
- DefaultMQProducerImpl(DefaultMQProducer* pDefaultMQProducer);
- ~DefaultMQProducerImpl();
- void initTransactionEnv();
- void destroyTransactionEnv();
-
- bool hasHook();
- void registerHook(SendMessageHook* pHook);
- void executeHookBefore(const SendMessageContext& context);
- void executeHookAfter(const SendMessageContext& context);
-
- void start();
- void start(bool startFactory);
- void shutdown();
- void shutdown(bool shutdownFactory);
-
- std::set<std::string> getPublishTopicList(); // MQProducerInner override
- bool isPublishTopicNeedUpdate(const std::string& topic); // MQProducerInner override
-
- void checkTransactionState(const std::string& addr,
- const MessageExt& msg,
- const CheckTransactionStateRequestHeader& checkRequestHeader); // MQProducerInner override
-
- void updateTopicPublishInfo(const std::string& topic, TopicPublishInfo& info); // MQProducerInner override
- virtual TransactionCheckListener* checkListener(); // the definition in DefaultMQProducerImpl.cpp returns NULL unconditionally
-
- void createTopic(const std::string& key, const std::string& newTopic, int queueNum);
- std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic); // NOTE(review): returns a raw pointer; ownership is not visible from this header — confirm
-
- long long searchOffset(const MessageQueue& mq, long long timestamp);
- long long maxOffset(const MessageQueue& mq);
- long long minOffset(const MessageQueue& mq);
-
- long long earliestMsgStoreTime(const MessageQueue& mq);
-
- MessageExt* viewMessage(const std::string& msgId); // NOTE(review): returns a raw pointer; ownership is not visible from this header — confirm
- QueryResult queryMessage(const std::string& topic,
- const std::string& key,
- int maxNum,
- long long begin,
- long long end);
-
- /**
- * DEFAULT ASYNC -------------------------------------------------------
- */
- void send(Message& msg, SendCallback* sendCallback);
- void send(Message& msg, SendCallback* sendCallback, int timeout);
-
- /**
- * DEFAULT ONEWAY -------------------------------------------------------
- */
- void sendOneway(Message& msg);
-
- /**
- * KERNEL SYNC -------------------------------------------------------
- */
- SendResult send(Message& msg, MessageQueue& mq);
- SendResult send(Message& msg, MessageQueue& mq, int timeout);
-
- /**
- * KERNEL ASYNC -------------------------------------------------------
- */
- void send(Message& msg, MessageQueue& mq, SendCallback* sendCallback);
- void send(Message& msg, MessageQueue& mq, SendCallback* sendCallback, int timeout);
-
- /**
- * KERNEL ONEWAY -------------------------------------------------------
- */
- void sendOneway(Message& msg, MessageQueue& mq);
-
- /**
- * SELECT SYNC -------------------------------------------------------
- */
- SendResult send(Message& msg, MessageQueueSelector* selector, void* arg);
- SendResult send(Message& msg, MessageQueueSelector* selector, void* arg, int timeout);
-
- /**
- * SELECT ASYNC -------------------------------------------------------
- */
- void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* sendCallback);
- void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* sendCallback, int timeout);
-
- /**
- * SELECT ONEWAY -------------------------------------------------------
- */
- void sendOneway(Message& msg, MessageQueueSelector* selector, void* arg);
-
- /**
- * SEND with Transaction
- */
- TransactionSendResult sendMessageInTransaction(Message& msg, LocalTransactionExecuter* tranExecuter, void* arg); // takes the message by non-const reference
-
- /**
- * DEFAULT SYNC -------------------------------------------------------
- */
- SendResult send(Message& msg);
- SendResult send(Message& msg, int timeout);
-
- std::map<std::string, TopicPublishInfo> getTopicPublishInfoTable(); // returns the table by value, i.e. a copy
-
- MQClientFactory* getMQClientFactory();
-
- int getZipCompressLevel();
- void setZipCompressLevel(int zipCompressLevel);
-
- ServiceState getServiceState();
- void setServiceState(ServiceState serviceState);
-
- private:
- SendResult sendSelectImpl(Message& msg,
- MessageQueueSelector* selector,
- void* pArg,
- CommunicationMode communicationMode,
- SendCallback* sendCallback,
- int timeout);
-
- SendResult sendDefaultImpl(Message& msg,
- CommunicationMode communicationMode,
- SendCallback* pSendCallback,
- int timeout);
-
- SendResult sendKernelImpl(Message& msg,
- const MessageQueue& mq,
- CommunicationMode communicationMode,
- SendCallback* pSendCallback,
- int timeout);
-
- void endTransaction(SendResult sendResult,
- LocalTransactionState localTransactionState,
- MQClientException localException); // all three arguments are taken by value (copied per call)
-
- void makeSureStateOK();
- void checkConfig();
-
- TopicPublishInfo& tryToFindTopicPublishInfo(const std::string& topic) ; // returns a reference to the entry held in m_topicPublishInfoTable
- bool tryToCompressMessage(Message& msg); // true only if the body reaches the producer's compress threshold and Message::tryToCompress succeeds
-
- protected:
- //TODO transaction imp
-
- private:
- int m_zipCompressLevel;// message compress level, default is 5
-
- DefaultMQProducer* m_pDefaultMQProducer;
-
- std::map<std::string, TopicPublishInfo> m_topicPublishInfoTable; // keyed by topic name
- kpr::RWMutex m_topicPublishInfoTableLock; // reader/writer lock taken around m_topicPublishInfoTable lookups and inserts
-
- ServiceState m_serviceState;
- MQClientFactory* m_pMQClientFactory;
- std::list<SendMessageHook*> m_hookList;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h b/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h
deleted file mode 100755
index a124884..0000000
--- a/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __LOCALTRANSACTIONEXECUTER_H__
-#define __LOCALTRANSACTIONEXECUTER_H__
-
-#include "SendResult.h"
-
-namespace rmq
-{
- class LocalTransactionExecuter // abstract callback interface: one pure virtual method, no data members
- {
- public:
- virtual~LocalTransactionExecuter() {} // virtual so implementations can be destroyed through a pointer to this interface
- virtual LocalTransactionState executeLocalTransactionBranch(Message& msg, void* arg) = 0; // msg is passed by non-const reference; arg is an opaque pointer whose meaning this header does not define
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/producer/MQProducerInner.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/MQProducerInner.h b/rocketmq-client4cpp/src/producer/MQProducerInner.h
deleted file mode 100755
index 56194dc..0000000
--- a/rocketmq-client4cpp/src/producer/MQProducerInner.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __MQPRODUCERINNER_H__
-#define __MQPRODUCERINNER_H__
-
-#include <string>
-#include <set>
-
-namespace rmq
-{
- class TransactionCheckListener;
- class MessageExt;
- class CheckTransactionStateRequestHeader;
- class TopicPublishInfo;
-
- class MQProducerInner // abstract interface: every member except the destructor is pure virtual
- {
- public:
- virtual ~MQProducerInner() {}
- virtual std::set<std::string> getPublishTopicList() = 0; // set of strings returned by value
- virtual bool isPublishTopicNeedUpdate(const std::string& topic) = 0;
- virtual TransactionCheckListener* checkListener() = 0; // raw pointer; this interface does not state ownership
- virtual void checkTransactionState(const std::string& addr, // NOTE(review): presumably the broker address — confirm
- const MessageExt& msg, // read-only message
- const CheckTransactionStateRequestHeader& checkRequestHeader) = 0;
- virtual void updateTopicPublishInfo(const std::string& topic, TopicPublishInfo& info) = 0; // info is taken by non-const reference
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/producer/MessageQueueSelector.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/MessageQueueSelector.h b/rocketmq-client4cpp/src/producer/MessageQueueSelector.h
deleted file mode 100755
index 6d5ac48..0000000
--- a/rocketmq-client4cpp/src/producer/MessageQueueSelector.h
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __MESSAGEQUEUESELECTOR_H__
-#define __MESSAGEQUEUESELECTOR_H__
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <time.h>
-#include <math.h>
-#include <set>
-#include <string>
-#include <vector>
-
-#include "MessageQueue.h"
-#include "UtilAll.h"
-
-namespace rmq
-{
- class Message;
-
- class MessageQueueSelector // abstract strategy interface with a single pure virtual select()
- {
- public:
- virtual ~MessageQueueSelector() {}
- virtual MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg) = 0; // the selectors defined in this header return either a pointer to an element of mqs or NULL; arg is opaque and interpreted per implementation
- };
-
- /**
-  * Selector that picks a pseudo-random element of the candidate list using
-  * the C library rand().
-  */
- class SelectMessageQueueByRandoom : public MessageQueueSelector
- {
- public:
-     /**
-      * @param mqs candidate queues
-      * @param msg unused
-      * @param arg unused
-      * @return pointer to an element of mqs, or NULL when mqs is empty
-      */
-     MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg)
-     {
-         // An empty list would make the modulo below a division by zero.
-         if (mqs.empty())
-         {
-             return NULL;
-         }
-
-         // Seed once only: reseeding with time(NULL) on every call restarts the
-         // sequence, so all calls within the same second picked the same queue.
-         static bool s_seeded = false;
-         if (!s_seeded)
-         {
-             srand((unsigned)time(NULL));
-             s_seeded = true;
-         }
-
-         return &(mqs.at(rand() % mqs.size()));
-     }
- };
-
- /**
-  * Selector that maps a caller-supplied string to a queue by hashing it with
-  * UtilAll::hashCode and reducing the magnitude modulo the number of queues.
-  */
- class SelectMessageQueueByHash : public MessageQueueSelector
- {
- public:
-     /**
-      * @param mqs candidate queues
-      * @param msg unused
-      * @param arg must point to a std::string; its contents are hashed
-      * @return pointer to an element of mqs, or NULL when mqs is empty or
-      *         arg is NULL
-      */
-     MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg)
-     {
-         const std::string* key = static_cast<const std::string*>(arg);
-         if (key == NULL || mqs.empty())
-         {
-             return NULL;
-         }
-
-         const int hash = UtilAll::hashCode(key->c_str(), key->size());
-
-         // Magnitude computed in unsigned arithmetic: same value as abs(hash)
-         // for every input except INT_MIN, where abs() is undefined.
-         const unsigned int magnitude = (hash < 0)
-                                        ? 0u - static_cast<unsigned int>(hash)
-                                        : static_cast<unsigned int>(hash);
-
-         return &(mqs.at(magnitude % mqs.size()));
-     }
- };
-
-
- class SelectMessageQueueByMachineRoom : public MessageQueueSelector // unfinished selector: select() is a stub, the class only stores a set of strings
- {
- public:
- MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg)
- {
- // TODO Auto-generated method stub
- return NULL; // no selection logic yet: every call returns NULL and ignores all three arguments
- }
-
- std::set<std::string> getConsumeridcs() // returns a copy of the stored set
- {
- return m_consumeridcs;
- }
-
- void setConsumeridcs(const std::set<std::string>& consumeridcs) // replaces the stored set with a copy of the argument
- {
- m_consumeridcs = consumeridcs;
- }
-
- private:
- std::set<std::string> m_consumeridcs; // not read by select() in the visible code
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp
deleted file mode 100755
index 573db95..0000000
--- a/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "ProducerInvokeCallback.h"
-#include "ResponseFuture.h"
-#include "SendResult.h"
-#include "MQClientAPIImpl.h"
-#include "SendCallback.h"
-#include "MQClientException.h"
-#include "RemotingCommand.h"
-
-namespace rmq
-{
-
-/**
-* Record everything operationComplete() needs later: the user callback, the
-* client API used to decode the send response, and the topic / broker name
-* the request was sent to.
-*/
-ProducerInvokeCallback::ProducerInvokeCallback(SendCallback* pSendCallBack,
-        MQClientAPIImpl* pMQClientAPIImpl,
-        const std::string& topic,
-        const std::string& brokerName)
-    : m_pSendCallBack(pSendCallBack)
-    , m_pMQClientAPIImpl(pMQClientAPIImpl)
-    , m_topic(topic)
-    , m_brokerName(brokerName)
-{
-    // Nothing else to do: all state is set by the initializer list.
-}
-
-/**
-* Destroys the user callback held by this object.
-*/
-ProducerInvokeCallback::~ProducerInvokeCallback()
-{
-    // delete on a NULL pointer is a no-op, so no explicit guard is required.
-    delete m_pSendCallBack;
-    m_pSendCallBack = NULL;
-}
-
-/**
-* Completion handler for an asynchronous send.
-*
-* With a response command present, the response is decoded through
-* MQClientAPIImpl::processSendResponse and delivered to onSuccess(); an
-* MQException raised on that path is forwarded to onException(). Without a
-* response, onException() receives an MQClientException describing why
-* (request not sent, timeout, or unknown).
-*
-* The object deletes itself before returning, so it must have been allocated
-* with new and must not be touched after this call.
-*
-* @param pResponseFuture future describing the finished request
-*/
-void ProducerInvokeCallback::operationComplete(ResponseFuturePtr pResponseFuture)
-{
-    if (m_pSendCallBack == NULL)
-    {
-        delete this;
-        return;
-    }
-
-    RemotingCommand* response = pResponseFuture->getResponseCommand();
-    if (response != NULL)
-    {
-        // Declared outside the try block so it is released even when
-        // onSuccess() throws; previously that path leaked the result.
-        SendResult* sendResult = NULL;
-        try
-        {
-            sendResult = m_pMQClientAPIImpl->processSendResponse(m_brokerName, m_topic, response);
-
-            assert(sendResult != NULL);
-            m_pSendCallBack->onSuccess(*sendResult);
-        }
-        catch (MQException& e)
-        {
-            m_pSendCallBack->onException(e);
-        }
-
-        delete sendResult;
-        delete response;
-    }
-    else
-    {
-        std::string msg;
-        if (!pResponseFuture->isSendRequestOK())
-        {
-            msg = "send request failed";
-        }
-        else if (pResponseFuture->isTimeout())
-        {
-            // The cast keeps the argument in step with the %lld conversion
-            // whatever integer type getTimeoutMillis() returns.
-            msg = RocketMQUtil::str2fmt("wait response timeout %lld ms",
-                                        static_cast<long long>(pResponseFuture->getTimeoutMillis()));
-        }
-        else
-        {
-            msg = "unknown reason";
-        }
-
-        MQClientException e(msg, -1, __FILE__, __LINE__);
-        m_pSendCallBack->onException(e);
-    }
-
-    delete this;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h
deleted file mode 100755
index d2c9825..0000000
--- a/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __PRODUCERINVOKECALLBACK_H__
-#define __PRODUCERINVOKECALLBACK_H__
-
-#include <string>
-#include "InvokeCallback.h"
-
-namespace rmq
-{
- class MQClientAPIImpl;
- class SendCallback;
-
- class ProducerInvokeCallback : public InvokeCallback // turns a finished send request into SendCallback::onSuccess / onException calls
- {
- public:
- ProducerInvokeCallback(SendCallback* pSendCallBack,
- MQClientAPIImpl* pMQClientAPIImpl,
- const std::string& topic,
- const std::string& brokerName);
- virtual ~ProducerInvokeCallback(); // the definition in ProducerInvokeCallback.cpp deletes m_pSendCallBack
- virtual void operationComplete(ResponseFuturePtr pResponseFuture); // the definition in ProducerInvokeCallback.cpp finishes with `delete this`
-
- private:
- SendCallback* m_pSendCallBack; // deleted by the destructor, i.e. owned by this object
- MQClientAPIImpl* m_pMQClientAPIImpl; // used to call processSendResponse(); never deleted in the visible code
- std::string m_topic;
- std::string m_brokerName;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/producer/TopicPublishInfo.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/TopicPublishInfo.h b/rocketmq-client4cpp/src/producer/TopicPublishInfo.h
deleted file mode 100755
index 0d85b5f..0000000
--- a/rocketmq-client4cpp/src/producer/TopicPublishInfo.h
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __TOPICPUBLISHINFO_H__
-#define __TOPICPUBLISHINFO_H__
-
-#include <list>
-#include <vector>
-#include <string>
-#include <sstream>
-#include <math.h>
-#include <stdlib.h>
-
-#include "RocketMQClient.h"
-#include "RefHandle.h"
-#include "MessageQueue.h"
-#include "AtomicValue.h"
-#include "UtilAll.h"
-
-
-namespace rmq
-{
- /**
-  * Publish-side routing data for one topic: the list of message queues a
-  * producer may send to, a counter used to rotate between them, and two
-  * flags (order topic / route info present).
-  */
- class TopicPublishInfo : public kpr::RefCount
- {
- public:
-     TopicPublishInfo()
-         : m_orderTopic(false),
-           m_haveTopicRouterInfo(false)
-     {
-     }
-
-     ~TopicPublishInfo()
-     {
-         m_messageQueueList.clear();
-     }
-
-     bool isOrderTopic() const
-     {
-         return m_orderTopic;
-     }
-
-     /** @return true when at least one message queue is known. */
-     bool ok() const
-     {
-         return !m_messageQueueList.empty();
-     }
-
-     void setOrderTopic(bool orderTopic)
-     {
-         m_orderTopic = orderTopic;
-     }
-
-     /** @return mutable reference to the internal queue list. */
-     std::vector<MessageQueue>& getMessageQueueList()
-     {
-         return m_messageQueueList;
-     }
-
-     void setMessageQueueList(const std::vector<MessageQueue>& messageQueueList)
-     {
-         m_messageQueueList = messageQueueList;
-     }
-
-     kpr::AtomicInteger& getSendWhichQueue()
-     {
-         return m_sendWhichQueue;
-     }
-
-     void setSendWhichQueue(kpr::AtomicInteger& sendWhichQueue)
-     {
-         m_sendWhichQueue = sendWhichQueue;
-     }
-
-     bool isHaveTopicRouterInfo() const
-     {
-         return m_haveTopicRouterInfo;
-     }
-
-     void setHaveTopicRouterInfo(bool haveTopicRouterInfo)
-     {
-         m_haveTopicRouterInfo = haveTopicRouterInfo;
-     }
-
-     /**
-      * Pick the next queue in rotation.
-      *
-      * The rotation counter is advanced once per call. With a non-empty
-      * lastBrokerName, queues are scanned starting at the rotation position
-      * and the first one whose broker name differs is returned.
-      *
-      * @param lastBrokerName broker to avoid, or empty for no restriction
-      * @return pointer into the internal queue list, or NULL when the list
-      *         is empty or every queue belongs to lastBrokerName
-      */
-     MessageQueue* selectOneMessageQueue(const std::string& lastBrokerName)
-     {
-         const int ticket = m_sendWhichQueue++;
-
-         // An empty list would make the modulo in positionOf() a division
-         // by zero.
-         const size_t queueCount = m_messageQueueList.size();
-         if (queueCount == 0)
-         {
-             return NULL;
-         }
-
-         // Widened so that adding the scan offset cannot overflow int.
-         const long long start = ticket;
-
-         if (lastBrokerName.empty())
-         {
-             return &(m_messageQueueList.at(positionOf(start, queueCount)));
-         }
-
-         for (size_t i = 0; i < queueCount; i++)
-         {
-             MessageQueue& mq = m_messageQueueList.at(
-                                    positionOf(start + static_cast<long long>(i), queueCount));
-             if (mq.getBrokerName() != lastBrokerName)
-             {
-                 return &mq;
-             }
-         }
-
-         return NULL;
-     }
-
-     std::string toString() const
-     {
-         std::stringstream ss;
-         ss << "{orderTopic=" << m_orderTopic
-            << ",messageQueueList=" << UtilAll::toString(m_messageQueueList)
-            << ",sendWhichQueue=" << m_sendWhichQueue
-            << ",haveTopicRouterInfo=" << m_haveTopicRouterInfo
-            << "}";
-         return ss.str();
-     }
-
- private:
-     /**
-      * Map a (possibly negative) rotation index to a list position:
-      * |index| % queueCount. queueCount must be non-zero.
-      */
-     static size_t positionOf(long long index, size_t queueCount)
-     {
-         const unsigned long long magnitude =
-             static_cast<unsigned long long>(index < 0 ? -index : index);
-         return static_cast<size_t>(magnitude % queueCount);
-     }
-
- private:
-     bool m_orderTopic;
-     std::vector<MessageQueue> m_messageQueueList;
-     kpr::AtomicInteger m_sendWhichQueue;
-     bool m_haveTopicRouterInfo;
- };
- typedef kpr::RefHandleT<TopicPublishInfo> TopicPublishInfoPtr;
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/producer/TransactionCheckListener.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/TransactionCheckListener.h b/rocketmq-client4cpp/src/producer/TransactionCheckListener.h
deleted file mode 100755
index 8955742..0000000
--- a/rocketmq-client4cpp/src/producer/TransactionCheckListener.h
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __TRANSACTIONCHECKLISTENER_H__
-#define __TRANSACTIONCHECKLISTENER_H__
-
-#include "SendResult.h"
-
-namespace rmq
-{
- class TransactionCheckListener // abstract callback interface with a single pure virtual method
- {
- public:
- virtual ~TransactionCheckListener() {}
- virtual LocalTransactionState checkLocalTransactionState(MessageExt* pMsg) = 0; // NOTE(review): whether pMsg may be NULL and who owns it is not stated here — confirm with callers
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/producer/TransactionMQProducer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/TransactionMQProducer.h b/rocketmq-client4cpp/src/producer/TransactionMQProducer.h
deleted file mode 100755
index bee11a5..0000000
--- a/rocketmq-client4cpp/src/producer/TransactionMQProducer.h
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __TRANSACTIONMQPRODUCER_H__
-#define __TRANSACTIONMQPRODUCER_H__
-
-#include "DefaultMQProducer.h"
-#include "DefaultMQProducerImpl.h"
-#include "MQClientException.h"
-
-namespace rmq
-{
- /**
-  * DefaultMQProducer variant that sets up / tears down the transaction
-  * environment of the underlying implementation around start()/shutdown()
-  * and exposes sendMessageInTransaction().
-  */
- class TransactionMQProducer : public DefaultMQProducer
- {
- public:
-     TransactionMQProducer()
-         : m_pTransactionCheckListener(NULL),
-           m_checkThreadPoolMinSize(1),
-           m_checkThreadPoolMaxSize(1),
-           m_checkRequestHoldMax(2000)
-     {
-     }
-
-     TransactionMQProducer(const std::string& producerGroup)
-         : DefaultMQProducer(producerGroup),
-           m_pTransactionCheckListener(NULL),
-           m_checkThreadPoolMinSize(1),
-           m_checkThreadPoolMaxSize(1),
-           m_checkRequestHoldMax(2000)
-     {
-     }
-
-     /** Initialise the transaction environment, then start the producer. */
-     void start()
-     {
-         m_pDefaultMQProducerImpl->initTransactionEnv();
-         DefaultMQProducer::start();
-     }
-
-     /** Shut the producer down, then destroy the transaction environment. */
-     void shutdown()
-     {
-         DefaultMQProducer::shutdown();
-         m_pDefaultMQProducerImpl->destroyTransactionEnv();
-     }
-
-     /**
-      * Send a message as part of a local transaction.
-      *
-      * @param msg message to send; the implementation takes a non-const
-      *        reference, so it is handed a private copy of msg
-      * @param tranExecuter executer passed through to the implementation
-      * @param arg opaque pointer passed through to the implementation
-      * @return result reported by DefaultMQProducerImpl::sendMessageInTransaction
-      * @throws via THROW_MQEXCEPTION when no TransactionCheckListener is set
-      */
-     TransactionSendResult sendMessageInTransaction(const Message& msg,
-             LocalTransactionExecuter* tranExecuter, void* arg)
-     {
-         if (NULL == m_pTransactionCheckListener)
-         {
-             // NOTE(review): THROW_MQEXCEPTION is defined outside this file;
-             // confirm that it accepts these two arguments.
-             THROW_MQEXCEPTION("localTransactionBranchCheckListener is null", -1);
-         }
-
-         // m_pDefaultMQProducerImpl is a pointer (see start()/shutdown()), so
-         // it needs '->'. The callee takes Message&, which cannot bind to the
-         // const reference received here, hence the mutable copy.
-         Message mutableMsg(msg);
-         return m_pDefaultMQProducerImpl->sendMessageInTransaction(mutableMsg, tranExecuter, arg);
-     }
-
-     TransactionCheckListener* getTransactionCheckListener()
-     {
-         return m_pTransactionCheckListener;
-     }
-
-     /** Stores the pointer as given; it is not deleted anywhere in this class. */
-     void setTransactionCheckListener(TransactionCheckListener* pTransactionCheckListener)
-     {
-         m_pTransactionCheckListener = pTransactionCheckListener;
-     }
-
-     int getCheckThreadPoolMinSize()
-     {
-         return m_checkThreadPoolMinSize;
-     }
-
-     void setCheckThreadPoolMinSize(int checkThreadPoolMinSize)
-     {
-         m_checkThreadPoolMinSize = checkThreadPoolMinSize;
-     }
-
-     int getCheckThreadPoolMaxSize()
-     {
-         return m_checkThreadPoolMaxSize;
-     }
-
-     void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize)
-     {
-         m_checkThreadPoolMaxSize = checkThreadPoolMaxSize;
-     }
-
-     int getCheckRequestHoldMax()
-     {
-         return m_checkRequestHoldMax;
-     }
-
-     void setCheckRequestHoldMax(int checkRequestHoldMax)
-     {
-         m_checkRequestHoldMax = checkRequestHoldMax;
-     }
-
- private:
-     TransactionCheckListener* m_pTransactionCheckListener; // must be set before sendMessageInTransaction()
-     int m_checkThreadPoolMinSize;   // defaults to 1; only read by its getter here
-     int m_checkThreadPoolMaxSize;   // defaults to 1; only read by its getter here
-     int m_checkRequestHoldMax;      // defaults to 2000; only read by its getter here
- };
-}
-
-#endif