You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:49 UTC
[03/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp
code
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/MQProtos.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/MQProtos.h b/rocketmq-cpp/src/protocol/MQProtos.h
new file mode 100755
index 0000000..50c1841
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/MQProtos.h
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MQPROTOS_H__
+#define __MQPROTOS_H__
+
+namespace rocketmq {
+//<! MQRequestCode: request codes; every enumerator carries an explicit numeric value.
+enum MQRequestCode {
+ // send msg to Broker
+ SEND_MESSAGE = 10,
+ // subscribe msg from Broker
+ PULL_MESSAGE = 11,
+ // query msg from Broker
+ QUERY_MESSAGE = 12,
+ // query Broker Offset
+ QUERY_BROKER_OFFSET = 13,
+ // query Consumer Offset from broker
+ QUERY_CONSUMER_OFFSET = 14,
+ // update Consumer Offset to broker
+ UPDATE_CONSUMER_OFFSET = 15,
+ // create or update Topic to broker
+ UPDATE_AND_CREATE_TOPIC = 17,
+ // get all topic config info from broker
+ GET_ALL_TOPIC_CONFIG = 21,
+ // get all topic list from broker
+ GET_TOPIC_CONFIG_LIST = 22,
+ // get topic name list from broker
+ GET_TOPIC_NAME_LIST = 23,
+ UPDATE_BROKER_CONFIG = 25,
+ GET_BROKER_CONFIG = 26,
+ TRIGGER_DELETE_FILES = 27,
+ GET_BROKER_RUNTIME_INFO = 28,
+ SEARCH_OFFSET_BY_TIMESTAMP = 29,
+ GET_MAX_OFFSET = 30,
+ GET_MIN_OFFSET = 31,
+ GET_EARLIEST_MSG_STORETIME = 32,
+ VIEW_MESSAGE_BY_ID = 33,
+ // send heartbeat to broker, and register itself
+ HEART_BEAT = 34,
+ // unregister client from broker
+ UNREGISTER_CLIENT = 35,
+ // send a message whose consumption failed back to the broker
+ CONSUMER_SEND_MSG_BACK = 36,
+ // commit or roll back a transaction
+ END_TRANSACTION = 37,
+ // get consumer list by group from broker
+ GET_CONSUMER_LIST_BY_GROUP = 38,
+
+ CHECK_TRANSACTION_STATE = 39,
+ // broker notifies the consumer when the consumer list changes
+ NOTIFY_CONSUMER_IDS_CHANGED = 40,
+ // lock mq before orderly consume
+ LOCK_BATCH_MQ = 41,
+ // unlock mq after orderly consume
+ UNLOCK_BATCH_MQ = 42,
+ GET_ALL_CONSUMER_OFFSET = 43,
+ GET_ALL_DELAY_OFFSET = 45,
+ PUT_KV_CONFIG = 100,
+ GET_KV_CONFIG = 101,
+ DELETE_KV_CONFIG = 102,
+ REGISTER_BROKER = 103,
+ UNREGISTER_BROKER = 104,
+ GET_ROUTEINTO_BY_TOPIC = 105,
+ GET_BROKER_CLUSTER_INFO = 106,
+ UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200,
+ GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201,
+ GET_TOPIC_STATS_INFO = 202,
+ GET_CONSUMER_CONNECTION_LIST = 203,
+ GET_PRODUCER_CONNECTION_LIST = 204,
+ WIPE_WRITE_PERM_OF_BROKER = 205,
+
+ GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206,
+ DELETE_SUBSCRIPTIONGROUP = 207,
+ GET_CONSUME_STATS = 208,
+ SUSPEND_CONSUMER = 209,
+ RESUME_CONSUMER = 210,
+ RESET_CONSUMER_OFFSET_IN_CONSUMER = 211,
+ RESET_CONSUMER_OFFSET_IN_BROKER = 212,
+ ADJUST_CONSUMER_THREAD_POOL = 213,
+ WHO_CONSUME_THE_MESSAGE = 214,
+
+ DELETE_TOPIC_IN_BROKER = 215,
+ DELETE_TOPIC_IN_NAMESRV = 216,
+
+ GET_KV_CONFIG_BY_VALUE = 217,
+
+ DELETE_KV_CONFIG_BY_VALUE = 218,
+
+ GET_KVLIST_BY_NAMESPACE = 219,
+
+
+ RESET_CONSUMER_CLIENT_OFFSET = 220,
+
+ GET_CONSUMER_STATUS_FROM_CLIENT = 221,
+
+ INVOKE_BROKER_TO_RESET_OFFSET = 222,
+
+ INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223,
+
+ QUERY_TOPIC_CONSUME_BY_WHO = 300,  // listed out of numeric order: 300 sits between 223 and 224
+
+ GET_TOPICS_BY_CLUSTER = 224,
+
+ REGISTER_FILTER_SERVER = 301,
+
+ REGISTER_MESSAGE_FILTER_CLASS = 302,
+
+ QUERY_CONSUME_TIME_SPAN = 303,
+
+ GET_SYSTEM_TOPIC_LIST_FROM_NS = 304,
+ GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305,
+
+ CLEAN_EXPIRED_CONSUMEQUEUE = 306,
+
+ GET_CONSUMER_RUNNING_INFO = 307,
+
+ QUERY_CORRECTION_OFFSET = 308,
+
+ CONSUME_MESSAGE_DIRECTLY = 309,
+
+ SEND_MESSAGE_V2 = 310,
+
+ GET_UNIT_TOPIC_LIST = 311,
+ GET_HAS_UNIT_SUB_TOPIC_LIST = 312,
+ GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313,
+
+ CLONE_GROUP_OFFSET = 314,
+
+ VIEW_BROKER_STATS_DATA = 315
+};
+
+//<! MQResponseCode: response codes; every enumerator carries an explicit numeric value.
+enum MQResponseCode {
+ // received a success response from the broker
+ SUCCESS_VALUE = 0,
+ // received an exception from the broker
+ SYSTEM_ERROR = 1,
+ // received "system busy" from the broker
+ SYSTEM_BUSY = 2,
+ // broker doesn't support the request code
+ REQUEST_CODE_NOT_SUPPORTED = 3,
+ // broker flush disk timeout error
+ FLUSH_DISK_TIMEOUT = 10,
+ // broker sync double write, slave broker not available
+ SLAVE_NOT_AVAILABLE = 11,
+ // broker sync double write, slave broker flush msg timeout
+ FLUSH_SLAVE_TIMEOUT = 12,
+ // broker received an illegal message
+ MESSAGE_ILLEGAL = 13,
+ // service not available due to broker or namesrv in shutdown status
+ SERVICE_NOT_AVAILABLE = 14,
+ // this version is not supported on broker or namesrv
+ VERSION_NOT_SUPPORTED = 15,
+ // broker or Namesrv has no permission to do this operation
+ NO_PERMISSION = 16,
+ // topic does not exist on broker
+ TOPIC_NOT_EXIST = 17,
+ // broker already created this topic
+ TOPIC_EXIST_ALREADY = 18,
+ // pulled msg was not found
+ PULL_NOT_FOUND = 19,
+ // retry the pull; NOTE(review): the original comment said "retry later" while the name says "immediately" - confirm
+ PULL_RETRY_IMMEDIATELY = 20,
+ // pull msg with wrong offset
+ PULL_OFFSET_MOVED = 21,
+ // could not find the queried msg
+ QUERY_NOT_FOUND = 22,
+
+ SUBSCRIPTION_PARSE_FAILED = 23,
+ SUBSCRIPTION_NOT_EXIST = 24,
+ SUBSCRIPTION_NOT_LATEST = 25,
+ SUBSCRIPTION_GROUP_NOT_EXIST = 26,
+
+ TRANSACTION_SHOULD_COMMIT = 200,
+ TRANSACTION_SHOULD_ROLLBACK = 201,
+ TRANSACTION_STATE_UNKNOW = 202,
+ TRANSACTION_STATE_GROUP_WRONG = 203,
+
+ CONSUMER_NOT_ONLINE = 206,
+ CONSUME_MSG_TIMEOUT = 207
+};
+//<!************************************************************************
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/MessageQueue.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/MessageQueue.cpp b/rocketmq-cpp/src/protocol/MessageQueue.cpp
new file mode 100755
index 0000000..f1b3f8f
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/MessageQueue.cpp
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MessageQueue.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!************************************************************************
+// Default queue: empty topic, empty broker name, queue id -1 (invalid mq).
+MessageQueue::MessageQueue() : m_topic(), m_brokerName(), m_queueId(-1) {}
+
+// Queue identified by the given topic, broker name and queue id.
+MessageQueue::MessageQueue(const string& topic,
+                           const string& brokerName,
+                           int queueId)
+    : m_topic(topic),
+      m_brokerName(brokerName),
+      m_queueId(queueId) {}
+
+// Member-wise copy of another queue.
+MessageQueue::MessageQueue(const MessageQueue& other)
+    : m_topic(other.m_topic),
+      m_brokerName(other.m_brokerName),
+      m_queueId(other.m_queueId) {}
+
+// Copies topic, broker name and queue id from other; self-assignment is a
+// no-op. Returns *this.
+MessageQueue& MessageQueue::operator=(const MessageQueue& other) {
+  if (this == &other) {
+    return *this;
+  }
+  m_brokerName = other.m_brokerName;
+  m_topic = other.m_topic;
+  m_queueId = other.m_queueId;
+  return *this;
+}
+
+// Returns a copy of the topic name.
+string MessageQueue::getTopic() const {
+  return m_topic;
+}
+
+// Replaces the topic name.
+void MessageQueue::setTopic(const string& topic) {
+  m_topic = topic;
+}
+
+// Returns a copy of the broker name.
+string MessageQueue::getBrokerName() const {
+  return m_brokerName;
+}
+
+// Replaces the broker name.
+void MessageQueue::setBrokerName(const string& brokerName) {
+  m_brokerName = brokerName;
+}
+
+// Returns the queue id.
+int MessageQueue::getQueueId() const {
+  return m_queueId;
+}
+
+// Replaces the queue id.
+void MessageQueue::setQueueId(int queueId) {
+  m_queueId = queueId;
+}
+
+// Two queues are equal when broker name, queue id and topic all match.
+bool MessageQueue::operator==(const MessageQueue& mq) const {
+  // Same object: skip the member comparisons.
+  if (this == &mq) {
+    return true;
+  }
+
+  return m_brokerName == mq.m_brokerName &&
+         m_queueId == mq.m_queueId &&
+         m_topic == mq.m_topic;
+}
+
+// Three-way comparison: orders by topic, then broker name, then queue id.
+// Returns a negative value when *this sorts before mq, zero when all three
+// fields are equal, and a positive value otherwise.
+int MessageQueue::compareTo(const MessageQueue& mq) const {
+  int result = m_topic.compare(mq.m_topic);
+  if (result != 0) {
+    return result;
+  }
+
+  result = m_brokerName.compare(mq.m_brokerName);
+  if (result != 0) {
+    return result;
+  }
+
+  // Compare instead of subtracting: "m_queueId - mq.m_queueId" overflows
+  // (undefined behaviour) for ids that are far apart, e.g. INT_MAX and -1.
+  if (m_queueId < mq.m_queueId) {
+    return -1;
+  }
+  return (m_queueId > mq.m_queueId) ? 1 : 0;
+}
+
+// Strict ordering derived from compareTo().
+bool MessageQueue::operator<(const MessageQueue& mq) const {
+  const int order = compareTo(mq);
+  return order < 0;
+}
+
+// Builds a JSON object with the keys "topic", "brokerName" and "queueId".
+Json::Value MessageQueue::toJson() const {
+  Json::Value json;
+  json["topic"] = m_topic;
+  json["brokerName"] = m_brokerName;
+  json["queueId"] = m_queueId;
+  return json;
+}
+
+//<!***************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/MessageQueue.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/MessageQueue.h b/rocketmq-cpp/src/protocol/MessageQueue.h
new file mode 100755
index 0000000..0d47bf8
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/MessageQueue.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MESSAGEQUEUE_H__
+#define __MESSAGEQUEUE_H__
+
+#include <string>
+#include "json/json.h"
+
+namespace rocketmq {
+//<!************************************************************************/
+//<!* MQ(T,B,ID): a message queue identified by topic, broker name and queue id.
+//<!************************************************************************/
+class MessageQueue {
+ public:
+ MessageQueue();  // empty topic and broker name, queue id -1
+ MessageQueue(const std::string& topic, const std::string& brokerName,
+ int queueId);
+ MessageQueue(const MessageQueue& other);
+ MessageQueue& operator=(const MessageQueue& other);
+
+ std::string getTopic() const;
+ void setTopic(const std::string& topic);
+
+ std::string getBrokerName() const;
+ void setBrokerName(const std::string& brokerName);
+
+ int getQueueId() const;
+ void setQueueId(int queueId);
+
+ bool operator==(const MessageQueue& mq) const;  // true when all three fields match
+ bool operator<(const MessageQueue& mq) const;  // compareTo(mq) < 0
+ int compareTo(const MessageQueue& mq) const;  // orders by topic, then broker name, then queue id
+ Json::Value toJson() const;  // object with keys "topic", "brokerName", "queueId"
+
+ private:
+ std::string m_topic;
+ std::string m_brokerName;
+ int m_queueId;
+};
+//<!***************************************************************************
+} //<!end namespace;
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/ProcessQueueInfo.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/ProcessQueueInfo.h b/rocketmq-cpp/src/protocol/ProcessQueueInfo.h
new file mode 100644
index 0000000..d7493a5
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/ProcessQueueInfo.h
@@ -0,0 +1,86 @@
+#ifndef __PROCESSQUEUEINFO_H__
+#define __PROCESSQUEUEINFO_H__
+
+#include "UtilAll.h"
+#include "json/json.h"
+
+namespace rocketmq {
+// Plain data holder describing the state of one process queue (offsets,
+// cached/transaction message counters, lock and drop flags, timestamps),
+// with a JSON export.
+class ProcessQueueInfo {
+ public:
+  // Every offset, counter and timestamp starts at zero and both flags start
+  // false, except lastLockTimestamp which starts at 123.
+  // NOTE(review): 123 looks like a leftover placeholder - confirm intent.
+  ProcessQueueInfo()
+      : commitOffset(0),
+        cachedMsgMinOffset(0),
+        cachedMsgMaxOffset(0),
+        cachedMsgCount(0),
+        transactionMsgMinOffset(0),
+        transactionMsgMaxOffset(0),
+        transactionMsgCount(0),
+        locked(false),
+        tryUnlockTimes(0),
+        lastLockTimestamp(123),
+        droped(false),
+        lastPullTimestamp(0),
+        lastConsumeTimestamp(0) {}
+
+  virtual ~ProcessQueueInfo() {}
+
+ public:
+  const uint64 getCommitOffset() const {
+    return commitOffset;
+  }
+
+  void setCommitOffset(uint64 input_commitOffset) {
+    commitOffset = input_commitOffset;
+  }
+
+  void setLocked(bool in_locked) {
+    locked = in_locked;
+  }
+
+  const bool isLocked() const {
+    return locked;
+  }
+
+  void setDroped(bool in_dropped) {
+    droped = in_dropped;
+  }
+
+  const bool isDroped() const {
+    return droped;
+  }
+
+  // Exports all fields as a JSON object. The uint64 fields are written as
+  // strings produced by UtilAll::to_string; the int counters and the bool
+  // flags are written as native JSON values.
+  Json::Value toJson() const {
+    Json::Value json;
+
+    json["commitOffset"] = UtilAll::to_string(commitOffset).c_str();
+    json["cachedMsgMinOffset"] = UtilAll::to_string(cachedMsgMinOffset).c_str();
+    json["cachedMsgMaxOffset"] = UtilAll::to_string(cachedMsgMaxOffset).c_str();
+    json["cachedMsgCount"] = cachedMsgCount;
+
+    json["transactionMsgMinOffset"] =
+        UtilAll::to_string(transactionMsgMinOffset).c_str();
+    json["transactionMsgMaxOffset"] =
+        UtilAll::to_string(transactionMsgMaxOffset).c_str();
+    json["transactionMsgCount"] = transactionMsgCount;
+
+    json["locked"] = locked;
+    json["tryUnlockTimes"] = tryUnlockTimes;
+    json["lastLockTimestamp"] = UtilAll::to_string(lastLockTimestamp).c_str();
+
+    json["droped"] = droped;
+    json["lastPullTimestamp"] = UtilAll::to_string(lastPullTimestamp).c_str();
+    json["lastConsumeTimestamp"] =
+        UtilAll::to_string(lastConsumeTimestamp).c_str();
+
+    return json;
+  }
+
+ public:
+  uint64 commitOffset;
+  uint64 cachedMsgMinOffset;
+  uint64 cachedMsgMaxOffset;
+  int cachedMsgCount;
+  uint64 transactionMsgMinOffset;
+  uint64 transactionMsgMaxOffset;
+  int transactionMsgCount;
+  bool locked;
+  int tryUnlockTimes;
+  uint64 lastLockTimestamp;
+
+  bool droped;
+  uint64 lastPullTimestamp;
+  uint64 lastConsumeTimestamp;
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/RemotingCommand.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/RemotingCommand.cpp b/rocketmq-cpp/src/protocol/RemotingCommand.cpp
new file mode 100644
index 0000000..ff6e53e
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/RemotingCommand.cpp
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "RemotingCommand.h"
+#include "ByteOrder.h"
+#include "Logging.h"
+#include "MQProtos.h"
+#include "MQVersion.h"
+#include "SessionCredentials.h"
+
+namespace rocketmq {
+boost::atomic<int> RemotingCommand::s_seqNumber;  // counter the request constructor draws opaque ids from
+boost::mutex RemotingCommand::m_clock;  // locked by the request constructor while it reads and advances s_seqNumber
+//<!************************************************************************
+// Builds a command for the given code with language "CPP", the current
+// MQVersion, flag 0 and an empty remark, and stores pExtHeader in
+// m_pExtHeader. The opaque id is drawn from the shared sequence counter
+// while m_clock is held.
+RemotingCommand::RemotingCommand(int code,
+                                 CommandHeader* pExtHeader /* = NULL */)
+    : m_code(code),
+      m_language("CPP"),
+      m_version(MQVersion::s_CurrentVersion),
+      m_flag(0),
+      m_remark(""),
+      m_pExtHeader(pExtHeader) {
+  boost::lock_guard<boost::mutex> guard(m_clock);
+
+  // Fold the counter back into [0, INT_MAX) before using it, then write the
+  // folded value back and advance it for the next command.
+  const int current = s_seqNumber.load(boost::memory_order_acquire);
+  m_opaque = current % (numeric_limits<int>::max());
+  s_seqNumber.store(m_opaque, boost::memory_order_release);
+  ++s_seqNumber;
+}
+
+// Builds a command from explicitly supplied field values; nothing is drawn
+// from the shared sequence counter. pExtHeader is stored in m_pExtHeader.
+RemotingCommand::RemotingCommand(int code,
+                                 string language,
+                                 int version,
+                                 int opaque,
+                                 int flag,
+                                 string remark,
+                                 CommandHeader* pExtHeader)
+    : m_code(code),
+      m_language(language),
+      m_version(version),
+      m_opaque(opaque),
+      m_flag(flag),
+      m_remark(remark),
+      m_pExtHeader(pExtHeader) {}
+
+// Releases the extension header held by m_pExtHeader.
+RemotingCommand::~RemotingCommand() {
+  m_pExtHeader.reset();
+}
+
+// Serializes the command header into m_head.
+//
+// m_head layout after the call:
+//   [0..3]  total length = 4 + JSON length + body size (via swapIfLittleEndian)
+//   [4..7]  JSON length (via swapIfLittleEndian)
+//   [8.. ]  JSON header text
+// The body itself is not copied into m_head.
+void RemotingCommand::Encode() {
+  Json::Value root;
+  root["code"] = m_code;
+  root["language"] = "CPP";
+  root["version"] = m_version;
+  root["opaque"] = m_opaque;
+  root["flag"] = m_flag;
+  root["remark"] = m_remark;
+
+  // extFields: the custom header's own fields (if a header is attached;
+  // heartbeat has none) followed by the three credential fields.
+  Json::Value extJson;
+  if (m_pExtHeader) {
+    m_pExtHeader->Encode(extJson);
+  }
+  extJson[SessionCredentials::Signature] =
+      m_extFields[SessionCredentials::Signature];
+  extJson[SessionCredentials::AccessKey] =
+      m_extFields[SessionCredentials::AccessKey];
+  extJson[SessionCredentials::ONSChannelKey] =
+      m_extFields[SessionCredentials::ONSChannelKey];
+  root["extFields"] = extJson;
+
+  Json::FastWriter writer;
+  string headerText = writer.write(root);
+
+  uint32 headLen = headerText.size();
+  uint32 totalLen = 4 + headLen + m_body.getSize();
+
+  uint32 messageHeader[2];
+  messageHeader[0] = ByteOrder::swapIfLittleEndian(totalLen);
+  messageHeader[1] = ByteOrder::swapIfLittleEndian(headLen);
+
+  //<!total length includes the 4-byte header length, see : doc/protocol.txt;
+  m_head.setSize(4 + 4 + headLen);
+  m_head.copyFrom(messageHeader, 0, sizeof(messageHeader));
+  m_head.copyFrom(headerText.c_str(), sizeof(messageHeader), headLen);
+}
+
+// Returns a pointer to the encoded header block (filled by Encode()).
+const MemoryBlock* RemotingCommand::GetHead() const {
+  return &m_head;
+}
+
+// Returns a pointer to the body block (filled by SetBody()).
+const MemoryBlock* RemotingCommand::GetBody() const {
+  return &m_body;
+}
+
+// Replaces the body with a copy of len bytes starting at pData.
+// A null pData or a non-positive len leaves the body empty.
+void RemotingCommand::SetBody(const char* pData, int len) {
+  m_body.reset();
+
+  // Nothing to copy: do not hand a null pointer or a negative length to
+  // MemoryBlock.
+  if (pData == NULL || len <= 0) {
+    return;
+  }
+
+  m_body.setSize(len);
+  m_body.copyFrom(pData, 0, len);
+}
+
+// Parses a received packet into a newly allocated RemotingCommand.
+//
+// mem layout: 4-byte header length, JSON header, then the body (if any).
+// Returns a command allocated with new; this function keeps no reference
+// to it. The extension header is left NULL and the parsed JSON is stored
+// on the command via setParsedJson().
+// Throws MQClientException when the packet is shorter than the length
+// prefix, when the declared header length does not fit inside the packet,
+// or when the JSON header cannot be parsed.
+RemotingCommand* RemotingCommand::Decode(const MemoryBlock& mem) {
+  const int totalLen = mem.getSize();
+  // The length prefix must be present before it is read.
+  if (totalLen < 4) {
+    THROW_MQEXCEPTION(MQClientException, "packet too short to decode", -1);
+  }
+
+  //<!decode 4-byte header length; packet is 4+head+body
+  uint32 messageHeader[1];
+  mem.copyTo(messageHeader, 0, sizeof(messageHeader));
+  const int headLen = ByteOrder::swapIfLittleEndian(messageHeader[0]);
+  // Reject a header length that would make the parser read past the packet.
+  if (headLen < 0 || headLen > totalLen - 4) {
+    THROW_MQEXCEPTION(MQClientException, "invalid header length", -1);
+  }
+  const int bodyLen = totalLen - 4 - headLen;
+
+  //<!decode header;
+  const char* const pData = static_cast<const char*>(mem.getData());
+  Json::Reader reader;
+  Json::Value object;
+  const char* begin = pData + 4;
+  const char* end = pData + 4 + headLen;
+
+  if (!reader.parse(begin, end, object)) {
+    THROW_MQEXCEPTION(MQClientException, "conn't parse json", -1);
+  }
+
+  int code = object["code"].asInt();
+
+  string language = object["language"].asString();
+  int version = object["version"].asInt();
+  int opaque = object["opaque"].asInt();
+  int flag = object["flag"].asInt();
+  // "remark" may be absent or null; treat that as an empty remark.
+  Json::Value v = object["remark"];
+  string remark = "";
+  if (!v.isNull()) {
+    remark = object["remark"].asString();
+  }
+  // The second value logged is the language, so label it as such.
+  LOG_DEBUG("code:%d, language:%s, version:%d, opaque:%d, flag:%d, remark:%s, headLen:%d, bodyLen:%d ",
+            code, language.c_str(), version, opaque, flag, remark.c_str(), headLen, bodyLen);
+  RemotingCommand* cmd =
+      new RemotingCommand(code, language, version, opaque, flag, remark, NULL);
+  cmd->setParsedJson(object);
+  if (bodyLen > 0) {
+    cmd->SetBody(pData + 4 + headLen, bodyLen);
+  }
+  return cmd;
+}
+
+// Sets bit RPC_TYPE of m_flag.
+void RemotingCommand::markResponseType() {
+  m_flag |= (1 << RPC_TYPE);
+}
+
+// True when bit RPC_TYPE of m_flag is set.
+bool RemotingCommand::isResponseType() {
+  const int mask = 1 << RPC_TYPE;
+  return (m_flag & mask) != 0;
+}
+
+// Sets bit RPC_ONEWAY of m_flag.
+void RemotingCommand::markOnewayRPC() {
+  m_flag |= (1 << RPC_ONEWAY);
+}
+
+// True when bit RPC_ONEWAY of m_flag is set.
+bool RemotingCommand::isOnewayRPC() {
+  const int mask = 1 << RPC_ONEWAY;
+  return (m_flag & mask) != 0;
+}
+
+// Overrides the opaque id of this command.
+void RemotingCommand::setOpaque(const int opa) {
+  m_opaque = opa;
+}
+
+// Decodes the "extFields" object of the previously parsed JSON into the
+// header type that belongs to the given request code and stores it in
+// m_pExtHeader.
+//
+// Nothing happens when "extFields" is null. Otherwise the current header is
+// dropped first, so an unlisted code leaves m_pExtHeader empty. An
+// MQException raised while decoding is logged and swallowed.
+void RemotingCommand::SetExtHeader(int code) {
+  try {
+    Json::Value ext = m_parsedJson["extFields"];
+    if (!ext.isNull()) {
+      m_pExtHeader.reset();
+      switch (code) {
+        case SEND_MESSAGE:
+          m_pExtHeader.reset(SendMessageResponseHeader::Decode(ext));
+          break;
+        case PULL_MESSAGE:
+          m_pExtHeader.reset(PullMessageResponseHeader::Decode(ext));
+          break;
+        case GET_MIN_OFFSET:
+          m_pExtHeader.reset(GetMinOffsetResponseHeader::Decode(ext));
+          break;
+        case GET_MAX_OFFSET:
+          m_pExtHeader.reset(GetMaxOffsetResponseHeader::Decode(ext));
+          break;
+        case SEARCH_OFFSET_BY_TIMESTAMP:
+          m_pExtHeader.reset(SearchOffsetResponseHeader::Decode(ext));
+          break;
+        case GET_EARLIEST_MSG_STORETIME:
+          m_pExtHeader.reset(
+              GetEarliestMsgStoretimeResponseHeader::Decode(ext));
+          break;
+        case QUERY_CONSUMER_OFFSET:
+          m_pExtHeader.reset(QueryConsumerOffsetResponseHeader::Decode(ext));
+          break;
+        case RESET_CONSUMER_CLIENT_OFFSET:
+          m_pExtHeader.reset(ResetOffsetRequestHeader::Decode(ext));
+          break;
+        case GET_CONSUMER_RUNNING_INFO:
+          m_pExtHeader.reset(GetConsumerRunningInfoRequestHeader::Decode(ext));
+          break;
+        case NOTIFY_CONSUMER_IDS_CHANGED:
+          m_pExtHeader.reset(
+              NotifyConsumerIdsChangedRequestHeader::Decode(ext));
+          // Explicit break: the original fell through into default here,
+          // which would misbehave as soon as another case is appended.
+          break;
+        default:
+          break;
+      }
+    }
+  } catch (const MQException&) {
+    LOG_ERROR("set response head error");
+  }
+}
+
+// Replaces the command code.
+void RemotingCommand::setCode(int code) {
+  m_code = code;
+}
+
+// Returns the command code.
+int RemotingCommand::getCode() const {
+  return m_code;
+}
+
+// Returns the opaque id of this command.
+int RemotingCommand::getOpaque() const {
+  return m_opaque;
+}
+
+// Returns a copy of the remark text.
+string RemotingCommand::getRemark() const {
+  return m_remark;
+}
+
+// Replaces the remark text.
+void RemotingCommand::setRemark(string mark) {
+  m_remark = mark;
+}
+
+// Returns the extension header without transferring ownership; may be NULL.
+CommandHeader* RemotingCommand::getCommandHeader() const {
+  return m_pExtHeader.get();
+}
+
+// Stores the JSON that SetExtHeader() later reads "extFields" from.
+void RemotingCommand::setParsedJson(Json::Value json) {
+  m_parsedJson = json;
+}
+
+// Returns the raw flag bits.
+const int RemotingCommand::getFlag() const {
+  return m_flag;
+}
+
+// Returns the version field.
+const int RemotingCommand::getVersion() const {
+  return m_version;
+}
+
+// Replaces the stored message body string.
+void RemotingCommand::setMsgBody(const string& body) {
+  m_msgBody = body;
+}
+
+// Returns a copy of the stored message body string.
+string RemotingCommand::getMsgBody() const {
+  return m_msgBody;
+}
+
+// Adds or overwrites one entry of m_extFields.
+void RemotingCommand::addExtField(const string& key, const string& value) {
+  m_extFields[key] = value;
+}
+
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/RemotingCommand.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/RemotingCommand.h b/rocketmq-cpp/src/protocol/RemotingCommand.h
new file mode 100755
index 0000000..633a511
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/RemotingCommand.h
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REMOTINGCOMMAND_H__
+#define __REMOTINGCOMMAND_H__
+#include <boost/atomic.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include <memory>
+#include <sstream>
+#include "CommandHeader.h"
+#include "dataBlock.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+const int RPC_TYPE = 0; // bit index in the command flag: bit clear = REQUEST_COMMAND, bit set = RESPONSE_COMMAND
+const int RPC_ONEWAY = 1; // bit index in the command flag: bit clear = RPC, bit set = Oneway
+//<! RemotingCommand: one protocol command (code, language, version, opaque, flag, remark) plus optional ext header and body.
+class RemotingCommand {
+ public:
+ RemotingCommand(int code, CommandHeader* pCustomHeader = NULL);
+ RemotingCommand(int code, string language, int version, int opaque, int flag,
+ string remark, CommandHeader* pCustomHeader);
+ virtual ~RemotingCommand();
+
+ const MemoryBlock* GetHead() const;
+ const MemoryBlock* GetBody() const;
+
+ void SetBody(const char* pData, int len);
+ void setOpaque(const int opa);
+ void SetExtHeader(int code);
+
+ void setCode(int code);
+ int getCode() const;
+ int getOpaque() const;
+ void setRemark(string mark);
+ string getRemark() const;
+ void markResponseType();
+ bool isResponseType();
+ void markOnewayRPC();
+ bool isOnewayRPC();
+ void setParsedJson(Json::Value json);
+
+ CommandHeader* getCommandHeader() const;
+ const int getFlag() const;
+ const int getVersion() const;
+
+ void addExtField(const string& key, const string& value);
+ string getMsgBody() const;
+ void setMsgBody(const string& body);
+
+ public:
+ void Encode();
+ static RemotingCommand* Decode(const MemoryBlock& mem);  // returns an object allocated with new
+
+ private:
+ int m_code;
+ string m_language;
+ int m_version;
+ int m_opaque;
+ int m_flag;  // bit field; bit positions are RPC_TYPE and RPC_ONEWAY
+ string m_remark;
+ string m_msgBody;
+ map<string, string> m_extFields;
+
+ static boost::mutex m_clock;
+ MemoryBlock m_head;
+ MemoryBlock m_body;
+ //<!JSON kept from decoding; set through setParsedJson()
+ Json::Value m_parsedJson;
+ static boost::atomic<int> s_seqNumber;
+ unique_ptr<CommandHeader> m_pExtHeader;  // owned ext header
+};
+
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/RemotingSerializable.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/RemotingSerializable.h b/rocketmq-cpp/src/protocol/RemotingSerializable.h
new file mode 100755
index 0000000..812a892
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/RemotingSerializable.h
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REMOTINGSERIALIZABLE_H__
+#define __REMOTINGSERIALIZABLE_H__
+#include "json/json.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+// Abstract interface with a single pure-virtual Encode() taking a string
+// out-parameter.
+class RemotingSerializable {
+ public:
+  virtual ~RemotingSerializable() {}
+
+  // Implementations are expected to place their encoded form in outData.
+  virtual void Encode(std::string& outData) = 0;
+};
+
+//<!************************************************************************
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/TopicList.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/TopicList.h b/rocketmq-cpp/src/protocol/TopicList.h
new file mode 100755
index 0000000..d8d14a7
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/TopicList.h
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TOPICLIST_H__
+#define __TOPICLIST_H__
+#include <string>
+#include <vector>
+#include "dataBlock.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+// Holder for a list of topic names.
+class TopicList {
+ public:
+  // Returns a newly allocated, empty TopicList.
+  // NOTE(review): mem is not read here - looks like an unfinished stub.
+  static TopicList* Decode(const MemoryBlock* mem) {
+    TopicList* decoded = new TopicList();
+    return decoded;
+  }
+
+ private:
+  vector<string> m_topicList;
+};
+//<!************************************************************************
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/TopicRouteData.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/TopicRouteData.h b/rocketmq-cpp/src/protocol/TopicRouteData.h
new file mode 100755
index 0000000..ec8f842
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/TopicRouteData.h
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TOPICROUTEDATA_H__
+#define __TOPICROUTEDATA_H__
+#include <algorithm>
+#include "Logging.h"
+#include "UtilAll.h"
+#include "dataBlock.h"
+#include "json/json.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+struct QueueData {
+  string brokerName;
+  int readQueueNums;
+  int writeQueueNums;
+  int perm;
+
+  //<! Ordering looks at brokerName only; the numeric fields do not take part.
+  bool operator<(const QueueData& other) const {
+    return brokerName < other.brokerName;
+  }
+
+  //<! Equality requires all four fields to match.
+  bool operator==(const QueueData& other) const {
+    if (brokerName != other.brokerName) return false;
+    if (readQueueNums != other.readQueueNums) return false;
+    if (writeQueueNums != other.writeQueueNums) return false;
+    return perm == other.perm;
+  }
+};
+
+//<!***************************************************************************
+struct BrokerData {
+  string brokerName;
+  //<! broker id -> broker address; id 0 is the master, 1,2.. are slaves
+  map<int, string> brokerAddrs;
+
+  //<! Ordering looks at brokerName only.
+  bool operator<(const BrokerData& other) const {
+    return brokerName < other.brokerName;
+  }
+
+  //<! Equal when both the name and the complete address map match.
+  bool operator==(const BrokerData& other) const {
+    return brokerName == other.brokerName && brokerAddrs == other.brokerAddrs;
+  }
+};
+
+//<!************************************************************************/
+class TopicRouteData {
+ public:
+  virtual ~TopicRouteData() {
+    m_brokerDatas.clear();
+    m_queueDatas.clear();
+  }
+
+  //<! Parses a JSON-encoded topic route (see doc/TopicRouteData.json) held in
+  //<! `mem`.
+  //<! Returns a heap-allocated TopicRouteData whose queue and broker lists are
+  //<! sorted by broker name, or NULL (after logging) when the buffer cannot be
+  //<! parsed as JSON. The caller takes ownership of the returned object.
+  //<! `mem` is dereferenced unconditionally and therefore must not be NULL.
+  static TopicRouteData* Decode(const MemoryBlock* mem) {
+    //<! Parse directly from the buffer; no intermediate string copy is made.
+    const char* const begin = static_cast<const char*>(mem->getData());
+    const char* const end = begin + mem->getSize();
+
+    Json::Value root;
+    Json::CharReaderBuilder charReaderBuilder;
+    //<! brokerAddrs keys are converted with atoi() below; presumably they
+    //<! arrive as bare numeric keys, hence this setting - TODO confirm.
+    charReaderBuilder.settings_["allowNumericKeys"] = true;
+    unique_ptr<Json::CharReader> pCharReaderPtr(
+        charReaderBuilder.newCharReader());
+    string errs;
+    if (!pCharReaderPtr->parse(begin, end, &root, &errs)) {
+      LOG_ERROR("parse json error:%s, value isArray:%d, isObject:%d",
+                errs.c_str(), root.isArray(), root.isObject());
+      return NULL;
+    }
+
+    //<! Held by unique_ptr so the object is released if one of the jsoncpp
+    //<! accessors below throws part-way through decoding.
+    unique_ptr<TopicRouteData> trd(new TopicRouteData());
+    trd->setOrderTopicConf(root["orderTopicConf"].asString());
+
+    //<! Sub-trees are bound by const reference to avoid deep copies.
+    const Json::Value& qds = root["queueDatas"];
+    for (unsigned int i = 0; i < qds.size(); i++) {
+      const Json::Value& qd = qds[i];
+      QueueData d;
+      d.brokerName = qd["brokerName"].asString();
+      d.readQueueNums = qd["readQueueNums"].asInt();
+      d.writeQueueNums = qd["writeQueueNums"].asInt();
+      d.perm = qd["perm"].asInt();
+      trd->m_queueDatas.push_back(d);
+    }
+    sort(trd->m_queueDatas.begin(), trd->m_queueDatas.end());
+
+    const Json::Value& bds = root["brokerDatas"];
+    for (unsigned int i = 0; i < bds.size(); i++) {
+      const Json::Value& bd = bds[i];
+      BrokerData d;
+      d.brokerName = bd["brokerName"].asString();
+      LOG_DEBUG("brokerName:%s", d.brokerName.c_str());
+
+      const Json::Value& bas = bd["brokerAddrs"];
+      const Json::Value::Members mbs = bas.getMemberNames();
+      //<! `j`, not `i`: the index must not shadow the outer loop's counter.
+      for (size_t j = 0; j < mbs.size(); j++) {
+        const string& key = mbs[j];
+        const string addr = bas[key].asString();
+        LOG_DEBUG("brokerid:%s,brokerAddr:%s", key.c_str(), addr.c_str());
+        //<! atoi() yields 0 for a non-numeric key, which is the same id the
+        //<! comment on BrokerData::brokerAddrs gives to the master.
+        d.brokerAddrs[atoi(key.c_str())] = addr;
+      }
+      trd->m_brokerDatas.push_back(d);
+    }
+    sort(trd->m_brokerDatas.begin(), trd->m_brokerDatas.end());
+
+    return trd.release();
+  }
+
+  //<! Returns the MASTER_ID address of the first broker (in stored order)
+  //<! that has one, or an empty string when no broker lists a master.
+  string selectBrokerAddr() {
+    for (vector<BrokerData>::iterator it = m_brokerDatas.begin();
+         it != m_brokerDatas.end(); ++it) {
+      map<int, string>::iterator master = it->brokerAddrs.find(MASTER_ID);
+      if (master != it->brokerAddrs.end()) {
+        return master->second;
+      }
+    }
+    return "";
+  }
+
+  //<! Mutable access to the queue list.
+  vector<QueueData>& getQueueDatas() { return m_queueDatas; }
+
+  //<! Mutable access to the broker list.
+  vector<BrokerData>& getBrokerDatas() { return m_brokerDatas; }
+
+  const string& getOrderTopicConf() const { return m_orderTopicConf; }
+
+  void setOrderTopicConf(const string& orderTopicConf) {
+    m_orderTopicConf = orderTopicConf;
+  }
+
+  //<! Element-wise comparison of all three members; list order matters.
+  bool operator==(const TopicRouteData& other) const {
+    return m_brokerDatas == other.m_brokerDatas &&
+           m_orderTopicConf == other.m_orderTopicConf &&
+           m_queueDatas == other.m_queueDatas;
+  }
+
+ private:
+  string m_orderTopicConf;
+  vector<QueueData> m_queueDatas;
+  vector<BrokerData> m_brokerDatas;
+};
+
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/batch_descriptor.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/batch_descriptor.h b/rocketmq-cpp/src/thread/disruptor/batch_descriptor.h
new file mode 100755
index 0000000..ba1a035
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/batch_descriptor.h
@@ -0,0 +1,70 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the disruptor-- nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_BATCH_DESCRIPTOR_H_ // NOLINT
+#define DISRUPTOR_BATCH_DESCRIPTOR_H_ // NOLINT
+
+#include "sequence.h"
+
+namespace rocketmq {
+
+// Records a batch of sequences claimed via a {@link Sequencer}: a fixed batch
+// size together with the (settable) last sequence of the batch.
+class BatchDescriptor {
+ public:
+  // Creates a descriptor for a batch of `size` sequences. The end sequence
+  // is kInitialCursorValue until set_end() is called.
+  //
+  // @param size of the batch to claim.
+  BatchDescriptor(int size) : size_(size), end_(kInitialCursorValue) {}
+
+  // @return the size of the batch.
+  int size() const { return size_; }
+
+  // @return the end sequence in the batch.
+  int64_t end() const { return end_; }
+
+  // @param end sequence in the batch.
+  void set_end(int64_t end) { end_ = end; }
+
+  // @return starting sequence in the batch, i.e. end - size + 1.
+  int64_t Start() const {
+    const int64_t first = end_ - size_ + 1L;
+    return first;
+  }
+
+ private:
+  int size_;
+  int64_t end_;
+};
+
+}; // namespace rocketmq
+
+#endif // DISRUPTOR_BATCH_DESCRIPTOR_H_ NOLINT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/claim_strategy.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/claim_strategy.h b/rocketmq-cpp/src/thread/disruptor/claim_strategy.h
new file mode 100755
index 0000000..0f3263a
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/claim_strategy.h
@@ -0,0 +1,231 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the disruptor-- nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_CLAIM_STRATEGY_H_ // NOLINT
+#define DISRUPTOR_CLAIM_STRATEGY_H_ // NOLINT
+
+#include <boost/thread.hpp>
+#include <boost/noncopyable.hpp>
+#include <vector>
+
+#include "interface.h"
+
+namespace rocketmq {
+
+// Selects which claim strategy CreateClaimStrategy() should build.
+enum ClaimStrategyOption {
+ // Built as a SingleThreadedStrategy.
+ kSingleThreadedStrategy,
+ // Has no live implementation in this header: the MultiThreadedStrategy
+ // class is commented out and CreateClaimStrategy() returns NULL for it.
+ kMultiThreadedStrategy
+};
+
+// Optimised strategy can be used when there is a single publisher thread
+// claiming {@link AbstractEvent}s.
+//
+// The claim counter and the cached gating sequence are plain PaddedLong
+// members updated without any locking in this class.
+class SingleThreadedStrategy : public boost::noncopyable,
+                               public ClaimStrategyInterface {
+ public:
+  // @param buffer_size number of slots in the ring buffer being guarded.
+  SingleThreadedStrategy(const int& buffer_size) :
+      buffer_size_(buffer_size),
+      sequence_(kInitialCursorValue),
+      min_gating_sequence_(kInitialCursorValue) {}
+
+  // Claims the next sequence, waiting until that slot is free with respect
+  // to `dependent_sequences`.
+  virtual int64_t IncrementAndGet(
+      const std::vector<Sequence*>& dependent_sequences) {
+    int64_t next_sequence = sequence_.IncrementAndGet(1L);
+    WaitForFreeSlotAt(next_sequence, dependent_sequences);
+    return next_sequence;
+  }
+
+  // Claims `delta` sequences at once and returns the last one claimed,
+  // waiting until that slot is free.
+  virtual int64_t IncrementAndGet(const int& delta,
+      const std::vector<Sequence*>& dependent_sequences) {
+    int64_t next_sequence = sequence_.IncrementAndGet(delta);
+    WaitForFreeSlotAt(next_sequence, dependent_sequences);
+    return next_sequence;
+  }
+
+  // Non-blocking check: true when claiming one more sequence would not wrap
+  // past the minimum of `dependent_sequences`. Refreshes the cached gating
+  // sequence whenever the cached value is not enough to answer.
+  virtual bool HasAvalaibleCapacity(
+      const std::vector<Sequence*>& dependent_sequences) {
+    int64_t wrap_point = sequence_.sequence() + 1L - buffer_size_;
+    if (wrap_point > min_gating_sequence_.sequence()) {
+      int64_t min_sequence = GetMinimumSequence(dependent_sequences);
+      min_gating_sequence_.set_sequence(min_sequence);
+      if (wrap_point > min_sequence)
+        return false;
+    }
+    return true;
+  }
+
+  // Forces the claim counter to `sequence`, then waits until that slot is
+  // free.
+  virtual void SetSequence(const int64_t& sequence,
+      const std::vector<Sequence*>& dependent_sequences) {
+    sequence_.set_sequence(sequence);
+    WaitForFreeSlotAt(sequence, dependent_sequences);
+  }
+
+  // Intentionally empty in this strategy.
+  virtual void SerialisePublishing(const int64_t& sequence,
+                                   const Sequence& cursor,
+                                   const int64_t& batch_size) {}
+
+ private:
+  // Declared but not defined: a buffer size is always required.
+  SingleThreadedStrategy();
+
+  // Yields the calling thread until `sequence` no longer wraps past the
+  // minimum of `dependent_sequences`.
+  void WaitForFreeSlotAt(const int64_t& sequence,
+      const std::vector<Sequence*>& dependent_sequences) {
+    int64_t wrap_point = sequence - buffer_size_;
+    if (wrap_point > min_gating_sequence_.sequence()) {
+      int64_t min_sequence;
+      while (wrap_point > (min_sequence = GetMinimumSequence(dependent_sequences))) {
+        boost::this_thread::yield();
+      }
+      // Remember the minimum just observed so later claims that stay below
+      // it can skip the GetMinimumSequence() scan. Without this store the
+      // value computed above was discarded and the cache was only ever
+      // refreshed by HasAvalaibleCapacity().
+      min_gating_sequence_.set_sequence(min_sequence);
+    }
+  }
+
+  const int buffer_size_;
+  PaddedLong sequence_;              // last sequence claimed
+  PaddedLong min_gating_sequence_;   // cached minimum of dependent sequences
+
+};
+
+// Strategy to be used when there are multiple publisher threads claiming
+// {@link AbstractEvent}s.
+/*
+class MultiThreadedStrategy : public ClaimStrategyInterface {
+ public:
+ MultiThreadedStrategy(const int& buffer_size) :
+ buffer_size_(buffer_size),
+ sequence_(kInitialCursorValue),
+ min_processor_sequence_(kInitialCursorValue) {}
+
+ virtual int64_t IncrementAndGet(
+ const std::vector<Sequence*>& dependent_sequences) {
+ WaitForCapacity(dependent_sequences, min_gating_sequence_local_);
+ int64_t next_sequence = sequence_.IncrementAndGet();
+ WaitForFreeSlotAt(next_sequence,
+ dependent_sequences,
+ min_gating_sequence_local_);
+ return next_sequence;
+ }
+
+ virtual int64_t IncrementAndGet(const int& delta,
+ const std::vector<Sequence*>& dependent_sequences) {
+ int64_t next_sequence = sequence_.IncrementAndGet(delta);
+ WaitForFreeSlotAt(next_sequence,
+ dependent_sequences,
+ min_gating_sequence_local_);
+ return next_sequence;
+ }
+ virtual void SetSequence(const int64_t& sequence,
+ const std::vector<Sequence*>& dependent_sequences) {
+ sequence_.set_sequence(sequence);
+ WaitForFreeSlotAt(sequence,
+ dependent_sequences,
+ min_gating_sequence_local_);
+ }
+
+ virtual bool HasAvalaibleCapacity(
+ const std::vector<Sequence*>& dependent_sequences) {
+ const int64_t wrap_point = sequence_.sequence() + 1L - buffer_size_;
+ if (wrap_point > min_gating_sequence_local_.sequence()) {
+ int64_t min_sequence = GetMinimumSequence(dependent_sequences);
+ min_gating_sequence_local_.set_sequence(min_sequence);
+ if (wrap_point > min_sequence)
+ return false;
+ }
+ return true;
+ }
+
+ virtual void SerialisePublishing(const Sequence& cursor,
+ const int64_t& sequence,
+ const int64_t& batch_size) {
+ int64_t expected_sequence = sequence - batch_size;
+ int counter = retries;
+
+ while (expected_sequence != cursor.sequence()) {
+ if (0 == --counter) {
+ counter = retries;
+ std::this_thread::yield();
+ }
+ }
+ }
+
+ private:
+ // Methods
+ void WaitForCapacity(const std::vector<Sequence*>& dependent_sequences,
+ const MutableLong& min_gating_sequence) {
+ const int64_t wrap_point = sequence_.sequence() + 1L - buffer_size_;
+ if (wrap_point > min_gating_sequence.sequence()) {
+ int counter = retries;
+ int64_t min_sequence;
+ while (wrap_point > (min_sequence = GetMinimumSequence(dependent_sequences))) {
+ counter = ApplyBackPressure(counter);
+ }
+ min_gating_sequence.set_sequence(min_sequence);
+ }
+ }
+
+ void WaitForFreeSlotAt(const int64_t& sequence,
+ const std::vector<Sequence*>& dependent_sequences,
+ const MutableLong& min_gating_sequence) {
+ const int64_t wrap_point = sequence - buffer_size_;
+ if (wrap_point > min_gating_sequence.sequence()) {
+ int64_t min_sequence;
+ while (wrap_point > (min_sequence = GetMinimumSequence(dependent_sequences))) {
+ std::this_thread::yield();
+ }
+ min_gating_sequence.set_sequence(min_sequence);
+ }
+ }
+
+ int ApplyBackPressure(int counter) {
+ if (0 != counter) {
+ --counter;
+ std::this_thread::yield();
+ } else {
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+
+ return counter;
+ }
+
+ const int buffer_size_;
+ PaddedSequence sequence_;
+ thread_local PaddedLong min_gating_sequence_local_;
+
+ const int retries = 100;
+
+};
+*/
+
+// Builds the claim strategy selected by `option` for a ring buffer of
+// `buffer_size` slots.
+//
+// Returns a strategy allocated with `new` (nothing here frees it, so the
+// caller must), or NULL for any option without an implementation - at
+// present everything except kSingleThreadedStrategy.
+//
+// Declared `inline` because the definition lives in a header: without it,
+// every translation unit including this file emits its own external
+// definition and the program fails to link.
+inline ClaimStrategyInterface* CreateClaimStrategy(ClaimStrategyOption option,
+                                                   const int& buffer_size) {
+  switch (option) {
+    case kSingleThreadedStrategy:
+      return new SingleThreadedStrategy(buffer_size);
+    // case kMultiThreadedStrategy:
+    //   return new MultiThreadedStrategy(buffer_size);
+    default:
+      return NULL;
+  }
+}
+
+}; // namespace rocketmq
+
+#endif // DISRUPTOR_CLAIM_STRATEGY_H_ NOLINT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/event_processor.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/event_processor.h b/rocketmq-cpp/src/thread/disruptor/event_processor.h
new file mode 100755
index 0000000..fb62812
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/event_processor.h
@@ -0,0 +1,130 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the disruptor-- nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_EVENT_PROCESSOR_H_ // NOLINT
+#define DISRUPTOR_EVENT_PROCESSOR_H_ // NOLINT
+
+#include <stdexcept>
+#include "ring_buffer.h"
+
+namespace rocketmq {
+
+// Event processor that performs no work of its own: Run() and Halt() are
+// empty and the sequence it reports is the ring buffer's own.
+template <typename T>
+class NoOpEventProcessor : public EventProcessorInterface<T> {
+ public:
+  // @param ring_buffer buffer whose sequence is reported; stored as a raw
+  // pointer and not deleted by this class.
+  NoOpEventProcessor(RingBuffer<T>* ring_buffer) : ring_buffer_(ring_buffer) {}
+
+  // @return the sequence pointer obtained from the ring buffer.
+  virtual Sequence* GetSequence() {
+    Sequence* buffer_sequence = ring_buffer_->GetSequencePtr();
+    return buffer_sequence;
+  }
+
+  // Nothing to stop.
+  virtual void Halt() {}
+
+  // Nothing to run.
+  virtual void Run() {}
+
+ private:
+  RingBuffer<T>* ring_buffer_;
+};
+
+// Event processor that repeatedly waits on a sequence barrier and hands every
+// newly available ring-buffer entry to an event handler, in sequence order.
+template <typename T>
+class BatchEventProcessor : public boost::noncopyable,
+                            public EventProcessorInterface<T> {
+ public:
+  // All four collaborators are stored as raw pointers; this class neither
+  // checks them for NULL nor deletes them.
+  BatchEventProcessor(RingBuffer<T>* ring_buffer,
+                      SequenceBarrierInterface* sequence_barrier,
+                      EventHandlerInterface<T>* event_handler,
+                      ExceptionHandlerInterface<T>* exception_handler) :
+      running_(false),
+      ring_buffer_(ring_buffer),
+      sequence_barrier_(sequence_barrier),
+      event_handler_(event_handler),
+      exception_handler_(exception_handler) {}
+
+  // @return the sequence of the last event this processor has finished with.
+  virtual Sequence* GetSequence() { return &sequence_; }
+
+  // Asks Run() to stop: clears the running flag, then alerts the barrier.
+  // NOTE(review): presumably Alert() makes a pending WaitFor() throw
+  // AlertException - the barrier implementation is not in this header.
+  virtual void Halt() {
+    running_.store(false);
+    sequence_barrier_->Alert();
+  }
+
+  // Processing loop; returns after Halt() has been observed.
+  //
+  // A second call while a loop is already active logs and returns at once.
+  // Previously the message was printed but a second loop was started anyway
+  // on the same sequence and handler.
+  virtual void Run() {
+    // exchange() tests and sets the flag in one atomic step, so two
+    // concurrent callers cannot both pass the check.
+    if (running_.exchange(true)) {
+      printf("Thread is already running\r\n");
+      return;
+    }
+    sequence_barrier_->ClearAlert();
+    event_handler_->OnStart();
+
+    T* event = NULL;
+    int64_t next_sequence = sequence_.sequence() + 1L;
+
+    while (true) {
+      try {
+        // Bounded wait so the thread does not block forever in
+        // BlockingStrategy::WaitFor during shutdown. The original note calls
+        // 300*1000 "300 milliseconds" - unit not visible here, TODO confirm.
+        int64_t avalaible_sequence =
+            sequence_barrier_->WaitFor(next_sequence, 300 * 1000);
+        while (next_sequence <= avalaible_sequence) {
+          event = ring_buffer_->Get(next_sequence);
+          // The middle argument flags the last event of the current batch.
+          event_handler_->OnEvent(next_sequence,
+                                  next_sequence == avalaible_sequence, event);
+          next_sequence++;
+        }
+
+        // Publish progress: everything before next_sequence is handled.
+        sequence_.set_sequence(next_sequence - 1L);
+      } catch (const AlertException&) {
+        // An alert only ends the loop once Halt() has cleared the flag.
+        if (!running_.load())
+          break;
+      } catch (const std::exception& e) {
+        // Report the failure, then mark the sequence as handled and move on.
+        // `event` is whatever Get() last returned, or NULL if no event had
+        // been fetched yet.
+        exception_handler_->Handle(e, next_sequence, event);
+        sequence_.set_sequence(next_sequence);
+        next_sequence++;
+      }
+    }
+    event_handler_->OnShutdown();
+    running_.store(false);
+  }
+
+  // Lets an instance be invoked as a callable; simply calls Run().
+  void operator()() { Run(); }
+
+ private:
+  boost::atomic<bool> running_;
+  Sequence sequence_;
+
+  RingBuffer<T>* ring_buffer_;
+  SequenceBarrierInterface* sequence_barrier_;
+  EventHandlerInterface<T>* event_handler_;
+  ExceptionHandlerInterface<T>* exception_handler_;
+
+};
+
+
+}; // namespace rocketmq
+
+#endif // DISRUPTOR_EVENT_PROCESSOR_H_ NOLINT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/event_publisher.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/event_publisher.h b/rocketmq-cpp/src/thread/disruptor/event_publisher.h
new file mode 100755
index 0000000..ae0efd9
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/event_publisher.h
@@ -0,0 +1,50 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the disruptor-- nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_EVENT_PUBLISHER_H_ // NOLINT
+#define DISRUPTOR_EVENT_PUBLISHER_H_ // NOLINT
+
+#include "ring_buffer.h"
+
+namespace rocketmq {
+
+// Helper that runs the claim / fill / publish sequence against a RingBuffer
+// for one event at a time.
+template<typename T>
+class EventPublisher {
+ public:
+  // @param ring_buffer buffer to publish into; stored as a raw pointer and
+  // not deleted by this class.
+  EventPublisher(RingBuffer<T>* ring_buffer) : ring_buffer_(ring_buffer) {}
+
+  // Claims the next sequence, lets `translator` fill the entry stored at that
+  // sequence, then publishes the sequence.
+  void PublishEvent(EventTranslatorInterface<T>* translator) {
+    int64_t claimed = ring_buffer_->Next();
+    T* slot = ring_buffer_->Get(claimed);
+    translator->TranslateTo(claimed, slot);
+    ring_buffer_->Publish(claimed);
+  }
+
+ private:
+  RingBuffer<T>* ring_buffer_;
+};
+
+}; // namespace rocketmq
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/exception_handler.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/exception_handler.h b/rocketmq-cpp/src/thread/disruptor/exception_handler.h
new file mode 100755
index 0000000..e7979a0
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/exception_handler.h
@@ -0,0 +1,59 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the disruptor-- nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_EXCEPTION_HANDLER_H_ // NOLINT
+#define DISRUPTOR_EXCEPTION_HANDLER_H_ // NOLINT
+
+#include <exception>
+
+#include "interface.h"
+
+namespace rocketmq {
+
+// Exception handler that silently discards whatever it is given.
+template<typename T>
+class IgnoreExceptionHandler: public ExceptionHandlerInterface<T> {
+ public:
+  // Deliberately empty: none of the arguments is used.
+  virtual void Handle(const std::exception& /*exception*/,
+                      const int64_t& /*sequence*/,
+                      T* /*event*/) {}
+};
+
+// Exception handler that propagates the failure to the caller instead of
+// swallowing it.
+template<typename T>
+class FatalExceptionHandler: public ExceptionHandlerInterface<T> {
+ public:
+  // Always throws; never returns normally.
+  //
+  // @param exception the failure being reported.
+  // @param sequence  unused.
+  // @param event     unused.
+  virtual void Handle(const std::exception& exception,
+                      const int64_t& sequence,
+                      T* event) {
+    (void)sequence;
+    (void)event;
+    // When called from inside a catch handler (as BatchEventProcessor::Run
+    // does), rethrow the exception in flight so its dynamic type and what()
+    // text survive. `throw exception;` would copy only the std::exception
+    // base sub-object and lose both.
+    // NOTE(review): assumes the exception being handled is the one passed
+    // in as `exception`.
+    if (std::current_exception()) {
+      throw;
+    }
+    // No exception in flight: a bare `throw;` would call std::terminate, so
+    // fall back to throwing the argument as before.
+    throw exception;
+  }
+};
+
+}; // namespace rocketmq
+
+#endif // DISRUPTOR_EXCEPTION_HANDLER_H_ NOLINT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/exceptions.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/exceptions.h b/rocketmq-cpp/src/thread/disruptor/exceptions.h
new file mode 100755
index 0000000..f968043
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/exceptions.h
@@ -0,0 +1,38 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the disruptor-- nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_EXCEPTIONS_H_ // NOLINT
+#define DISRUPTOR_EXCEPTIONS_H_ // NOLINT
+
+#include <exception>
+
+namespace rocketmq {
+
+// Exception type that adds nothing to std::exception. BatchEventProcessor::Run()
+// catches it separately from other std::exception types and leaves its loop
+// once the processor is no longer marked as running.
+// NOTE(review): presumably thrown by a sequence barrier after Alert() - the
+// throw site is not in this header; confirm.
+class AlertException : public std::exception {
+};
+
+}; // namespace rocketmq
+
+#endif // DISRUPTOR_EXCEPTIONS_H_ NOLINT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/interface.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/interface.h b/rocketmq-cpp/src/thread/disruptor/interface.h
new file mode 100755
index 0000000..0c07774
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/interface.h
@@ -0,0 +1,278 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the disruptor-- nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_INTERFACE_H_ // NOLINT
+#define DISRUPTOR_INTERFACE_H_ // NOLINT
+
+#include <climits>
+#include <vector>
+
+#include "sequence.h"
+#include "batch_descriptor.h"
+
+namespace rocketmq {
+
+// Strategies employed by publishers for claiming the sequence of events in
+// the {@link Sequencer}. Every operation declared here is pure virtual.
+class ClaimStrategyInterface {
+ public:
+ virtual ~ClaimStrategyInterface() {}
+
+ // Is there available capacity in the buffer for the requested sequence?
+ // @param dependent_sequences to be checked for range.
+ // @return true if the buffer has capacity for the requested sequence.
+ virtual bool HasAvalaibleCapacity(
+ const std::vector<Sequence*>& dependent_sequences) = 0;
+
+ // Claim the next sequence in the {@link Sequencer}.
+ //
+ // @param dependent_sequences to be checked for range.
+ // @return the index to be used for the publishing.
+ virtual int64_t IncrementAndGet(
+ const std::vector<Sequence*>& dependent_sequences) = 0;
+
+ // Claim the next sequence in the {@link Sequencer}, advancing by delta.
+ //
+ // @param delta to increment by.
+ // @param dependent_sequences to be checked for range.
+ // @return the index to be used for the publishing.
+ virtual int64_t IncrementAndGet(const int& delta,
+ const std::vector<Sequence*>& dependent_sequences) = 0;
+
+ // Set the current sequence value for claiming an event in the
+ // {@link Sequencer}.
+ //
+ // @param sequence to be set as the current value.
+ // @param dependent_sequences to be checked for range.
+ virtual void SetSequence(const int64_t& sequence,
+ const std::vector<Sequence*>& dependent_sequences) = 0;
+
+ // Serialise publishing in sequence.
+ //
+ // @param sequence to be applied.
+ // @param cursor to be serialised against.
+ // @param batch_size of the sequence.
+ virtual void SerialisePublishing(const int64_t& sequence,
+ const Sequence& cursor,
+ const int64_t& batch_size) = 0;
+};
+
+// Coordination barrier for tracking the cursor for publishers and sequence of
+// dependent {@link EventProcessor}s for processing a data structure.
+class SequenceBarrierInterface {
+ public:
+ virtual ~SequenceBarrierInterface(){}
+
+ // Wait for the given sequence to be available for consumption.
+ //
+ // @param sequence to wait for.
+ // @return the sequence up to which is available.
+ // @throws AlertException if a status change has occurred for the
+ // Disruptor.
+ virtual int64_t WaitFor(const int64_t& sequence) = 0;
+
+ // Wait for the given sequence to be available for consumption with a
+ // timeout.
+ //
+ // @param sequence to wait for.
+ // @param timeout_micro timeout in microseconds.
+ // @return the sequence up to which is available.
+ //
+ // @throws AlertException if a status change has occurred for the
+ // Disruptor.
+ virtual int64_t WaitFor(const int64_t& sequence,
+ const int64_t& timeout_micro) = 0;
+
+ // Delegate a call to the {@link Sequencer#getCursor()}.
+ //
+ // @return value of the cursor for entries that have been published.
+ virtual int64_t GetCursor() const = 0;
+
+ // The current alert status for the barrier.
+ //
+ // @return true if in alert otherwise false.
+ virtual bool IsAlerted() const = 0;
+
+ // Alert the {@link EventProcessor}s of a status change and stay in this
+ // status until cleared.
+ virtual void Alert() = 0;
+
+ // Clear the current alert status.
+ virtual void ClearAlert() = 0;
+
+ // Check if the barrier is alerted; if so, throw an AlertException.
+ //
+ // @throws AlertException if the barrier is alerted.
+ virtual void CheckAlert() const = 0;
+};
+
+// Factory the {@link RingBuffer} constructor calls to pre-populate its events.
+// @param <T> event implementation storing the data for sharing during exchange
+// or parallel coordination of an event.
+template<typename T>
+class EventFactoryInterface {
+ public:
+ virtual ~EventFactoryInterface(){}
+ // @param size number of events requested (RingBuffer passes buffer_size).
+ // @return array that RingBuffer indexes and later frees with delete[].
+ virtual T* NewInstance(const int& size) const = 0;
+};
+
+// Callback interface to be implemented for processing events as they become
+// available in the {@link RingBuffer}.
+//
+// @param <T> event implementation storing the data for sharing during exchange
+// or parallel coordination of an event.
+template<typename T>
+class EventHandlerInterface {
+ public:
+ virtual ~EventHandlerInterface(){}
+
+ // Called when a publisher has published an event to the {@link RingBuffer}.
+ //
+ // @param sequence of the event being processed.
+ // @param end_of_batch flag to indicate if this is the last event in a batch
+ // from the {@link RingBuffer}.
+ // @param event published to the {@link RingBuffer}.
+ // @throws Exception if the EventHandler would like the exception handled
+ // further up the chain.
+ virtual void OnEvent(const int64_t& sequence,
+ const bool& end_of_batch,
+ T* event) = 0;
+
+ // Called once on thread start before processing the first event.
+ virtual void OnStart() = 0;
+
+ // Called once on thread stop just before shutdown.
+ virtual void OnShutdown() = 0;
+};
+
+// Implementations translate other data representations into events claimed
+// for the {@link RingBuffer}.
+//
+// @param <T> event implementation storing the data for sharing during exchange
+// or parallel coordination of an event.
+template<typename T>
+class EventTranslatorInterface {
+ public:
+ virtual ~EventTranslatorInterface(){}
+
+ // Translate a data representation into fields set in the given event.
+ // @param sequence that is assigned to events.
+ // @param event into which the data should be translated.
+ // @return the translated event; this default implementation returns NULL.
+ virtual T* TranslateTo(const int64_t& sequence, T* event) { return NULL;}
+};
+
+// EventProcessors wait for events to become available for consumption from
+// the {@link RingBuffer}. An event processor should be associated with a
+// thread.
+//
+// @param <T> event implementation storing the data for sharing during exchange
+// or parallel coordination of an event.
+template<typename T>
+class EventProcessorInterface {
+ public:
+ virtual ~EventProcessorInterface(){}
+
+ // Get a pointer to the {@link Sequence} being used by this
+ // {@link EventProcessor}.
+ // @return pointer to the {@link Sequence} for this
+ // {@link EventProcessor}.
+ virtual Sequence* GetSequence() = 0;
+
+ // Signal that this EventProcessor should stop when it has finished
+ // consuming at the next clean break.
+ // Implementations are expected to alert the barrier (see
+ // SequenceBarrierInterface::Alert()) so the thread re-checks its status.
+ virtual void Halt() = 0;
+};
+
+// Callback handler for uncaught exceptions in the event processing cycle
+// of the {@link BatchEventProcessor}.
+//
+// @param <T> event type stored in the {@link RingBuffer}.
+template<typename T>
+class ExceptionHandlerInterface {
+ public:
+ virtual ~ExceptionHandlerInterface(){}
+
+ // Strategy for handling uncaught exceptions when processing an event.
+ // If the strategy wishes to suspend further processing by the
+ // {@link BatchEventProcessor} then it should throw a std::runtime_error.
+ // @param exception that propagated from the {@link EventHandler}.
+ // @param sequence of the event which caused the exception.
+ // @param event being processed when the exception occurred.
+ virtual void Handle(const std::exception& exception,
+ const int64_t& sequence,
+ T* event) = 0;
+};
+
+// Strategy employed for making {@link EventProcessor}s wait on a cursor
+// {@link Sequence}. Instances are non-copyable (boost::noncopyable base).
+class WaitStrategyInterface: public boost::noncopyable {
+ public:
+ virtual ~WaitStrategyInterface(){}
+
+ // Wait for the given sequence to be available for consumption.
+ //
+ // @param dependents further back the chain that must advance first.
+ // @param cursor on which to wait.
+ // @param barrier the consumer is waiting on.
+ // @param sequence to be waited on.
+ // @return the sequence that is available which may be greater than the
+ // requested sequence.
+ // @throws AlertException if the status of the Disruptor has changed.
+ virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+ const Sequence& cursor,
+ const SequenceBarrierInterface& barrier,
+ const int64_t& sequence) = 0;
+
+ // Wait for the given sequence to be available for consumption in a
+ // {@link RingBuffer} with a timeout specified.
+ //
+ // @param dependents further back the chain that must advance first.
+ // @param cursor on which to wait.
+ // @param barrier the consumer is waiting on.
+ // @param sequence to be waited on.
+ // @param timeout_micros value in microseconds to abort after.
+ // @return the sequence that is available which may be greater than the
+ // requested sequence.
+ //
+ // @throws AlertException if the status of the Disruptor has changed.
+ // @throws InterruptedException if interrupted (TODO confirm: type not seen).
+ virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+ const Sequence& cursor,
+ const SequenceBarrierInterface& barrier,
+ const int64_t & sequence,
+ const int64_t & timeout_micros) = 0;
+
+ // Signal those waiting that the cursor has advanced.
+ virtual void SignalAllWhenBlocking() = 0;
+};
+
+}; // namespace rocketmq
+
+#endif // DISRUPTOR_INTERFACE_H_ NOLINT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/ring_buffer.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/ring_buffer.h b/rocketmq-cpp/src/thread/disruptor/ring_buffer.h
new file mode 100755
index 0000000..c7150f1
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/ring_buffer.h
@@ -0,0 +1,90 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the disruptor-- nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_RING_BUFFER_H_ // NOLINT
+#define DISRUPTOR_RING_BUFFER_H_ // NOLINT
+
+#include <boost/array.hpp>
+#include <vector>
+
+#include "interface.h"
+#include "claim_strategy.h"
+#include "wait_strategy.h"
+#include "sequencer.h"
+#include "sequence_barrier.h"
+
+namespace rocketmq {
+
+// Ring based store of reusable entries containing the data representing an
+// event being exchanged between publisher and {@link EventProcessor}s.
+//
+// The ring keeps the event array returned by the factory and releases it
+// with delete[] in its destructor. Copying is disabled so that two rings can
+// never end up freeing the same array.
+//
+// @param <T> implementation storing the data for sharing during exchange
+// or parallel coordination of an event.
+template<typename T>
+class RingBuffer : public Sequencer {
+ public:
+  // Construct a RingBuffer with the full option set.
+  //
+  // @param event_factory to instance new entries for filling the RingBuffer.
+  // It is dereferenced unconditionally, so it must not be NULL, and the
+  // array it returns must be compatible with delete[].
+  // @param buffer_size of the RingBuffer, must be a power of 2 because Get()
+  // maps sequences to slots by masking with buffer_size - 1.
+  // @param claim_strategy_option threading strategy for publishers claiming
+  // entries in the ring.
+  // @param wait_strategy_option waiting strategy employed by
+  // processors_to_track waiting in entries becoming available.
+  RingBuffer(EventFactoryInterface<T>* event_factory,
+             int buffer_size,
+             ClaimStrategyOption claim_strategy_option,
+             WaitStrategyOption wait_strategy_option)
+      : Sequencer(buffer_size, claim_strategy_option, wait_strategy_option),
+        buffer_size_(buffer_size),
+        mask_(buffer_size - 1),
+        events_(event_factory->NewInstance(buffer_size)) {
+  }
+
+  // Release the event array obtained from the factory.
+  ~RingBuffer() {
+    delete[] events_;
+  }
+
+  // Get the event for a given sequence in the RingBuffer.
+  //
+  // @param sequence for the event
+  // @return event pointer at the specified sequence position.
+  T* Get(const int64_t& sequence) {
+    return &events_[sequence & mask_];
+  }
+
+ private:
+  // Declared but deliberately left undefined: an implicit copy would share
+  // events_ and lead to a double delete[].
+  RingBuffer(const RingBuffer&);
+  RingBuffer& operator=(const RingBuffer&);
+
+  // Members
+  int buffer_size_;  // Number of slots, as passed to the constructor.
+  int mask_;         // buffer_size - 1, used by Get() to wrap sequences.
+  T* events_;        // Event array from the factory; freed in ~RingBuffer().
+};
+
+}; // namespace rocketmq
+
+#endif // DISRUPTOR_RING_BUFFER_H_ NOLINT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/sequence.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/sequence.h b/rocketmq-cpp/src/thread/disruptor/sequence.h
new file mode 100755
index 0000000..f1396f3
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/sequence.h
@@ -0,0 +1,139 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the disruptor-- nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+// Cache-line width in bytes used to size the padding below; a build may
+// predefine it to override the default of 64.
+#ifndef CACHE_LINE_SIZE_IN_BYTES // NOLINT
+#define CACHE_LINE_SIZE_IN_BYTES 64 // NOLINT
+#endif // NOLINT
+// Element counts for the padding_ arrays of the padded counters: the bytes
+// left in one cache line after the counter, divided by 8. Each body is fully
+// parenthesised so the macro expands as a single operand inside larger
+// expressions (e.g. `x / SEQUENCE_PADDING_LENGTH`).
+#define ATOMIC_SEQUENCE_PADDING_LENGTH \
+ ((CACHE_LINE_SIZE_IN_BYTES - sizeof(boost::atomic<int64_t>)) / 8)
+#define SEQUENCE_PADDING_LENGTH \
+ ((CACHE_LINE_SIZE_IN_BYTES - sizeof(int64_t)) / 8)
+
+#ifndef DISRUPTOR_SEQUENCE_H_ // NOLINT
+#define DISRUPTOR_SEQUENCE_H_ // NOLINT
+
+#include <boost/atomic.hpp>
+#include <boost/memory_order.hpp>
+#include <boost/noncopyable.hpp>
+#include <vector>
+#include <limits>
+using namespace boost;
+namespace rocketmq {
+
+// Default starting value shared by every counter type in this header.
+const int64_t kInitialCursorValue = -1L;
+
+// Sequence counter stored in a boost::atomic<int64_t>; not copyable.
+class Sequence:public noncopyable {
+ public:
+  // Construct a sequence counter that can be tracked across threads.
+  //
+  // @param initial_value starting value of the counter.
+  Sequence(int64_t initial_value = kInitialCursorValue)
+      : value_(initial_value) {
+  }
+
+  // Read the counter using acquire ordering.
+  //
+  // @return the current value.
+  int64_t sequence() const {
+    return value_.load(boost::memory_order_acquire);
+  }
+
+  // Overwrite the counter using release ordering.
+  //
+  // @param value the new value of the {@link Sequence}.
+  void set_sequence(int64_t value) {
+    value_.store(value, boost::memory_order_release);
+  }
+
+  // Atomically add to the counter and report the result.
+  //
+  // @param increment amount added to the {@link Sequence}.
+  // @return the value of the counter after the addition.
+  int64_t IncrementAndGet(const int64_t& increment) {
+    // fetch_add yields the value held before the addition.
+    const int64_t previous =
+        value_.fetch_add(increment, boost::memory_order_release);
+    return previous + increment;
+  }
+
+ private:
+  // The counter itself.
+  boost::atomic<int64_t> value_;
+};
+
+// Cache line padded sequence counter.
+//
+// Intended for use across threads without false sharing when it is located
+// adjacent to another counter in memory.
+class PaddedSequence : public Sequence {
+ public:
+  PaddedSequence(int64_t initial_value = kInitialCursorValue)
+      : Sequence(initial_value) {
+  }
+
+ private:
+  // Filler sized by ATOMIC_SEQUENCE_PADDING_LENGTH; no member of this class
+  // accesses it.
+  int64_t padding_[ATOMIC_SEQUENCE_PADDING_LENGTH];
+};
+
+// Plain (non-atomic) sequence counter.
+//
+// This counter is not thread safe.
+class MutableLong {
+ public:
+  // @param initial_value starting value of the counter.
+  MutableLong(int64_t initial_value = kInitialCursorValue)
+      : sequence_(initial_value) {
+  }
+
+  // @return the current value.
+  int64_t sequence() const {
+    return sequence_;
+  }
+
+  // @param sequence value to store as the current value.
+  void set_sequence(const int64_t& sequence) {
+    sequence_ = sequence;
+  }
+
+  // Add delta to the counter.
+  //
+  // @param delta amount to add.
+  // @return the value read back after the addition.
+  int64_t IncrementAndGet(const int64_t& delta) {
+    sequence_ += delta;
+    return sequence_;
+  }
+
+ private:
+  volatile int64_t sequence_;
+};
+
+// Cache line padded variant of the non-atomic sequence counter.
+//
+// This counter is not thread safe.
+class PaddedLong : public MutableLong {
+ public:
+  PaddedLong(int64_t initial_value = kInitialCursorValue)
+      : MutableLong(initial_value) {
+  }
+
+ private:
+  // Filler sized by SEQUENCE_PADDING_LENGTH; no member of this class
+  // accesses it.
+  int64_t padding_[SEQUENCE_PADDING_LENGTH];
+};
+
+// Find the smallest value currently held by a group of sequences.
+//
+// Marked inline because the definition lives in this header; without it,
+// every translation unit including the header would emit its own external
+// definition and the program would fail to link.
+//
+// @param sequences counters to inspect; every pointer is dereferenced.
+// @return the minimum value, or std::numeric_limits<int64_t>::max() when
+// sequences is empty.
+inline int64_t GetMinimumSequence(
+    const std::vector<Sequence*>& sequences) {
+  int64_t minimum = std::numeric_limits<int64_t>::max();
+
+  for (std::vector<Sequence*>::const_iterator it = sequences.begin();
+       it != sequences.end(); ++it) {
+    const int64_t sequence = (*it)->sequence();
+    if (sequence < minimum) {
+      minimum = sequence;
+    }
+  }
+
+  return minimum;
+}
+
+}; // namespace rocketmq
+
+#endif // DISRUPTOR_SEQUENCE_H_ NOLINT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/sequence_barrier.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/sequence_barrier.h b/rocketmq-cpp/src/thread/disruptor/sequence_barrier.h
new file mode 100755
index 0000000..c156388
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/sequence_barrier.h
@@ -0,0 +1,92 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the disruptor-- nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_SEQUENCE_BARRIER_H_ // NOLINT
+#define DISRUPTOR_SEQUENCE_BARRIER_H_ // NOLINT
+
+#include <memory>
+#include <vector>
+
+#include "exceptions.h"
+#include "interface.h"
+namespace rocketmq {
+
+// {@link SequenceBarrierInterface} implementation that forwards waits to a
+// wait strategy and keeps an atomic alert flag.
+//
+// Inherits publicly so a ProcessingSequenceBarrier* converts implicitly to
+// SequenceBarrierInterface*; the default (private) inheritance of `class`
+// would hide that conversion from callers.
+//
+// The wait strategy and cursor are held as raw pointers and are not deleted
+// by this class; the vector of dependent sequences is copied (the pointed-to
+// sequences are not).
+class ProcessingSequenceBarrier : public SequenceBarrierInterface {
+ public:
+  // @param wait_strategy strategy every WaitFor() call is forwarded to.
+  // @param sequence cursor passed to the wait strategy and read by
+  // GetCursor().
+  // @param sequences dependent sequences passed to the wait strategy.
+  ProcessingSequenceBarrier(WaitStrategyInterface* wait_strategy,
+                            Sequence* sequence,
+                            const std::vector<Sequence*>& sequences)
+      : wait_strategy_(wait_strategy),
+        cursor_(sequence),
+        dependent_sequences_(sequences),
+        alerted_(false) {
+  }
+
+  // Forward to the wait strategy, passing this barrier along with the call.
+  //
+  // @param sequence to wait for.
+  // @return whatever the wait strategy returns.
+  virtual int64_t WaitFor(const int64_t& sequence) {
+    return wait_strategy_->WaitFor(dependent_sequences_, *cursor_, *this,
+                                   sequence);
+  }
+
+  // Timed variant of WaitFor(); also forwarded to the wait strategy.
+  //
+  // @param sequence to wait for.
+  // @param timeout_micros timeout handed to the wait strategy.
+  // @return whatever the wait strategy returns.
+  virtual int64_t WaitFor(const int64_t& sequence,
+                          const int64_t& timeout_micros) {
+    return wait_strategy_->WaitFor(dependent_sequences_, *cursor_, *this,
+                                   sequence, timeout_micros);
+  }
+
+  // @return the current value of the cursor sequence.
+  virtual int64_t GetCursor() const {
+    return cursor_->sequence();
+  }
+
+  // @return true while the alert flag is set (acquire load).
+  virtual bool IsAlerted() const {
+    return alerted_.load(boost::memory_order_acquire);
+  }
+
+  // Set the alert flag (release store); it stays set until ClearAlert().
+  virtual void Alert() {
+    alerted_.store(true, boost::memory_order_release);
+  }
+
+  // Reset the alert flag (release store).
+  virtual void ClearAlert() {
+    alerted_.store(false, boost::memory_order_release);
+  }
+
+  // @throws AlertException if the alert flag is currently set.
+  virtual void CheckAlert() const {
+    if (IsAlerted()) {
+      throw AlertException();
+    }
+  }
+
+ private:
+  WaitStrategyInterface* wait_strategy_;        // Not deleted by this class.
+  Sequence* cursor_;                            // Not deleted by this class.
+  std::vector<Sequence*> dependent_sequences_;  // Copy of the ctor argument.
+  boost::atomic<bool> alerted_;                 // Alert flag.
+};
+
+}; // namespace rocketmq
+
+#endif // DISRUPTOR_SEQUENCE_BARRIER_H_ NOLINT