You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:49 UTC

[03/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp code

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/MQProtos.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/MQProtos.h b/rocketmq-cpp/src/protocol/MQProtos.h
new file mode 100755
index 0000000..50c1841
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/MQProtos.h
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MQPROTOS_H__
+#define __MQPROTOS_H__
+
+namespace rocketmq {
+//<!***************************************************************************
+enum MQRequestCode {
+  // send msg to Broker
+  SEND_MESSAGE = 10,
+  // subscribe msg from Broker
+  PULL_MESSAGE = 11,
+  // query msg from Broker
+  QUERY_MESSAGE = 12,
+  // query Broker Offset
+  QUERY_BROKER_OFFSET = 13,
+  // query Consumer Offset from broker
+  QUERY_CONSUMER_OFFSET = 14,
+  // update Consumer Offset to broker
+  UPDATE_CONSUMER_OFFSET = 15,
+  // create or update Topic to broker
+  UPDATE_AND_CREATE_TOPIC = 17,
+  // get all topic config info from broker
+  GET_ALL_TOPIC_CONFIG = 21,
+  //get all topic list from broker
+  GET_TOPIC_CONFIG_LIST = 22,
+  //get topic name list from broker
+  GET_TOPIC_NAME_LIST = 23,
+  UPDATE_BROKER_CONFIG = 25,
+  GET_BROKER_CONFIG = 26,
+  TRIGGER_DELETE_FILES = 27,
+  GET_BROKER_RUNTIME_INFO = 28,
+  SEARCH_OFFSET_BY_TIMESTAMP = 29,
+  GET_MAX_OFFSET = 30,
+  GET_MIN_OFFSET = 31,
+  GET_EARLIEST_MSG_STORETIME = 32,
+  VIEW_MESSAGE_BY_ID = 33,
+  //send heartbeat to broker, and register itself
+  HEART_BEAT = 34,
+  //unregister client to broker
+  UNREGISTER_CLIENT = 35,
+  //send back consume fail msg to broker
+  CONSUMER_SEND_MSG_BACK = 36,
+  //Commit Or Rollback transaction
+  END_TRANSACTION = 37,
+  // get consumer list by group from broker
+  GET_CONSUMER_LIST_BY_GROUP = 38,
+
+  CHECK_TRANSACTION_STATE = 39,
+  //broker send notify to consumer when consumer lists changes
+  NOTIFY_CONSUMER_IDS_CHANGED = 40,
+  //lock mq before orderly consume
+  LOCK_BATCH_MQ = 41,
+  //unlock mq after orderly consume
+  UNLOCK_BATCH_MQ = 42,
+  GET_ALL_CONSUMER_OFFSET = 43,
+  GET_ALL_DELAY_OFFSET = 45,
+  PUT_KV_CONFIG = 100,
+  GET_KV_CONFIG = 101,
+  DELETE_KV_CONFIG = 102,
+  REGISTER_BROKER = 103,
+  UNREGISTER_BROKER = 104,
+  GET_ROUTEINTO_BY_TOPIC = 105,
+  GET_BROKER_CLUSTER_INFO = 106,
+  UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200,
+  GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201,
+  GET_TOPIC_STATS_INFO = 202,
+  GET_CONSUMER_CONNECTION_LIST = 203,
+  GET_PRODUCER_CONNECTION_LIST = 204,
+  WIPE_WRITE_PERM_OF_BROKER = 205,
+
+  GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206,
+  DELETE_SUBSCRIPTIONGROUP = 207,
+  GET_CONSUME_STATS = 208,
+  SUSPEND_CONSUMER = 209,
+  RESUME_CONSUMER = 210,
+  RESET_CONSUMER_OFFSET_IN_CONSUMER = 211,
+  RESET_CONSUMER_OFFSET_IN_BROKER = 212,
+  ADJUST_CONSUMER_THREAD_POOL = 213,
+  WHO_CONSUME_THE_MESSAGE = 214,
+
+  DELETE_TOPIC_IN_BROKER = 215,
+  DELETE_TOPIC_IN_NAMESRV = 216,
+
+  GET_KV_CONFIG_BY_VALUE = 217,
+
+  DELETE_KV_CONFIG_BY_VALUE = 218,
+
+  GET_KVLIST_BY_NAMESPACE = 219,
+
+
+  RESET_CONSUMER_CLIENT_OFFSET = 220,
+
+  GET_CONSUMER_STATUS_FROM_CLIENT = 221,
+
+  INVOKE_BROKER_TO_RESET_OFFSET = 222,
+
+  INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223,
+
+  QUERY_TOPIC_CONSUME_BY_WHO = 300,
+
+  GET_TOPICS_BY_CLUSTER = 224,
+
+  REGISTER_FILTER_SERVER = 301,
+
+  REGISTER_MESSAGE_FILTER_CLASS = 302,
+
+  QUERY_CONSUME_TIME_SPAN = 303,
+
+  GET_SYSTEM_TOPIC_LIST_FROM_NS = 304,
+  GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305,
+
+  CLEAN_EXPIRED_CONSUMEQUEUE = 306,
+
+  GET_CONSUMER_RUNNING_INFO = 307,
+
+  QUERY_CORRECTION_OFFSET = 308,
+
+  CONSUME_MESSAGE_DIRECTLY = 309,
+
+  SEND_MESSAGE_V2 = 310,
+
+  GET_UNIT_TOPIC_LIST = 311,
+  GET_HAS_UNIT_SUB_TOPIC_LIST = 312,
+  GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313,
+
+  CLONE_GROUP_OFFSET = 314,
+
+  VIEW_BROKER_STATS_DATA = 315
+};
+
+//<!***************************************************************************
+enum MQResponseCode {
+  //rcv success response from broker
+  SUCCESS_VALUE = 0,
+  //rcv exception from broker
+  SYSTEM_ERROR = 1,
+  //rcv system busy from broker
+  SYSTEM_BUSY = 2,
+  //broker don't support the request code
+  REQUEST_CODE_NOT_SUPPORTED = 3,
+  //broker flush disk timeout error
+  FLUSH_DISK_TIMEOUT = 10,
+  //broker sync double write, slave broker not available
+  SLAVE_NOT_AVAILABLE = 11,
+  //broker sync double write, slave broker flush msg timeout
+  FLUSH_SLAVE_TIMEOUT = 12,
+  //broker rcv illegal message
+  MESSAGE_ILLEGAL = 13,
+  //service not available due to broker or namesrv in shutdown status
+  SERVICE_NOT_AVAILABLE = 14,
+  //this version is not supported on broker or namesrv
+  VERSION_NOT_SUPPORTED = 15,
+  //broker or Namesrv has no permission to do this operation
+  NO_PERMISSION = 16,
+  //topic is not exist on broker
+  TOPIC_NOT_EXIST = 17,
+  //broker already created this topic
+  TOPIC_EXIST_ALREADY = 18,
+  //pulled msg was not found
+  PULL_NOT_FOUND = 19,
+  //retry later
+  PULL_RETRY_IMMEDIATELY = 20,
+  //pull msg with wrong offset
+  PULL_OFFSET_MOVED = 21,
+  //could not find the query msg
+  QUERY_NOT_FOUND = 22,
+
+  SUBSCRIPTION_PARSE_FAILED = 23,
+  SUBSCRIPTION_NOT_EXIST = 24,
+  SUBSCRIPTION_NOT_LATEST = 25,
+  SUBSCRIPTION_GROUP_NOT_EXIST = 26,
+
+  TRANSACTION_SHOULD_COMMIT = 200,
+  TRANSACTION_SHOULD_ROLLBACK = 201,
+  TRANSACTION_STATE_UNKNOW = 202,
+  TRANSACTION_STATE_GROUP_WRONG = 203,
+
+  CONSUMER_NOT_ONLINE = 206,
+  CONSUME_MSG_TIMEOUT = 207
+};
+//<!************************************************************************
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/MessageQueue.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/MessageQueue.cpp b/rocketmq-cpp/src/protocol/MessageQueue.cpp
new file mode 100755
index 0000000..f1b3f8f
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/MessageQueue.cpp
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MessageQueue.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!************************************************************************
+// Default state: empty topic and broker name, queue id -1 (invalid mq).
+MessageQueue::MessageQueue()
+    : m_topic(),
+      m_brokerName(),
+      m_queueId(-1) {}
+
+MessageQueue::MessageQueue(const string& topic, const string& brokerName,
+                           int queueId)
+    : m_topic(topic), m_brokerName(brokerName), m_queueId(queueId) {}
+
+MessageQueue::MessageQueue(const MessageQueue& other)
+    : m_topic(other.m_topic),
+      m_brokerName(other.m_brokerName),
+      m_queueId(other.m_queueId) {}
+
+MessageQueue& MessageQueue::operator=(const MessageQueue& other) {
+  if (this != &other) {
+    m_brokerName = other.m_brokerName;
+    m_topic = other.m_topic;
+    m_queueId = other.m_queueId;
+  }
+  return *this;
+}
+
+string MessageQueue::getTopic() const { return m_topic; }
+
+void MessageQueue::setTopic(const string& topic) { m_topic = topic; }
+
+string MessageQueue::getBrokerName() const { return m_brokerName; }
+
+void MessageQueue::setBrokerName(const string& brokerName) {
+  m_brokerName = brokerName;
+}
+
+int MessageQueue::getQueueId() const { return m_queueId; }
+
+void MessageQueue::setQueueId(int queueId) { m_queueId = queueId; }
+
+bool MessageQueue::operator==(const MessageQueue& mq) const {
+  if (this == &mq) {
+    return true;
+  }
+
+  if (m_brokerName != mq.m_brokerName) {
+    return false;
+  }
+
+  if (m_queueId != mq.m_queueId) {
+    return false;
+  }
+
+  if (m_topic != mq.m_topic) {
+    return false;
+  }
+
+  return true;
+}
+
+int MessageQueue::compareTo(const MessageQueue& mq) const {
+  // Returns <0, 0 or >0; ordered by topic, then broker name, then queue id.
+  int result = m_topic.compare(mq.m_topic);
+  if (result != 0) {
+    return result;
+  }
+  result = m_brokerName.compare(mq.m_brokerName);
+  if (result != 0) {
+    return result;
+  }
+  // Compare instead of subtracting: m_queueId - mq.m_queueId can overflow.
+  return (m_queueId > mq.m_queueId) - (m_queueId < mq.m_queueId);
+}
+
+bool MessageQueue::operator<(const MessageQueue& mq) const {
+  return compareTo(mq) < 0;
+}
+
+Json::Value MessageQueue::toJson() const {
+  Json::Value outJson;
+  outJson["topic"] = m_topic;
+  outJson["brokerName"] = m_brokerName;
+  outJson["queueId"] = m_queueId;
+  return outJson;
+}
+
+//<!***************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/MessageQueue.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/MessageQueue.h b/rocketmq-cpp/src/protocol/MessageQueue.h
new file mode 100755
index 0000000..0d47bf8
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/MessageQueue.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MESSAGEQUEUE_H__
+#define __MESSAGEQUEUE_H__
+
+#include <string>
+#include "json/json.h"
+
+namespace rocketmq {
+//<!************************************************************************/
+//<!* MQ(T,B,ID);
+//<!************************************************************************/
+class MessageQueue {
+ public:
+  MessageQueue();
+  MessageQueue(const std::string& topic, const std::string& brokerName,
+               int queueId);
+  MessageQueue(const MessageQueue& other);
+  MessageQueue& operator=(const MessageQueue& other);
+
+  std::string getTopic() const;
+  void setTopic(const std::string& topic);
+
+  std::string getBrokerName() const;
+  void setBrokerName(const std::string& brokerName);
+
+  int getQueueId() const;
+  void setQueueId(int queueId);
+
+  bool operator==(const MessageQueue& mq) const;
+  bool operator<(const MessageQueue& mq) const;
+  int compareTo(const MessageQueue& mq) const;
+  Json::Value toJson() const;
+
+ private:
+  std::string m_topic;
+  std::string m_brokerName;
+  int m_queueId;
+};
+//<!***************************************************************************
+}  //<!end namespace;
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/ProcessQueueInfo.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/ProcessQueueInfo.h b/rocketmq-cpp/src/protocol/ProcessQueueInfo.h
new file mode 100644
index 0000000..d7493a5
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/ProcessQueueInfo.h
@@ -0,0 +1,86 @@
+#ifndef __PROCESSQUEUEINFO_H__
+#define __PROCESSQUEUEINFO_H__
+
+#include "UtilAll.h"
+#include "json/json.h"
+
+namespace rocketmq {
+class ProcessQueueInfo {  // plain holder of per-queue offsets, counters and flags; dumped via toJson()
+ public:
+  ProcessQueueInfo() {
+    commitOffset = 0;
+    cachedMsgMinOffset = 0;
+    cachedMsgMaxOffset = 0;
+    cachedMsgCount = 0;
+    transactionMsgMinOffset = 0;
+    transactionMsgMaxOffset = 0;
+    transactionMsgCount = 0;
+    locked = false;
+    tryUnlockTimes = 0;
+    lastLockTimestamp = 123;  // NOTE(review): every other numeric field starts at 0 — confirm 123 is intended
+    droped = false;
+    lastPullTimestamp = 0;
+    lastConsumeTimestamp = 0;
+  }
+  virtual ~ProcessQueueInfo() {}
+
+ public:
+  const uint64 getCommitOffset() const { return commitOffset; }
+
+  void setCommitOffset(uint64 input_commitOffset) {
+    commitOffset = input_commitOffset;
+  }
+
+  void setLocked(bool in_locked) { locked = in_locked; }
+
+  const bool isLocked() const { return locked; }
+
+  void setDroped(bool in_dropped) { droped = in_dropped; }
+
+  const bool isDroped() const { return droped; }
+
+  Json::Value toJson() const {  // one JSON member per field, keyed by the field name
+    Json::Value outJson;  // uint64 fields go through UtilAll::to_string(); int and bool fields are stored directly
+    outJson["commitOffset"] = (UtilAll::to_string(commitOffset)).c_str();
+    outJson["cachedMsgMinOffset"] =
+        (UtilAll::to_string(cachedMsgMinOffset)).c_str();
+    outJson["cachedMsgMaxOffset"] =
+        (UtilAll::to_string(cachedMsgMaxOffset)).c_str();
+    outJson["cachedMsgCount"] = (int)(cachedMsgCount);
+    outJson["transactionMsgMinOffset"] =
+        (UtilAll::to_string(transactionMsgMinOffset)).c_str();
+    outJson["transactionMsgMaxOffset"] =
+        (UtilAll::to_string(transactionMsgMaxOffset)).c_str();
+    outJson["transactionMsgCount"] = (int)(transactionMsgCount);
+    outJson["locked"] = (locked);
+    outJson["tryUnlockTimes"] = (int)(tryUnlockTimes);
+    outJson["lastLockTimestamp"] =
+        (UtilAll::to_string(lastLockTimestamp)).c_str();
+    outJson["droped"] = (droped);
+    outJson["lastPullTimestamp"] =
+        (UtilAll::to_string(lastPullTimestamp)).c_str();
+    outJson["lastConsumeTimestamp"] =
+        (UtilAll::to_string(lastConsumeTimestamp)).c_str();
+
+    return outJson;
+  }
+
+ public:  // fields are public; only commitOffset, locked and droped have accessors above
+  uint64 commitOffset;
+  uint64 cachedMsgMinOffset;
+  uint64 cachedMsgMaxOffset;
+  int cachedMsgCount;
+  uint64 transactionMsgMinOffset;
+  uint64 transactionMsgMaxOffset;
+  int transactionMsgCount;
+  bool locked;
+  int tryUnlockTimes;
+  uint64 lastLockTimestamp;
+
+  bool droped;
+  uint64 lastPullTimestamp;
+  uint64 lastConsumeTimestamp;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/RemotingCommand.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/RemotingCommand.cpp b/rocketmq-cpp/src/protocol/RemotingCommand.cpp
new file mode 100644
index 0000000..ff6e53e
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/RemotingCommand.cpp
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "RemotingCommand.h"
+#include "ByteOrder.h"
+#include "Logging.h"
+#include "MQProtos.h"
+#include "MQVersion.h"
+#include "SessionCredentials.h"
+
+namespace rocketmq {
+boost::atomic<int> RemotingCommand::s_seqNumber;
+boost::mutex RemotingCommand::m_clock;
+//<!************************************************************************
+RemotingCommand::RemotingCommand(int code,
+                                 CommandHeader* pExtHeader /* = NULL */)
+    : m_code(code),
+      m_language("CPP"),
+      m_version(MQVersion::s_CurrentVersion),
+      m_flag(0),
+      m_remark(""),
+      m_pExtHeader(pExtHeader) {
+  boost::lock_guard<boost::mutex> lock(m_clock);
+  m_opaque = (s_seqNumber.load(boost::memory_order_acquire)) %
+             (numeric_limits<int>::max());
+  s_seqNumber.store(m_opaque, boost::memory_order_release);
+  ++s_seqNumber;
+}
+
+RemotingCommand::RemotingCommand(int code, string language, int version,
+                                 int opaque, int flag, string remark,
+                                 CommandHeader* pExtHeader)
+    : m_code(code),
+      m_language(language),
+      m_version(version),
+      m_opaque(opaque),
+      m_flag(flag),
+      m_remark(remark),
+      m_pExtHeader(pExtHeader) {}
+
+RemotingCommand::~RemotingCommand() { m_pExtHeader = NULL; }
+
+// Builds the wire header into m_head: a total-length word, a header-length
+// word, then the JSON text of this command. m_body itself is not copied,
+// but its current size is counted in the total length, so set the body
+// before calling Encode().
+void RemotingCommand::Encode() {
+  Json::Value root;
+  root["code"] = m_code;
+  root["language"] = "CPP";
+  root["version"] = m_version;
+  root["opaque"] = m_opaque;
+  root["flag"] = m_flag;
+  root["remark"] = m_remark;
+
+  // extFields: the custom header's own fields when one is attached, then
+  // the three credential entries. Without a header (the heartbeat case)
+  // only the credential entries are written.
+  Json::Value extJson;
+  if (m_pExtHeader) {
+    m_pExtHeader->Encode(extJson);
+  }
+  extJson[SessionCredentials::Signature] =
+      m_extFields[SessionCredentials::Signature];
+  extJson[SessionCredentials::AccessKey] =
+      m_extFields[SessionCredentials::AccessKey];
+  extJson[SessionCredentials::ONSChannelKey] =
+      m_extFields[SessionCredentials::ONSChannelKey];
+  root["extFields"] = extJson;
+
+  Json::FastWriter jsonWriter;
+  string headerText = jsonWriter.write(root);
+
+  // totalLen = header-length word (4) + JSON header + body; the
+  // total-length word itself is not counted.
+  uint32 headLen = headerText.size();
+  uint32 totalLen = 4 + headLen + m_body.getSize();
+
+  // Each length word goes through ByteOrder::swapIfLittleEndian first.
+  uint32 lengthWords[2];
+  lengthWords[0] = ByteOrder::swapIfLittleEndian(totalLen);
+  lengthWords[1] = ByteOrder::swapIfLittleEndian(headLen);
+
+  //<!include self 4 bytes, see : doc/protocol.txt;
+  // m_head = both length words followed by the JSON text.
+  m_head.setSize(4 + 4 + headLen);
+  m_head.copyFrom(lengthWords, 0, sizeof(lengthWords));
+  m_head.copyFrom(headerText.c_str(), sizeof(lengthWords), headLen);
+}
+
+const MemoryBlock* RemotingCommand::GetHead() const { return &m_head; }
+
+const MemoryBlock* RemotingCommand::GetBody() const { return &m_body; }
+
+void RemotingCommand::SetBody(const char* pData, int len) {
+  m_body.reset();
+  m_body.setSize(len);
+  m_body.copyFrom(pData, 0, len);
+}
+
+RemotingCommand* RemotingCommand::Decode(const MemoryBlock& mem) {
+  //<!parses [4-byte header length][JSON header][body]; caller owns the result
+  uint32 messageHeader[1];
+  mem.copyTo(messageHeader, 0, sizeof(messageHeader));
+  int totalLen = mem.getSize();
+  int headLen = ByteOrder::swapIfLittleEndian(messageHeader[0]);
+  // headLen is read from the received bytes: reject values that would
+  // make the pointer arithmetic below leave the block.
+  if (totalLen < 4 || headLen < 0 || headLen > totalLen - 4) {
+    THROW_MQEXCEPTION(MQClientException, "invalid header length", -1);
+  }
+  int bodyLen = totalLen - 4 - headLen;
+
+  //<!decode header;
+  const char* const pData = static_cast<const char*>(mem.getData());
+  Json::Reader reader;
+  Json::Value object;
+  const char* begin = pData + 4;
+  const char* end = pData + 4 + headLen;
+  if (!reader.parse(begin, end, object)) {
+    THROW_MQEXCEPTION(MQClientException, "conn't parse json", -1);
+  }
+
+  int code = object["code"].asInt();
+  string language = object["language"].asString();
+  int version = object["version"].asInt();
+  int opaque = object["opaque"].asInt();
+  int flag = object["flag"].asInt();
+  const Json::Value& v = object["remark"];
+  string remark = v.isNull() ? string() : v.asString();  // remark is optional
+  LOG_DEBUG("code:%d, language:%s, version:%d, opaque:%d, flag:%d, remark:%s, headLen:%d, bodyLen:%d ",
+            code, language.c_str(), version, opaque, flag, remark.c_str(), headLen, bodyLen);
+  RemotingCommand* cmd =
+      new RemotingCommand(code, language, version, opaque, flag, remark, NULL);
+  cmd->setParsedJson(object);
+  if (bodyLen > 0) {
+    cmd->SetBody(pData + 4 + headLen, bodyLen);
+  }
+  return cmd;
+}
+
+void RemotingCommand::markResponseType() {
+  int bits = 1 << RPC_TYPE;
+  m_flag |= bits;
+}
+
+bool RemotingCommand::isResponseType() {
+  int bits = 1 << RPC_TYPE;
+  return (m_flag & bits) == bits;
+}
+
+void RemotingCommand::markOnewayRPC() {
+  int bits = 1 << RPC_ONEWAY;
+  m_flag |= bits;
+}
+
+bool RemotingCommand::isOnewayRPC() {
+  int bits = 1 << RPC_ONEWAY;
+  return (m_flag & bits) == bits;
+}
+
+void RemotingCommand::setOpaque(const int opa) { m_opaque = opa; }
+
+// If the parsed JSON has "extFields": drop the current header, then decode
+// the header type matching `code` (codes without a case leave it empty).
+void RemotingCommand::SetExtHeader(int code) {
+  try {
+    Json::Value ext = m_parsedJson["extFields"];
+    if (ext.isNull()) return;
+    m_pExtHeader = NULL;
+    switch (code) {
+      case SEND_MESSAGE:
+        m_pExtHeader.reset(SendMessageResponseHeader::Decode(ext));
+        break;
+      case PULL_MESSAGE:
+        m_pExtHeader.reset(PullMessageResponseHeader::Decode(ext));
+        break;
+      case GET_MIN_OFFSET:
+        m_pExtHeader.reset(GetMinOffsetResponseHeader::Decode(ext));
+        break;
+      case GET_MAX_OFFSET:
+        m_pExtHeader.reset(GetMaxOffsetResponseHeader::Decode(ext));
+        break;
+      case SEARCH_OFFSET_BY_TIMESTAMP:
+        m_pExtHeader.reset(SearchOffsetResponseHeader::Decode(ext));
+        break;
+      case GET_EARLIEST_MSG_STORETIME:
+        m_pExtHeader.reset(GetEarliestMsgStoretimeResponseHeader::Decode(ext));
+        break;
+      case QUERY_CONSUMER_OFFSET:
+        m_pExtHeader.reset(QueryConsumerOffsetResponseHeader::Decode(ext));
+        break;
+      case RESET_CONSUMER_CLIENT_OFFSET:
+        m_pExtHeader.reset(ResetOffsetRequestHeader::Decode(ext));
+        break;
+      case GET_CONSUMER_RUNNING_INFO:
+        m_pExtHeader.reset(GetConsumerRunningInfoRequestHeader::Decode(ext));
+        break;
+      case NOTIFY_CONSUMER_IDS_CHANGED:
+        m_pExtHeader.reset(NotifyConsumerIdsChangedRequestHeader::Decode(ext));
+        break;
+      default:
+        break;
+    }
+  } catch (MQException& e) {
+    LOG_ERROR("set response head error");
+  }
+}
+
+void RemotingCommand::setCode(int code) { m_code = code; }
+
+int RemotingCommand::getCode() const { return m_code; }
+
+int RemotingCommand::getOpaque() const { return m_opaque; }
+
+string RemotingCommand::getRemark() const { return m_remark; }
+
+void RemotingCommand::setRemark(string mark) { m_remark = mark; }
+
+CommandHeader* RemotingCommand::getCommandHeader() const {
+  return m_pExtHeader.get();
+}
+
+void RemotingCommand::setParsedJson(Json::Value json) {
+  m_parsedJson = json;
+}
+
+const int RemotingCommand::getFlag() const { return m_flag; }
+
+const int RemotingCommand::getVersion() const { return m_version; }
+
+void RemotingCommand::setMsgBody(const string& body) { m_msgBody = body; }
+
+string RemotingCommand::getMsgBody() const { return m_msgBody; }
+
+void RemotingCommand::addExtField(const string& key, const string& value) {
+  m_extFields[key] = value;
+}
+
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/RemotingCommand.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/RemotingCommand.h b/rocketmq-cpp/src/protocol/RemotingCommand.h
new file mode 100755
index 0000000..633a511
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/RemotingCommand.h
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REMOTINGCOMMAND_H__
+#define __REMOTINGCOMMAND_H__
+#include <boost/atomic.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include <memory>
+#include <sstream>
+#include "CommandHeader.h"
+#include "dataBlock.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+const int RPC_TYPE = 0;    // 0, REQUEST_COMMAND // 1, RESPONSE_COMMAND;
+const int RPC_ONEWAY = 1;  // 0, RPC // 1, Oneway;
+//<!***************************************************************************
+class RemotingCommand {
+ public:
+  RemotingCommand(int code, CommandHeader* pCustomHeader = NULL);
+  RemotingCommand(int code, string language, int version, int opaque, int flag,
+                  string remark, CommandHeader* pCustomHeader);
+  virtual ~RemotingCommand();
+
+  const MemoryBlock* GetHead() const;
+  const MemoryBlock* GetBody() const;
+
+  void SetBody(const char* pData, int len);
+  void setOpaque(const int opa);
+  void SetExtHeader(int code);
+
+  void setCode(int code);
+  int getCode() const;
+  int getOpaque() const;
+  void setRemark(string mark);
+  string getRemark() const;
+  void markResponseType();
+  bool isResponseType();
+  void markOnewayRPC();
+  bool isOnewayRPC();
+  void setParsedJson(Json::Value json);
+
+  CommandHeader* getCommandHeader() const;
+  const int getFlag() const;
+  const int getVersion() const;
+
+  void addExtField(const string& key, const string& value);
+  string getMsgBody() const;
+  void setMsgBody(const string& body);
+
+ public:
+  void Encode();
+  static RemotingCommand* Decode(const MemoryBlock& mem);
+
+ private:
+  int m_code;
+  string m_language;
+  int m_version;
+  int m_opaque;
+  int m_flag;
+  string m_remark;
+  string m_msgBody;
+  map<string, string> m_extFields;
+
+  static boost::mutex m_clock;
+  MemoryBlock m_head;
+  MemoryBlock m_body;
+  //<!save here
+  Json::Value m_parsedJson;
+  static boost::atomic<int> s_seqNumber;
+  unique_ptr<CommandHeader> m_pExtHeader;
+};
+
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/RemotingSerializable.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/RemotingSerializable.h b/rocketmq-cpp/src/protocol/RemotingSerializable.h
new file mode 100755
index 0000000..812a892
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/RemotingSerializable.h
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __REMOTINGSERIALIZABLE_H__
+#define __REMOTINGSERIALIZABLE_H__
+#include "json/json.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+class RemotingSerializable {
+ public:
+  virtual ~RemotingSerializable(){};
+  virtual void Encode(std::string& outData) = 0;
+};
+
+//<!************************************************************************
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/TopicList.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/TopicList.h b/rocketmq-cpp/src/protocol/TopicList.h
new file mode 100755
index 0000000..d8d14a7
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/TopicList.h
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TOPICLIST_H__
+#define __TOPICLIST_H__
+#include <string>
+#include <vector>
+#include "dataBlock.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+class TopicList {
+ public:  // NOTE(review): Decode below ignores `mem` and always returns a new, empty TopicList
+  static TopicList* Decode(const MemoryBlock* mem) { return new TopicList(); }
+
+ private:
+  vector<string> m_topicList;
+};
+//<!************************************************************************
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/protocol/TopicRouteData.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/protocol/TopicRouteData.h b/rocketmq-cpp/src/protocol/TopicRouteData.h
new file mode 100755
index 0000000..ec8f842
--- /dev/null
+++ b/rocketmq-cpp/src/protocol/TopicRouteData.h
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TOPICROUTEDATA_H__
+#define __TOPICROUTEDATA_H__
+#include <algorithm>
+#include "Logging.h"
+#include "UtilAll.h"
+#include "dataBlock.h"
+#include "json/json.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+// Queue settings reported for a single broker.
+struct QueueData {
+  string brokerName;
+  int readQueueNums;
+  int writeQueueNums;
+  int perm;
+
+  // Orders entries by broker name only; the numeric fields play no part.
+  bool operator<(const QueueData& other) const {
+    return brokerName.compare(other.brokerName) < 0;
+  }
+
+  // Two entries are equal only when every field matches.
+  bool operator==(const QueueData& other) const {
+    return brokerName == other.brokerName &&
+           readQueueNums == other.readQueueNums &&
+           writeQueueNums == other.writeQueueNums &&
+           perm == other.perm;
+  }
+};
+
+//<!***************************************************************************
+// Address table of one named broker group.
+struct BrokerData {
+  string brokerName;
+  map<int, string> brokerAddrs;  //<!0:master,1,2.. slave
+
+  // Orders entries by broker name only; the address table is ignored.
+  bool operator<(const BrokerData& other) const {
+    return brokerName.compare(other.brokerName) < 0;
+  }
+
+  // Equal when both the name and the complete address table match.
+  bool operator==(const BrokerData& other) const {
+    return brokerName == other.brokerName &&
+           brokerAddrs == other.brokerAddrs;
+  }
+};
+
+//<!************************************************************************/
+// Routing information for one topic: the per-broker queue settings, the broker
+// address tables and the order-topic configuration string.
+class TopicRouteData {
+ public:
+  // The members release their own storage, so nothing has to be done by hand.
+  virtual ~TopicRouteData() {}
+
+  // Parses the JSON text held in |mem| into a newly allocated TopicRouteData.
+  //
+  // Reads "orderTopicConf", the "queueDatas" array (brokerName, readQueueNums,
+  // writeQueueNums, perm) and the "brokerDatas" array (brokerName plus a
+  // "brokerAddrs" object whose keys are converted to int with atoi). Both
+  // lists are sorted by broker name before the object is returned.
+  //
+  // @param mem  buffer containing the JSON document.
+  // @return an object allocated with new that nothing in this class frees, or
+  //         NULL when |mem| is NULL or the text cannot be parsed.
+  static TopicRouteData* Decode(const MemoryBlock* mem) {
+    //<!see doc/TopicRouteData.json;
+    if (mem == NULL) {
+      LOG_ERROR("%s", "TopicRouteData::Decode called with NULL MemoryBlock");
+      return NULL;
+    }
+
+    // Parse straight from the caller's buffer; no intermediate copy is needed.
+    const char* const begin = static_cast<const char*>(mem->getData());
+    const char* const end = begin + mem->getSize();
+
+    Json::Value root;
+    Json::CharReaderBuilder charReaderBuilder;
+    // NOTE(review): presumably needed because "brokerAddrs" uses bare numeric
+    // keys - confirm against the payload produced by the name server.
+    charReaderBuilder.settings_["allowNumericKeys"] = true;
+    unique_ptr<Json::CharReader> pCharReaderPtr(charReaderBuilder.newCharReader());
+    string errs;
+    if (!pCharReaderPtr->parse(begin, end, &root, &errs)) {
+      LOG_ERROR("parse json error:%s, value isArray:%d, isObject:%d", errs.c_str(), root.isArray(), root.isObject());
+      return NULL;
+    }
+
+    // Held by unique_ptr until the very end so the object is not leaked if one
+    // of the JSON accessors below throws.
+    unique_ptr<TopicRouteData> trd(new TopicRouteData());
+    trd->setOrderTopicConf(root["orderTopicConf"].asString());
+
+    // Bind by reference: copying a Json::Value duplicates its whole subtree.
+    const Json::Value& qds = root["queueDatas"];
+    for (unsigned int i = 0; i < qds.size(); i++) {
+      const Json::Value& qd = qds[i];
+      QueueData d;
+      d.brokerName = qd["brokerName"].asString();
+      d.readQueueNums = qd["readQueueNums"].asInt();
+      d.writeQueueNums = qd["writeQueueNums"].asInt();
+      d.perm = qd["perm"].asInt();
+
+      trd->getQueueDatas().push_back(d);
+    }
+
+    sort(trd->getQueueDatas().begin(), trd->getQueueDatas().end());
+
+    const Json::Value& bds = root["brokerDatas"];
+    for (unsigned int i = 0; i < bds.size(); i++) {
+      const Json::Value& bd = bds[i];
+      BrokerData d;
+      d.brokerName = bd["brokerName"].asString();
+
+      LOG_DEBUG("brokerName:%s", d.brokerName.c_str());
+
+      const Json::Value& bas = bd["brokerAddrs"];
+      const Json::Value::Members mbs = bas.getMemberNames();
+      // Separate index name so the outer loop counter is not shadowed.
+      for (size_t j = 0; j < mbs.size(); j++) {
+        const string& key = mbs[j];
+        const string addr = bas[key].asString();
+        LOG_DEBUG("brokerid:%s,brokerAddr:%s", key.c_str(), addr.c_str());
+        d.brokerAddrs[atoi(key.c_str())] = addr;
+      }
+
+      trd->getBrokerDatas().push_back(d);
+    }
+
+    sort(trd->getBrokerDatas().begin(), trd->getBrokerDatas().end());
+
+    return trd.release();
+  }
+
+  // Returns the address stored under MASTER_ID for the first broker entry that
+  // has one, or an empty string when no entry does.
+  string selectBrokerAddr() {
+    vector<BrokerData>::iterator it = m_brokerDatas.begin();
+    for (; it != m_brokerDatas.end(); ++it) {
+      map<int, string>::iterator it1 = (*it).brokerAddrs.find(MASTER_ID);
+      if (it1 != (*it).brokerAddrs.end()) {
+        return it1->second;
+      }
+    }
+    return "";
+  }
+
+  // Mutable access to the queue list.
+  vector<QueueData>& getQueueDatas() { return m_queueDatas; }
+
+  // Mutable access to the broker list.
+  vector<BrokerData>& getBrokerDatas() { return m_brokerDatas; }
+
+  const string& getOrderTopicConf() const { return m_orderTopicConf; }
+
+  void setOrderTopicConf(const string& orderTopicConf) {
+    m_orderTopicConf = orderTopicConf;
+  }
+
+  // Equal when the broker list, the order-topic configuration and the queue
+  // list all match element for element.
+  bool operator==(const TopicRouteData& other) const {
+    return m_brokerDatas == other.m_brokerDatas &&
+           m_orderTopicConf == other.m_orderTopicConf &&
+           m_queueDatas == other.m_queueDatas;
+  }
+
+ private:
+  string m_orderTopicConf;
+  vector<QueueData> m_queueDatas;
+  vector<BrokerData> m_brokerDatas;
+};
+
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/batch_descriptor.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/batch_descriptor.h b/rocketmq-cpp/src/thread/disruptor/batch_descriptor.h
new file mode 100755
index 0000000..ba1a035
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/batch_descriptor.h
@@ -0,0 +1,70 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//     * Redistributions of source code must retain the above copyright
+//       notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//     * Neither the name of the disruptor-- nor the
+//       names of its contributors may be used to endorse or promote products
+//       derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_BATCH_DESCRIPTOR_H_  // NOLINT
+#define DISRUPTOR_BATCH_DESCRIPTOR_H_  // NOLINT
+
+#include "sequence.h"
+
+namespace rocketmq {
+
+// Used to record the batch of sequences claimed via {@link Sequencer}.
+class BatchDescriptor {
+ public:
+    // Builds a descriptor for a batch of |size| sequences. The end sequence
+    // starts out as kInitialCursorValue until set_end() is called.
+    //
+    // @param size of the batch to claim.
+    BatchDescriptor(int size)
+        : size_(size), end_(kInitialCursorValue) {}
+
+    // @return the number of sequences in the batch.
+    int size() const { return size_; }
+
+    // @return the last sequence in the batch.
+    int64_t end() const { return end_; }
+
+    // Records the last sequence of the batch.
+    //
+    // @param end sequence in the batch.
+    void set_end(int64_t end) { end_ = end; }
+
+    // Derives the first sequence of the batch as end - size + 1.
+    //
+    // @return starting sequence in the batch.
+    int64_t Start() const {
+        const int64_t first = end_ - size_ + 1L;
+        return first;
+    }
+
+ private:
+    int size_;     // number of claimed sequences
+    int64_t end_;  // last sequence of the batch
+};
+
+};  // namespace rocketmq
+
+#endif // DISRUPTOR_BATCH_DESCRIPTOR_H_  NOLINT

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/claim_strategy.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/claim_strategy.h b/rocketmq-cpp/src/thread/disruptor/claim_strategy.h
new file mode 100755
index 0000000..0f3263a
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/claim_strategy.h
@@ -0,0 +1,231 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//     * Redistributions of source code must retain the above copyright
+//       notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//     * Neither the name of the disruptor-- nor the
+//       names of its contributors may be used to endorse or promote products
+//       derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_CLAIM_STRATEGY_H_ // NOLINT
+#define DISRUPTOR_CLAIM_STRATEGY_H_ // NOLINT
+
+#include <boost/thread.hpp>
+#include <boost/noncopyable.hpp>
+#include <vector>
+
+#include "interface.h"
+
+namespace rocketmq {
+
+// Selects which claim strategy CreateClaimStrategy() should build.
+enum ClaimStrategyOption {
+    // Built as a SingleThreadedStrategy by CreateClaimStrategy().
+    kSingleThreadedStrategy,
+    // The matching class is commented out in this header, so
+    // CreateClaimStrategy() currently returns NULL for this value.
+    kMultiThreadedStrategy
+};
+
+// Optimised strategy can be used when there is a single publisher thread
+// claiming {@link AbstractEvent}s.
+//
+// The class keeps the last claimed sequence plus a cached copy of the lowest
+// sequence seen among the dependent sequences, so that the dependents only
+// have to be re-read when a claim might wrap past that cached value.
+class SingleThreadedStrategy : public boost::noncopyable, public ClaimStrategyInterface {
+ public:
+    // @param buffer_size number of slots in the ring buffer being claimed from.
+    SingleThreadedStrategy(const int& buffer_size) :
+        buffer_size_(buffer_size),
+        sequence_(kInitialCursorValue),
+        min_gating_sequence_(kInitialCursorValue) {}
+
+    // Claims the next sequence, yielding until its slot is free.
+    //
+    // @return the claimed sequence.
+    virtual int64_t IncrementAndGet(
+            const std::vector<Sequence*>& dependent_sequences) {
+        int64_t next_sequence = sequence_.IncrementAndGet(1L);
+        WaitForFreeSlotAt(next_sequence, dependent_sequences);
+        return next_sequence;
+    }
+
+    // Claims |delta| sequences at once, yielding until the last one is free.
+    //
+    // @return the highest claimed sequence.
+    virtual int64_t IncrementAndGet(const int& delta,
+            const std::vector<Sequence*>& dependent_sequences) {
+        int64_t next_sequence = sequence_.IncrementAndGet(delta);
+        WaitForFreeSlotAt(next_sequence, dependent_sequences);
+        return next_sequence;
+    }
+
+    // Non-blocking check whether one more sequence could be claimed without
+    // passing the slowest dependent sequence. Refreshes the cached minimum
+    // when the cached value alone cannot answer the question.
+    virtual bool HasAvalaibleCapacity(
+            const std::vector<Sequence*>& dependent_sequences) {
+        int64_t wrap_point = sequence_.sequence() + 1L - buffer_size_;
+        if (wrap_point > min_gating_sequence_.sequence()) {
+            int64_t min_sequence = GetMinimumSequence(dependent_sequences);
+            min_gating_sequence_.set_sequence(min_sequence);
+            if (wrap_point > min_sequence)
+                return false;
+        }
+        return true;
+    }
+
+    // Forces the claimed position to |sequence|, then yields until that slot
+    // is free.
+    virtual void SetSequence(const int64_t& sequence,
+            const std::vector<Sequence*>& dependent_sequences) {
+        sequence_.set_sequence(sequence);
+        WaitForFreeSlotAt(sequence, dependent_sequences);
+    }
+
+    // Intentionally empty in this strategy.
+    virtual void SerialisePublishing(const int64_t& sequence,
+                                     const Sequence& cursor,
+                                     const int64_t& batch_size) {}
+
+ private:
+    // Declared but not defined: a buffer size is mandatory.
+    SingleThreadedStrategy();
+
+    // Yields until the slowest dependent sequence has reached
+    // sequence - buffer_size_. Returns at once when the cached minimum already
+    // shows that this is the case.
+    void WaitForFreeSlotAt(const int64_t& sequence,
+            const std::vector<Sequence*>& dependent_sequences) {
+        int64_t wrap_point = sequence - buffer_size_;
+        if (wrap_point > min_gating_sequence_.sequence()) {
+            int64_t min_sequence;
+            while (wrap_point > (min_sequence = GetMinimumSequence(dependent_sequences))) {
+                boost::this_thread::yield();
+            }
+            // Cache the minimum just observed so later claims can skip the
+            // scan of the dependent sequences; without this store the cached
+            // value is only ever refreshed by HasAvalaibleCapacity().
+            min_gating_sequence_.set_sequence(min_sequence);
+        }
+    }
+
+    const int buffer_size_;
+    PaddedLong sequence_;             // last claimed sequence
+    PaddedLong min_gating_sequence_;  // cached minimum of the dependent sequences
+
+};
+
+// Strategy to be used when there are multiple publisher threads claiming
+// {@link AbstractEvent}s.
+/*
+class MultiThreadedStrategy :  public ClaimStrategyInterface {
+ public:
+    MultiThreadedStrategy(const int& buffer_size) :
+        buffer_size_(buffer_size),
+        sequence_(kInitialCursorValue),
+        min_processor_sequence_(kInitialCursorValue) {}
+
+    virtual int64_t IncrementAndGet(
+            const std::vector<Sequence*>& dependent_sequences) {
+        WaitForCapacity(dependent_sequences, min_gating_sequence_local_);
+        int64_t next_sequence = sequence_.IncrementAndGet();
+        WaitForFreeSlotAt(next_sequence,
+                          dependent_sequences,
+                          min_gating_sequence_local_);
+        return next_sequence;
+    }
+
+    virtual int64_t IncrementAndGet(const int& delta,
+            const std::vector<Sequence*>& dependent_sequences) {
+        int64_t next_sequence = sequence_.IncrementAndGet(delta);
+        WaitForFreeSlotAt(next_sequence,
+                          dependent_sequences,
+                          min_gating_sequence_local_);
+        return next_sequence;
+    }
+    virtual void SetSequence(const int64_t& sequence,
+            const std::vector<Sequence*>& dependent_sequences) {
+        sequence_.set_sequence(sequence);
+        WaitForFreeSlotAt(sequence,
+                          dependent_sequences,
+                          min_gating_sequence_local_);
+    }
+
+    virtual bool HasAvalaibleCapacity(
+            const std::vector<Sequence*>& dependent_sequences) {
+        const int64_t wrap_point = sequence_.sequence() + 1L - buffer_size_;
+        if (wrap_point > min_gating_sequence_local_.sequence()) {
+            int64_t min_sequence = GetMinimumSequence(dependent_sequences);
+            min_gating_sequence_local_.set_sequence(min_sequence);
+            if (wrap_point > min_sequence)
+                return false;
+        }
+        return true;
+    }
+
+    virtual void SerialisePublishing(const Sequence& cursor,
+                                     const int64_t& sequence,
+                                     const int64_t& batch_size) {
+        int64_t expected_sequence = sequence - batch_size;
+        int counter = retries;
+
+        while (expected_sequence != cursor.sequence()) {
+            if (0 == --counter) {
+                counter = retries;
+                std::this_thread::yield();
+            }
+        }
+    }
+
+ private:
+    // Methods
+    void WaitForCapacity(const std::vector<Sequence*>& dependent_sequences,
+                         const MutableLong& min_gating_sequence) {
+        const int64_t wrap_point = sequence_.sequence() + 1L - buffer_size_;
+        if (wrap_point > min_gating_sequence.sequence()) {
+            int counter = retries;
+            int64_t min_sequence;
+            while (wrap_point > (min_sequence = GetMinimumSequence(dependent_sequences))) {
+                counter = ApplyBackPressure(counter);
+            }
+            min_gating_sequence.set_sequence(min_sequence);
+        }
+    }
+
+    void WaitForFreeSlotAt(const int64_t& sequence,
+                           const std::vector<Sequence*>& dependent_sequences,
+                           const MutableLong& min_gating_sequence) {
+        const int64_t wrap_point = sequence - buffer_size_;
+        if (wrap_point > min_gating_sequence.sequence()) {
+            int64_t min_sequence;
+            while (wrap_point > (min_sequence = GetMinimumSequence(dependent_sequences))) {
+                std::this_thread::yield();
+            }
+            min_gating_sequence.set_sequence(min_sequence);
+        }
+    }
+
+    int ApplyBackPressure(int counter) {
+        if (0 != counter) {
+            --counter;
+            std::this_thread::yield();
+        } else {
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        }
+
+        return counter;
+    }
+
+    const int buffer_size_;
+    PaddedSequence sequence_;
+    thread_local PaddedLong min_gating_sequence_local_;
+
+    const int retries = 100;
+
+};
+*/
+
+// Builds the claim strategy selected by |option|.
+//
+// Declared inline because the definition lives in a header: without it every
+// translation unit that includes this file would emit its own definition and
+// the program would fail to link.
+//
+// @param option       which strategy to build.
+// @param buffer_size  forwarded to the strategy's constructor.
+// @return a strategy allocated with new, or NULL for any option other than
+//         kSingleThreadedStrategy (the multi-threaded strategy is disabled).
+inline ClaimStrategyInterface* CreateClaimStrategy(ClaimStrategyOption option,
+                                                   const int& buffer_size) {
+    switch (option) {
+        case kSingleThreadedStrategy:
+            return new SingleThreadedStrategy(buffer_size);
+        // case kMultiThreadedStrategy:
+        //     return new MultiThreadedStrategy(buffer_size);
+        default:
+            return NULL;
+    }
+}
+
+};  // namespace rocketmq
+
+#endif // DISRUPTOR_CLAIM_STRATEGY_H_ NOLINT

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/event_processor.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/event_processor.h b/rocketmq-cpp/src/thread/disruptor/event_processor.h
new file mode 100755
index 0000000..fb62812
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/event_processor.h
@@ -0,0 +1,130 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//     * Redistributions of source code must retain the above copyright
+//       notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//     * Neither the name of the disruptor-- nor the
+//       names of its contributors may be used to endorse or promote products
+//       derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_EVENT_PROCESSOR_H_ // NOLINT
+#define DISRUPTOR_EVENT_PROCESSOR_H_ // NOLINT
+
+#include <stdexcept>
+#include "ring_buffer.h"
+
+namespace rocketmq {
+
+// Event processor that does no processing of its own: Run() and Halt() are
+// empty and the sequence it reports is the one owned by the ring buffer.
+template <typename T>
+class NoOpEventProcessor : public EventProcessorInterface<T> {
+ public:
+    // @param ring_buffer buffer whose sequence pointer GetSequence() returns.
+    NoOpEventProcessor(RingBuffer<T>* ring_buffer)
+        : ring_buffer_(ring_buffer) {}
+
+    // Hands back the ring buffer's sequence pointer.
+    virtual Sequence* GetSequence() {
+        Sequence* const buffer_sequence = ring_buffer_->GetSequencePtr();
+        return buffer_sequence;
+    }
+
+    // Nothing to stop.
+    virtual void Halt() {
+    }
+
+    // Nothing to run.
+    virtual void Run() {
+    }
+
+ private:
+    RingBuffer<T>* ring_buffer_;
+};
+
+// Event processor that waits on a sequence barrier and hands every newly
+// available ring-buffer entry to an event handler, in sequence order.
+template <typename T>
+class BatchEventProcessor : public boost::noncopyable, public EventProcessorInterface<T> {
+ public:
+    // Stores the given pointers as-is; nothing in this class deletes them.
+    //
+    // @param ring_buffer        source of the events.
+    // @param sequence_barrier   barrier waited on for newly available sequences.
+    // @param event_handler      receives OnStart/OnEvent/OnShutdown callbacks.
+    // @param exception_handler  receives std::exceptions caught in the loop.
+    BatchEventProcessor(RingBuffer<T>* ring_buffer,
+                        SequenceBarrierInterface* sequence_barrier,
+                        EventHandlerInterface<T>* event_handler,
+                        ExceptionHandlerInterface<T>* exception_handler) :
+            running_(false),
+            ring_buffer_(ring_buffer),
+            sequence_barrier_(sequence_barrier),
+            event_handler_(event_handler),
+            exception_handler_(exception_handler) {}
+
+
+    // @return the sequence up to which this processor has handled events.
+    virtual Sequence* GetSequence() { return &sequence_; }
+
+    // Asks Run() to stop: clears the running flag and alerts the barrier.
+    virtual void Halt() {
+        running_.store(false);
+        sequence_barrier_->Alert();
+    }
+
+    // Processing loop. Returns after Halt() has been called and the barrier
+    // wait has raised an AlertException. If the processor is already running
+    // the call prints a message and returns without doing anything.
+    virtual void Run() {
+        // exchange() tests and sets the flag in a single atomic step, so two
+        // concurrent callers cannot both pass the check. A second loop must
+        // not start: it would hand the same events to the handler again and
+        // update sequence_ concurrently with the first one.
+        if (running_.exchange(true))
+         {
+            printf("Thread is already running\r\n");
+            return;
+         }
+        sequence_barrier_->ClearAlert();
+        event_handler_->OnStart();
+
+        T* event = NULL;
+        int64_t next_sequence = sequence_.sequence() + 1L;
+
+        while (true) {
+            try {
+                //wait 300 milliseconds to avoid taskThread blocking on BlockingStrategy::WaitFor when shutdown
+                int64_t available_sequence =
+                    sequence_barrier_->WaitFor(next_sequence, 300*1000);
+                //metaq::LOG_INFO("avalaible_sequence:%d, next_sequence:%d", avalaible_sequence,next_sequence);
+                while (next_sequence <= available_sequence) {
+                    event = ring_buffer_->Get(next_sequence);
+                    // The second argument flags the last event of this batch.
+                    event_handler_->OnEvent(next_sequence,
+                            next_sequence == available_sequence, event);
+                    next_sequence++;
+                }
+
+                // Report everything handled so far as consumed.
+                sequence_.set_sequence(next_sequence - 1L);
+            } catch(const AlertException&) {
+                //metaq::LOG_INFO("catch alertException");
+                // An alert only ends the loop once Halt() cleared the flag.
+                if (!running_.load())
+                    break;
+            } catch(const std::exception& e) {
+                //metaq::LOG_ERROR("catch stdException");
+                // Hand the failure to the handler, then skip the failing entry.
+                exception_handler_->Handle(e, next_sequence, event);
+                sequence_.set_sequence(next_sequence);
+                next_sequence++;
+            }
+        }
+        //metaq::LOG_INFO("BatchEventProcessor shutdown");
+        event_handler_->OnShutdown();
+        running_.store(false);
+    }
+
+    // Lets the processor be used directly as a thread function object.
+    void operator()() { Run(); }
+
+ private:
+    boost::atomic<bool> running_;
+    Sequence sequence_;
+
+    RingBuffer<T>* ring_buffer_;
+    SequenceBarrierInterface* sequence_barrier_;
+    EventHandlerInterface<T>* event_handler_;
+    ExceptionHandlerInterface<T>* exception_handler_;
+
+};
+
+
+};  // namespace rocketmq
+
+#endif // DISRUPTOR_EVENT_PROCESSOR_H_ NOLINT

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/event_publisher.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/event_publisher.h b/rocketmq-cpp/src/thread/disruptor/event_publisher.h
new file mode 100755
index 0000000..ae0efd9
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/event_publisher.h
@@ -0,0 +1,50 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//     * Redistributions of source code must retain the above copyright
+//       notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//     * Neither the name of the disruptor-- nor the
+//       names of its contributors may be used to endorse or promote products
+//       derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_EVENT_PUBLISHER_H_ // NOLINT
+#define DISRUPTOR_EVENT_PUBLISHER_H_ // NOLINT
+
+#include "ring_buffer.h"
+
+namespace rocketmq {
+
+// Convenience wrapper that claims a slot in a ring buffer, lets a translator
+// fill it in and then publishes it.
+template<typename T>
+class EventPublisher {
+ public:
+    // @param ring_buffer buffer that events are published to.
+    EventPublisher(RingBuffer<T>* ring_buffer)
+        : ring_buffer_(ring_buffer) {}
+
+    // Claims the next sequence, passes that sequence and its slot to
+    // |translator|, and finally publishes the sequence.
+    void PublishEvent(EventTranslatorInterface<T>* translator) {
+        int64_t claimed = ring_buffer_->Next();
+        T* const slot = ring_buffer_->Get(claimed);
+        translator->TranslateTo(claimed, slot);
+        ring_buffer_->Publish(claimed);
+    }
+
+ private:
+    RingBuffer<T>* ring_buffer_;
+};
+
+};  // namespace rocketmq
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/exception_handler.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/exception_handler.h b/rocketmq-cpp/src/thread/disruptor/exception_handler.h
new file mode 100755
index 0000000..e7979a0
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/exception_handler.h
@@ -0,0 +1,59 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//     * Redistributions of source code must retain the above copyright
+//       notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//     * Neither the name of the disruptor-- nor the
+//       names of its contributors may be used to endorse or promote products
+//       derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_EXCEPTION_HANDLER_H_  // NOLINT
+#define DISRUPTOR_EXCEPTION_HANDLER_H_  // NOLINT
+
+#include <exception>
+
+#include "interface.h"
+
+namespace rocketmq {
+
+// Exception handler that silently discards whatever it is given.
+template<typename T>
+class IgnoreExceptionHandler: public ExceptionHandlerInterface<T> {
+ public:
+    // Deliberately empty: none of the arguments is looked at.
+    virtual void Handle(const std::exception& /*exception*/,
+                         const int64_t& /*sequence*/,
+                         T* /*event*/) {
+    }
+};
+
+// Exception handler that refuses to swallow failures: every exception it is
+// given is thrown again.
+template<typename T>
+class FatalExceptionHandler: public ExceptionHandlerInterface<T> {
+ public:
+    // Never returns normally.
+    //
+    // When called while an exception is being handled (as
+    // BatchEventProcessor::Run does from its catch block) the active exception
+    // is rethrown, which keeps its dynamic type and what() text. Note that this
+    // is the active exception even if it is not the object passed in. Outside
+    // of a handler a copy of |exception| is thrown instead.
+    //
+    // @param exception  the failure being reported.
+    // @param sequence   unused.
+    // @param event      unused.
+    virtual void Handle(const std::exception& exception,
+                         const int64_t& sequence,
+                         T* event) {
+        // `throw exception;` on its own copies the object as a plain
+        // std::exception, slicing off the derived type and its message.
+        if (std::current_exception() != std::exception_ptr()) {
+            throw;
+        }
+        // A bare `throw;` with no active exception would call std::terminate.
+        throw exception;
+    }
+};
+
+};  // namespace rocketmq
+
+#endif // DISRUPTOR_EXCEPTION_HANDLER_H_  NOLINT

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/exceptions.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/exceptions.h b/rocketmq-cpp/src/thread/disruptor/exceptions.h
new file mode 100755
index 0000000..f968043
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/exceptions.h
@@ -0,0 +1,38 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//     * Redistributions of source code must retain the above copyright
+//       notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//     * Neither the name of the disruptor-- nor the
+//       names of its contributors may be used to endorse or promote products
+//       derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_EXCEPTIONS_H_  // NOLINT
+#define DISRUPTOR_EXCEPTIONS_H_  // NOLINT
+
+#include <exception>
+
+namespace rocketmq {
+
+// Exception type that carries no state of its own. BatchEventProcessor::Run
+// catches it separately from other std::exceptions and leaves its loop when
+// the processor is no longer marked as running.
+// NOTE(review): presumably thrown by the sequence barrier's WaitFor() after
+// Alert() - confirm against the barrier implementation.
+class AlertException : public std::exception {
+};
+
+};  // namespace rocketmq
+
+#endif // DISRUPTOR_EXCEPTIONS_H_  NOLINT

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/interface.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/interface.h b/rocketmq-cpp/src/thread/disruptor/interface.h
new file mode 100755
index 0000000..0c07774
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/interface.h
@@ -0,0 +1,278 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//     * Redistributions of source code must retain the above copyright
+//       notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//     * Neither the name of the disruptor-- nor the
+//       names of its contributors may be used to endorse or promote products
+//       derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_INTERFACE_H_ // NOLINT
+#define DISRUPTOR_INTERFACE_H_ // NOLINT
+
+#include <climits>
+#include <vector>
+
+#include "sequence.h"
+#include "batch_descriptor.h"
+
+namespace rocketmq {
+
+// Strategies employed for claiming the sequence of events in the
+// {@link Sequencer} by publishers.
+class ClaimStrategyInterface {
+ public:
+    // Documents HasAvalaibleCapacity() (declared after the destructor below):
+    // is there available capacity in the buffer for the requested sequence.
+    // @param dependent_sequences to be checked for range.
+    // @return true if the buffer has capacity for the requested sequence.
+    virtual ~ClaimStrategyInterface() {}
+    virtual bool HasAvalaibleCapacity(
+        const std::vector<Sequence*>& dependent_sequences) = 0;
+
+    // Claim the next sequence in the {@link Sequencer}.
+    //
+    // @param dependent_sequences to be checked for range.
+    // @return the index to be used for the publishing.
+    virtual int64_t IncrementAndGet(
+            const std::vector<Sequence*>& dependent_sequences) = 0;
+
+    // Claim the next sequence in the {@link Sequencer}, incrementing by delta.
+    //
+    // @param delta to increment by.
+    // @param dependent_sequences to be checked for range.
+    // @return the index to be used for the publishing.
+    virtual int64_t IncrementAndGet(const int& delta,
+            const std::vector<Sequence*>& dependent_sequences) = 0;
+
+    // Set the current sequence value for claiming an event in the
+    // {@link Sequencer}.
+    //
+    // @param sequence to be set as the current value.
+    // @param dependent_sequences to be checked for range.
+    virtual void SetSequence(const int64_t& sequence,
+            const std::vector<Sequence*>& dependent_sequences) = 0;
+
+    // Serialise publishing in sequence.
+    //
+    // @param sequence to be applied.
+    // @param cursor to be serialised against.
+    // @param batch_size of the sequence.
+    virtual void SerialisePublishing(const int64_t& sequence,
+                                     const Sequence& cursor,
+                                     const int64_t& batch_size) = 0;
+};
+
+// Coordination barrier for tracking the cursor for publishers and sequence of
+// dependent {@link EventProcessor}s for processing a data structure.
+class SequenceBarrierInterface {
+ public:
+    // Documents WaitFor(sequence) (declared after the destructor below):
+    // wait for the given sequence to be available for consumption.
+    // @param sequence to wait for.
+    // @return the sequence up to which is available.
+    //
+    // @throws AlertException if a status change has occurred for the
+    // Disruptor.
+    virtual ~SequenceBarrierInterface(){}
+    virtual int64_t WaitFor(const int64_t& sequence) = 0;
+
+    // Wait for the given sequence to be available for consumption with a
+    // time out.
+    //
+    // @param sequence to wait for.
+    // @param timeout_micro timeout in microseconds.
+    // @return the sequence up to which is available.
+    //
+    // @throws AlertException if a status change has occurred for the
+    // Disruptor.
+    virtual int64_t WaitFor(const int64_t& sequence,
+                            const int64_t& timeout_micro) = 0;
+
+    // Delegate a call to the {@link Sequencer#getCursor()}
+    //
+    //  @return value of the cursor for entries that have been published.
+    virtual int64_t GetCursor() const = 0;
+
+    // The current alert status for the barrier.
+    //
+    // @return true if in alert otherwise false.
+    virtual bool IsAlerted() const = 0;
+
+    // Alert the {@link EventProcessor}s of a status change and stay in this
+    // status until cleared.
+    virtual void Alert() = 0;
+
+    // Clear the current alert status.
+    virtual void ClearAlert() = 0;
+
+    // Check if the barrier is alerted; if so, throw an AlertException.
+    //
+    // @throws AlertException if barrier is alerted
+    virtual void CheckAlert() const = 0;
+};
+
+// Called by the {@link RingBuffer} to pre-populate all the events. RingBuffer
+// calls NewInstance(buffer_size) once, indexes the result as an array and
+// releases it with delete[], so implementations must return a new[] array.
+// @param <T> event implementation storing the data for sharing during exchange
+// or parallel coordination of an event.
+template<typename T>
+class EventFactoryInterface {
+ public:
+    virtual ~EventFactoryInterface(){}
+     virtual T* NewInstance(const int& size) const = 0;
+};
+
+// Callback interface to be implemented for processing events as they become
+// available in the {@link RingBuffer}.
+//
+// @param <T> event implementation storing the data for sharing during exchange
+// or parallel coordination of an event.
+template<typename T>
+class EventHandlerInterface {
+ public:
+    // Documents OnEvent() (declared after the destructor below): called when
+    // a publisher has published an event to the {@link RingBuffer}.
+    // @param sequence of the event being processed
+    // @param end_of_batch flag to indicate if this is the last event in a batch
+    // from the {@link RingBuffer}
+    // @param event published to the {@link RingBuffer}
+    //
+    // @throws Exception if the EventHandler would like the exception handled
+    // further up the chain.
+    virtual ~EventHandlerInterface(){}
+    virtual void OnEvent(const int64_t& sequence,
+                         const bool& end_of_batch,
+                         T* event)  = 0;
+
+    // Called once on thread start before processing the first event.
+    virtual void OnStart() = 0;
+
+    // Called once on thread stop just before shutdown.
+    virtual void OnShutdown() = 0;
+};
+
+// Implementations translate other data representations into events claimed
+// for the {@link RingBuffer}.
+//
+// @param <T> event implementation storing the data for sharing during exchange
+// or parallel coordination of an event.
+template<typename T>
+class EventTranslatorInterface {
+ public:
+     // Documents TranslateTo() (declared after the destructor below): translate
+     // a data representation into fields set in the given event.
+     // @param sequence that is assigned to events.
+     // @param event into which the data should be translated.
+     // @return the translated event; this default implementation returns NULL.
+     virtual ~EventTranslatorInterface(){}
+     virtual T* TranslateTo(const int64_t& sequence, T* event) { return NULL;}
+};
+
+// EventProcessors wait for events to become available for consumption from
+// the {@link RingBuffer}. An event processor should be associated with a
+// thread.
+//
+// @param <T> event implementation storing the data for sharing during exchange
+// or parallel coordination of an event (not used by the members declared here).
+template<typename T>
+class EventProcessorInterface {
+ public:
+     // Documents GetSequence() (declared after the destructor below): get a
+     // pointer to the {@link Sequence} being used by this {@link EventProcessor}.
+     //
+     // @return pointer to the {@link Sequence} for this
+     // {@link EventProcessor}
+     virtual ~EventProcessorInterface(){}
+    virtual Sequence* GetSequence() = 0;
+
+    // Signal that this EventProcessor should stop when it has finished
+    // consuming at the next clean break.
+    // It will call {@link DependencyBarrier#alert()} to notify the thread to
+    // check status.
+    virtual void Halt() = 0;
+};
+
+// Callback handler for uncaught exceptions in the event processing cycle
+// of the {@link BatchEventProcessor}.
+//
+// @param <T> event type stored in the {@link RingBuffer}.
+template<typename T>
+class ExceptionHandlerInterface {
+ public:
+    // Documents Handle() (declared after the destructor below): strategy for
+    // handling uncaught exceptions when processing an event. To suspend further
+    // processing by the {@link BatchEventProcessor}, throw a std::runtime_error.
+    //
+    // @param exception that propagated from the {@link EventHandler}.
+    // @param sequence of the event which caused the exception.
+    // @param event being processed when the exception occurred.
+    virtual ~ExceptionHandlerInterface(){}
+    virtual void Handle(const std::exception& exception,
+                        const int64_t& sequence,
+                        T* event) = 0;
+};
+
+// Strategy employed for making {@link EventProcessor}s wait on a cursor
+// {@link Sequence}. Instances are not copyable (boost::noncopyable base).
+class WaitStrategyInterface: public boost::noncopyable {
+ public:
+    //  Documents the four-argument WaitFor() (declared after the destructor
+    //  below): wait for the given sequence to be available for consumption.
+    //  @param dependents further back the chain that must advance first.
+    //  @param cursor on which to wait.
+    //  @param barrier the consumer is waiting on.
+    //  @param sequence to be waited on.
+    //  @return the sequence that is available which may be greater than the
+    //  requested sequence.
+    //
+    //  @throws AlertException if the status of the Disruptor has changed.
+    virtual ~WaitStrategyInterface(){}
+    virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+                            const Sequence& cursor,
+                            const SequenceBarrierInterface& barrier,
+                            const int64_t& sequence) = 0;
+
+    //  Wait for the given sequence to be available for consumption in a
+    //  {@link RingBuffer} with a timeout specified.
+    //
+    //  @param dependents further back the chain that must advance first
+    //  @param cursor on which to wait.
+    //  @param barrier the consumer is waiting on.
+    //  @param sequence to be waited on.
+    //  @param timeout_micros value in microseconds to abort after.
+    //  @return the sequence that is available which may be greater than the
+    //  requested sequence.
+    //
+    //  @throws AlertException if the status of the Disruptor has changed.
+    //  @throws InterruptedException if the thread is interrupted.
+    virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+                            const Sequence& cursor,
+                            const SequenceBarrierInterface& barrier,
+                            const int64_t & sequence,
+                            const int64_t & timeout_micros) = 0;
+
+    // Signal those waiting that the cursor has advanced.
+    virtual void SignalAllWhenBlocking() = 0;
+};
+
+};  // namespace rocketmq
+
+#endif // DISRUPTOR_INTERFACE_H_ NOLINT

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/ring_buffer.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/ring_buffer.h b/rocketmq-cpp/src/thread/disruptor/ring_buffer.h
new file mode 100755
index 0000000..c7150f1
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/ring_buffer.h
@@ -0,0 +1,90 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//     * Redistributions of source code must retain the above copyright
+//       notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//     * Neither the name of the disruptor-- nor the
+//       names of its contributors may be used to endorse or promote products
+//       derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_RING_BUFFER_H_ // NOLINT
+#define DISRUPTOR_RING_BUFFER_H_ // NOLINT
+
+#include <boost/array.hpp>
+#include <vector>
+
+#include "interface.h"
+#include "claim_strategy.h"
+#include "wait_strategy.h"
+#include "sequencer.h"
+#include "sequence_barrier.h"
+
+namespace rocketmq {
+
+// Fixed-capacity ring of reusable event slots exchanged between publishers
+// and {@link EventProcessor}s; it is built on top of {@link Sequencer}.
+// The constructor fills the ring once; Get() hands out pointers into it.
+//
+// @param <T> implementation storing the data for sharing during exchange
+// or parallel coordination of an event.
+template<typename T>
+class RingBuffer : public Sequencer {
+ public:
+    // Build the ring and obtain the whole slot array from the factory.
+    //
+    // @param event_factory asked once, through NewInstance(buffer_size), for
+    // the slot array that this ring releases in its destructor.
+    // @param buffer_size of the RingBuffer, must be a power of 2 because
+    // Get() selects a slot by masking with buffer_size - 1.
+    // @param claim_strategy_option forwarded to the Sequencer base.
+    // @param wait_strategy_option forwarded to the Sequencer base.
+    RingBuffer(EventFactoryInterface<T>* event_factory,
+               int buffer_size,
+               ClaimStrategyOption claim_strategy_option,
+               WaitStrategyOption wait_strategy_option)
+        : Sequencer(buffer_size, claim_strategy_option, wait_strategy_option),
+          buffer_size_(buffer_size),
+          mask_(buffer_size - 1),
+          events_(event_factory->NewInstance(buffer_size)) {}
+
+    // Release the slot array obtained from the factory.
+    ~RingBuffer() { delete[] events_; }
+
+    // Get the event for a given sequence in the RingBuffer.
+    //
+    // @param sequence for the event
+    // @return pointer to the slot at index (sequence & mask_).
+    T* Get(const int64_t& sequence) {
+        const int64_t slot = sequence & mask_;
+        return events_ + slot;
+    }
+
+ private:
+    // Capacity exactly as passed to the constructor.
+    int buffer_size_;
+    // buffer_size - 1, the index mask applied in Get().
+    int mask_;
+    // Slot array returned by the factory; released with delete[].
+    T* events_;
+
+};
+
+};  // namespace rocketmq
+
+#endif // DISRUPTOR_RING_BUFFER_H_ NOLINT

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/sequence.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/sequence.h b/rocketmq-cpp/src/thread/disruptor/sequence.h
new file mode 100755
index 0000000..f1396f3
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/sequence.h
@@ -0,0 +1,139 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//     * Redistributions of source code must retain the above copyright
+//       notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//     * Neither the name of the disruptor-- nor the
+//       names of its contributors may be used to endorse or promote products
+//       derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef CACHE_LINE_SIZE_IN_BYTES // NOLINT
+#define CACHE_LINE_SIZE_IN_BYTES 64 // NOLINT
+#endif // NOLINT
+// Count of int64_t padding words that fill the rest of one cache line after the
+// counter. Fully parenthesised so each macro expands safely inside expressions.
+#define ATOMIC_SEQUENCE_PADDING_LENGTH ((CACHE_LINE_SIZE_IN_BYTES - sizeof(boost::atomic<int64_t>)) / 8)
+#define SEQUENCE_PADDING_LENGTH ((CACHE_LINE_SIZE_IN_BYTES - sizeof(int64_t)) / 8)
+
+#ifndef DISRUPTOR_SEQUENCE_H_ // NOLINT
+#define DISRUPTOR_SEQUENCE_H_ // NOLINT
+
+#include <boost/atomic.hpp>
+#include <boost/memory_order.hpp>
+#include <boost/noncopyable.hpp>
+#include <vector>
+#include <limits>
+using namespace boost;
+namespace rocketmq {
+// Default starting value for the Sequence, PaddedSequence, MutableLong and PaddedLong constructors.
+const int64_t kInitialCursorValue = -1L;
+
+// Atomic 64-bit sequence counter; not copyable.
+class Sequence : public boost::noncopyable {
+ public:
+    // Create a counter whose value can be tracked across threads.
+    // @param initial_value starting value; defaults to kInitialCursorValue.
+    Sequence(int64_t initial_value = kInitialCursorValue)
+        : value_(initial_value) {}
+
+    // Read the counter with acquire ordering.
+    // @return the current value.
+    int64_t sequence() const {
+        return value_.load(boost::memory_order_acquire);
+    }
+
+    // Overwrite the counter with release ordering.
+    // @param value the value to which the {@link Sequence} will be set.
+    void set_sequence(int64_t value) {
+        value_.store(value, boost::memory_order_release);
+    }
+
+    // Atomically add increment (release ordering) and report the result.
+    // @param increment amount added to the {@link Sequence}.
+    // @return the value held before the addition, plus increment.
+    int64_t IncrementAndGet(const int64_t& increment) {
+        const int64_t previous =
+            value_.fetch_add(increment, boost::memory_order_release);
+        return previous + increment;
+    }
+
+ private:
+    boost::atomic<int64_t> value_;
+};
+
+// Cache line padded sequence counter.
+//
+// Intended to be usable across threads without worrying about false sharing
+// when located adjacent to another counter in memory.
+class PaddedSequence : public Sequence {
+ public:
+    PaddedSequence(int64_t initial_value = kInitialCursorValue) :
+            Sequence(initial_value) {}
+
+ private:
+    // Filler words sized by ATOMIC_SEQUENCE_PADDING_LENGTH; no member function touches them.
+    int64_t padding_[ATOMIC_SEQUENCE_PADDING_LENGTH];
+
+};
+
+// Non-atomic sequence counter.
+// The value lives in a volatile int64_t member; no atomic operations are used.
+// This counter is not thread safe.
+class MutableLong {
+ public:
+     MutableLong(int64_t initial_value = kInitialCursorValue) :
+         sequence_(initial_value) {}
+     // @return the current value of the counter.
+     int64_t sequence() const { return sequence_; }
+     // @param sequence value that replaces the current one.
+     void set_sequence(const int64_t& sequence) { sequence_ = sequence; };
+     // Adds delta to the counter and returns the value read back afterwards.
+     int64_t IncrementAndGet(const int64_t& delta) { sequence_ += delta; return sequence_; }
+
+ private:
+     volatile int64_t sequence_;
+};
+
+// Cache line padded non-atomic sequence counter.
+// Adds SEQUENCE_PADDING_LENGTH unused int64_t words after the MutableLong base.
+// This counter is not thread safe.
+class PaddedLong : public MutableLong {
+ public:
+     PaddedLong(int64_t initial_value = kInitialCursorValue) :
+         MutableLong(initial_value) {}
+ private:
+     int64_t padding_[SEQUENCE_PADDING_LENGTH];
+};
+
+// Returns the smallest value held by the given sequences, or the int64_t
+// maximum when the vector is empty. Declared inline because it is defined in
+// a header; otherwise every including translation unit emits a duplicate symbol.
+inline int64_t GetMinimumSequence(const std::vector<Sequence*>& sequences) {
+    int64_t minimum = std::numeric_limits<int64_t>::max();
+    for (std::vector<Sequence*>::const_iterator it = sequences.begin();
+         it != sequences.end(); ++it) {
+        const int64_t sequence = (*it)->sequence();
+        if (sequence < minimum) minimum = sequence;
+    }
+    return minimum;
+}
+
+};  // namespace rocketmq
+
+#endif // DISRUPTOR_SEQUENCE_H_ NOLINT

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/sequence_barrier.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/sequence_barrier.h b/rocketmq-cpp/src/thread/disruptor/sequence_barrier.h
new file mode 100755
index 0000000..c156388
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/sequence_barrier.h
@@ -0,0 +1,92 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//     * Redistributions of source code must retain the above copyright
+//       notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//     * Neither the name of the disruptor-- nor the
+//       names of its contributors may be used to endorse or promote products
+//       derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_SEQUENCE_BARRIER_H_ // NOLINT
+#define DISRUPTOR_SEQUENCE_BARRIER_H_ // NOLINT
+
+#include <memory>
+#include <vector>
+
+#include "exceptions.h"
+#include "interface.h"
+namespace rocketmq {
+
+// Sequence barrier that forwards waiting to a WaitStrategyInterface and keeps an
+// atomic alert flag. Public inheritance lets it be used through the interface.
+class ProcessingSequenceBarrier : public SequenceBarrierInterface {
+ public:
+    // @param wait_strategy strategy that both WaitFor() overloads forward to.
+    // @param sequence cursor handed to the wait strategy and read by GetCursor().
+    // @param sequences dependent sequences handed to the wait strategy (copied).
+    // This class defines no destructor, so none of the pointers is deleted here.
+    ProcessingSequenceBarrier(WaitStrategyInterface* wait_strategy,
+                              Sequence* sequence,
+                              const std::vector<Sequence*>& sequences) :
+        wait_strategy_(wait_strategy),
+        cursor_(sequence),
+        dependent_sequences_(sequences),
+        alerted_(false) {
+    }
+
+    // Forward to the wait strategy and return whatever it returns.
+    virtual int64_t WaitFor(const int64_t& sequence) {
+        return wait_strategy_->WaitFor(dependent_sequences_, *cursor_, *this,
+                                       sequence);
+    }
+
+    // Same forwarding, additionally passing timeout_micros to the strategy.
+    virtual int64_t WaitFor(const int64_t& sequence,
+                            const int64_t& timeout_micros) {
+        return wait_strategy_->WaitFor(dependent_sequences_, *cursor_, *this,
+                                       sequence, timeout_micros);
+    }
+
+    // @return the current value of the cursor sequence.
+    virtual int64_t GetCursor() const { return cursor_->sequence(); }
+
+    // @return true while the alert flag is set (acquire load).
+    virtual bool IsAlerted() const { return alerted_.load(boost::memory_order_acquire); }
+
+    // Set the alert flag (release store); it stays set until ClearAlert().
+    virtual void Alert() { alerted_.store(true, boost::memory_order_release); }
+
+    // Reset the alert flag (release store).
+    virtual void ClearAlert() { alerted_.store(false, boost::memory_order_release); }
+
+    // @throws AlertException if the alert flag is currently set.
+    virtual void CheckAlert() const {
+        if (IsAlerted()) throw AlertException();
+    }
+
+ private:
+    WaitStrategyInterface* wait_strategy_;
+    Sequence* cursor_;
+    std::vector<Sequence*> dependent_sequences_;
+    boost::atomic<bool> alerted_;
+};
+
+};  // namespace rocketmq
+
+#endif // DISRUPTOR_SEQUENCE_BARRIER_H_ NOLINT