You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:54 UTC

[08/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp code

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/FilterAPI.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/FilterAPI.h b/rocketmq-cpp/src/common/FilterAPI.h
new file mode 100755
index 0000000..c95f17e
--- /dev/null
+++ b/rocketmq-cpp/src/common/FilterAPI.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __FILTERAPI_H__
+#define __FILTERAPI_H__
+
+#include <string>
+#include "MQClientException.h"
+#include "SubscriptionData.h"
+#include "UtilAll.h"
+namespace rocketmq {
+//<!***************************************************************************
+class FilterAPI {
+ public:
+  /**
+   * Builds a heap-allocated SubscriptionData for a topic and tag expression.
+   *
+   * An empty expression, or one equal to SUB_ALL, sets the sub-string to
+   * SUB_ALL. Otherwise the expression is split on "||"; every piece that is
+   * non-empty after trimming is added to both the tag set and the code set.
+   *
+   * @param topic      topic handed to the SubscriptionData constructor
+   * @param subString  tag expression, e.g. "TagA || TagB"
+   * @return a SubscriptionData created with new; the caller must delete it
+   * @throws MQClientException when splitting a non-empty expression yields
+   *         no pieces (raised through THROW_MQEXCEPTION)
+   */
+  static SubscriptionData* buildSubscriptionData(const string topic,
+                                                 const string& subString) {
+    //<!delete in balance;
+    SubscriptionData* subscriptionData = new SubscriptionData(topic, subString);
+
+    if (subString.empty() || !subString.compare(SUB_ALL)) {
+      subscriptionData->setSubString(SUB_ALL);
+      return subscriptionData;
+    }
+
+    vector<string> out;
+    UtilAll::Split(out, subString, "||");
+
+    if (out.empty()) {
+      // The caller never receives the pointer on this path, so release the
+      // object here; otherwise it would leak once the exception propagates.
+      delete subscriptionData;
+      THROW_MQEXCEPTION(MQClientException, "FilterAPI subString split error",
+                        -1);
+    }
+
+    for (size_t i = 0; i < out.size(); i++) {
+      string tag = out[i];
+      if (tag.empty()) continue;
+
+      // Trimming may leave nothing behind (a piece made only of blanks).
+      UtilAll::Trim(tag);
+      if (tag.empty()) continue;
+
+      subscriptionData->putTagsSet(tag);
+      subscriptionData->putCodeSet(tag);
+    }
+
+    return subscriptionData;
+  }
+};
+
+//<!***************************************************************************
+}  //<!end namespace;
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/InputStream.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/InputStream.cpp b/rocketmq-cpp/src/common/InputStream.cpp
new file mode 100644
index 0000000..4eb5587
--- /dev/null
+++ b/rocketmq-cpp/src/common/InputStream.cpp
@@ -0,0 +1,149 @@
+#include "InputStream.h"
+#include <algorithm>
+#include "MemoryOutputStream.h"
+
+namespace rocketmq {
+int64 InputStream::getNumBytesRemaining() {
+  // A negative total length is handed back unchanged; otherwise the current
+  // position is subtracted from it.
+  const int64 total = getTotalLength();
+  if (total < 0) return total;
+
+  return total - getPosition();
+}
+
+char InputStream::readByte() {
+  // Starts at zero, so zero is returned unless read() stores a byte.
+  char value = 0;
+  read(&value, 1);
+  return value;
+}
+
+bool InputStream::readBool() {
+  // Any non-zero byte is treated as true.
+  const char raw = readByte();
+  return raw != 0;
+}
+
+short InputStream::readShort() {
+  // Two bytes decoded as little-endian; an incomplete read yields zero.
+  char raw[2];
+
+  if (read(raw, 2) != 2) return 0;
+
+  return (short)ByteOrder::littleEndianShort(raw);
+}
+
+short InputStream::readShortBigEndian() {
+  // Two bytes decoded as big-endian; an incomplete read yields zero.
+  char raw[2];
+
+  if (read(raw, 2) != 2) return 0;
+
+  return (short)ByteOrder::bigEndianShort(raw);
+}
+
+int InputStream::readInt() {
+  // Four bytes decoded as little-endian; an incomplete read yields zero.
+  char raw[4];
+
+  if (read(raw, 4) != 4) return 0;
+
+  return (int)ByteOrder::littleEndianInt(raw);
+}
+
+int InputStream::readIntBigEndian() {
+  // Four bytes decoded as big-endian; an incomplete read yields zero.
+  char raw[4];
+
+  if (read(raw, 4) != 4) return 0;
+
+  return (int)ByteOrder::bigEndianInt(raw);
+}
+
+int InputStream::readCompressedInt() {
+  // Header byte: low 7 bits give the payload length, the top bit is the sign.
+  const uint8 header = (uint8)readByte();
+  if (header == 0) return 0;
+
+  const int payloadLen = (header & 0x7f);
+  if (payloadLen > 4) return 0;  // longer than the 4-byte buffer below
+
+  // Zero-filled so bytes beyond payloadLen contribute nothing to the value.
+  char payload[4] = {0, 0, 0, 0};
+  if (read(payload, payloadLen) != payloadLen) return 0;
+
+  const int magnitude = (int)ByteOrder::littleEndianInt(payload);
+  if ((header >> 7) != 0) return -magnitude;
+
+  return magnitude;
+}
+
+int64 InputStream::readInt64() {
+  // The eight raw bytes are viewed as one 64-bit value through the union and
+  // passed through ByteOrder::swapIfBigEndian; a short read yields zero.
+  union {
+    uint8 asBytes[8];
+    uint64 asInt64;
+  } raw;
+
+  if (read(raw.asBytes, 8) != 8) return 0;
+
+  return (int64)ByteOrder::swapIfBigEndian(raw.asInt64);
+}
+
+int64 InputStream::readInt64BigEndian() {
+  // The eight raw bytes are viewed as one 64-bit value through the union and
+  // passed through ByteOrder::swapIfLittleEndian; a short read yields zero.
+  union {
+    uint8 asBytes[8];
+    uint64 asInt64;
+  } raw;
+
+  if (read(raw.asBytes, 8) != 8) return 0;
+
+  return (int64)ByteOrder::swapIfLittleEndian(raw.asInt64);
+}
+
+float InputStream::readFloat() {
+  // Reuses readInt() and reinterprets the bits through a union, which only
+  // works while int32 and float have the same size.
+  union {
+    int32 asInt;
+    float asFloat;
+  } bits;
+
+  bits.asInt = (int32)readInt();
+  const float result = bits.asFloat;
+  return result;
+}
+
+float InputStream::readFloatBigEndian() {
+  // Same bit reinterpretation as readFloat(), fed by readIntBigEndian().
+  union {
+    int32 asInt;
+    float asFloat;
+  } bits;
+
+  bits.asInt = (int32)readIntBigEndian();
+  const float result = bits.asFloat;
+  return result;
+}
+
+double InputStream::readDouble() {
+  // Reuses readInt64() and reinterprets the 64 bits as a double via a union.
+  union {
+    int64 asInt;
+    double asDouble;
+  } bits;
+
+  bits.asInt = readInt64();
+  const double result = bits.asDouble;
+  return result;
+}
+
+double InputStream::readDoubleBigEndian() {
+  // Same bit reinterpretation as readDouble(), fed by readInt64BigEndian().
+  union {
+    int64 asInt;
+    double asDouble;
+  } bits;
+
+  bits.asInt = readInt64BigEndian();
+  const double result = bits.asDouble;
+  return result;
+}
+
+size_t InputStream::readIntoMemoryBlock(MemoryBlock& block, ssize_t numBytes) {
+  // Wraps the block in a MemoryOutputStream and lets that stream pull from
+  // this one; the value it reports is returned as a size_t.
+  // NOTE(review): the `true` argument presumably selects append mode - confirm
+  // against MemoryOutputStream.
+  MemoryOutputStream sink(block, true);
+  return (size_t)sink.writeFromInputStream(*this, numBytes);
+}
+
+//==============================================================================
+/** Reads and discards up to numBytesToSkip bytes.
+
+    Data is read through read() into a scratch buffer of at most 16384 bytes
+    until the requested amount has been consumed or isExhausted() reports true.
+    Non-positive requests do nothing.
+
+    @param numBytesToSkip  how many bytes to discard
+*/
+void InputStream::skipNextBytes(int64 numBytesToSkip) {
+  if (numBytesToSkip <= 0) return;
+
+  const int skipBufferSize = (int)std::min(numBytesToSkip, (int64)16384);
+  char* temp = static_cast<char*>(std::malloc(skipBufferSize * sizeof(char)));
+
+  // Without a scratch buffer there is nowhere to read into; give up rather
+  // than pass a null destination to read().
+  if (!temp) return;
+
+  while (numBytesToSkip > 0 && !isExhausted()) {
+    const int numRead =
+        read(temp, (int)std::min(numBytesToSkip, (int64)skipBufferSize));
+
+    // A read that makes no progress would otherwise spin forever, and a
+    // negative result would make the remaining count grow.
+    if (numRead <= 0) break;
+
+    numBytesToSkip -= numRead;
+  }
+
+  std::free(temp);
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/InputStream.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/InputStream.h b/rocketmq-cpp/src/common/InputStream.h
new file mode 100644
index 0000000..c987be7
--- /dev/null
+++ b/rocketmq-cpp/src/common/InputStream.h
@@ -0,0 +1,241 @@
+#ifndef INPUTSTREAM_H_INCLUDED
+#define INPUTSTREAM_H_INCLUDED
+
+#include "ByteOrder.h"
+#include "dataBlock.h"
+//==============================================================================
+/** The base class for streams that read data.
+
+    Input and output streams are used throughout the library - subclasses can
+   override
+    some or all of the virtual functions to implement their behaviour.
+
+    @see OutputStream, MemoryInputStream, BufferedInputStream, FileInputStream
+*/
+namespace rocketmq {
+class ROCKETMQCLIENT_API InputStream {
+ public:
+  /** Destructor. */
+  virtual ~InputStream() {}
+
+  //==============================================================================
+  /** Returns the total number of bytes available for reading in this stream.
+
+      Note that this is the number of bytes available from the start of the
+      stream, not from the current position.
+
+      If the size of the stream isn't actually known, this will return -1.
+
+      @see getNumBytesRemaining
+  */
+  virtual int64 getTotalLength() = 0;
+
+  /** Returns the number of bytes available for reading, or a negative value if
+      the remaining length is not known.
+      @see getTotalLength
+  */
+  int64 getNumBytesRemaining();
+
+  /** Returns true if the stream has no more data to read. */
+  virtual bool isExhausted() = 0;
+
+  //==============================================================================
+  /** Reads some data from the stream into a memory buffer.
+
+      This is the only read method that subclasses actually need to implement,
+     as the
+      InputStream base class implements the other read methods in terms of this
+     one (although
+      it's often more efficient for subclasses to implement them directly).
+
+      @param destBuffer       the destination buffer for the data. This must not
+     be null.
+      @param maxBytesToRead   the maximum number of bytes to read - make sure
+     the
+                              memory block passed in is big enough to contain
+     this
+                              many bytes. This value must not be negative.
+
+      @returns    the actual number of bytes that were read, which may be less
+     than
+                  maxBytesToRead if the stream is exhausted before it gets that
+     far
+  */
+  virtual int read(void* destBuffer, int maxBytesToRead) = 0;
+
+  /** Reads a byte from the stream.
+      If the stream is exhausted, this will return zero.
+      @see OutputStream::writeByte
+  */
+  virtual char readByte();
+
+  /** Reads a boolean from the stream.
+      The bool is encoded as a single byte - non-zero for true, 0 for false.
+      If the stream is exhausted, this will return false.
+      @see OutputStream::writeBool
+  */
+  virtual bool readBool();
+
+  /** Reads two bytes from the stream as a little-endian 16-bit value.
+      If the next two bytes read are byte1 and byte2, this returns (byte1 |
+     (byte2 << 8)).
+      If the stream is exhausted partway through reading the bytes, this will
+     return zero.
+      @see OutputStream::writeShort, readShortBigEndian
+  */
+  virtual short readShort();
+
+  /** Reads two bytes from the stream as a big-endian 16-bit value.
+      If the next two bytes read are byte1 and byte2, this returns (byte2 |
+     (byte1 << 8)).
+      If the stream is exhausted partway through reading the bytes, this will
+     return zero.
+      @see OutputStream::writeShortBigEndian, readShort
+  */
+  virtual short readShortBigEndian();
+
+  /** Reads four bytes from the stream as a little-endian 32-bit value.
+
+      If the next four bytes are byte1 to byte4, this returns
+      (byte1 | (byte2 << 8) | (byte3 << 16) | (byte4 << 24)).
+
+      If the stream is exhausted partway through reading the bytes, this will
+     return zero.
+
+      @see OutputStream::writeInt, readIntBigEndian
+  */
+  virtual int readInt();
+
+  /** Reads four bytes from the stream as a big-endian 32-bit value.
+
+      If the next four bytes are byte1 to byte4, this returns
+      (byte4 | (byte3 << 8) | (byte2 << 16) | (byte1 << 24)).
+
+      If the stream is exhausted partway through reading the bytes, this will
+     return zero.
+
+      @see OutputStream::writeIntBigEndian, readInt
+  */
+  virtual int readIntBigEndian();
+
+  /** Reads eight bytes from the stream as a little-endian 64-bit value.
+
+      If the next eight bytes are byte1 to byte8, this returns
+      (byte1 | (byte2 << 8) | (byte3 << 16) | (byte4 << 24) | (byte5 << 32) |
+     (byte6 << 40) | (byte7 << 48) | (byte8 << 56)).
+
+      If the stream is exhausted partway through reading the bytes, this will
+     return zero.
+
+      @see OutputStream::writeInt64, readInt64BigEndian
+  */
+  virtual int64 readInt64();
+
+  /** Reads eight bytes from the stream as a big-endian 64-bit value.
+
+      If the next eight bytes are byte1 to byte8, this returns
+      (byte8 | (byte7 << 8) | (byte6 << 16) | (byte5 << 24) | (byte4 << 32) |
+     (byte3 << 40) | (byte2 << 48) | (byte1 << 56)).
+
+      If the stream is exhausted partway through reading the bytes, this will
+     return zero.
+
+      @see OutputStream::writeInt64BigEndian, readInt64
+  */
+  virtual int64 readInt64BigEndian();
+
+  /** Reads four bytes as a 32-bit floating point value.
+      The raw 32-bit encoding of the float is read from the stream as a
+     little-endian int.
+      If the stream is exhausted partway through reading the bytes, this will
+     return zero.
+      @see OutputStream::writeFloat, readDouble
+  */
+  virtual float readFloat();
+
+  /** Reads four bytes as a 32-bit floating point value.
+      The raw 32-bit encoding of the float is read from the stream as a
+     big-endian int.
+      If the stream is exhausted partway through reading the bytes, this will
+     return zero.
+      @see OutputStream::writeFloatBigEndian, readDoubleBigEndian
+  */
+  virtual float readFloatBigEndian();
+
+  /** Reads eight bytes as a 64-bit floating point value.
+      The raw 64-bit encoding of the double is read from the stream as a
+     little-endian int64.
+      If the stream is exhausted partway through reading the bytes, this will
+     return zero.
+      @see OutputStream::writeDouble, readFloat
+  */
+  virtual double readDouble();
+
+  /** Reads eight bytes as a 64-bit floating point value.
+      The raw 64-bit encoding of the double is read from the stream as a
+     big-endian int64.
+      If the stream is exhausted partway through reading the bytes, this will
+     return zero.
+      @see OutputStream::writeDoubleBigEndian, readFloatBigEndian
+  */
+  virtual double readDoubleBigEndian();
+
+  /** Reads an encoded 32-bit number from the stream using a space-saving
+     compressed format.
+      For small values, this is more space-efficient than using readInt() and
+     OutputStream::writeInt()
+      The format used is: number of significant bytes + up to 4 bytes in
+     little-endian order.
+      @see OutputStream::writeCompressedInt()
+  */
+  virtual int readCompressedInt();
+
+  //==============================================================================
+  // Bulk reads.
+  /** Reads from the stream and appends the data to a MemoryBlock.
+
+      @param destBlock            the block to append the data onto
+      @param maxNumBytesToRead    if this is a positive value, it sets a limit
+     to the number
+                                  of bytes that will be read - if it's negative,
+     data
+                                  will be read until the stream is exhausted.
+      @returns the number of bytes that were added to the memory block
+  */
+  virtual size_t readIntoMemoryBlock(MemoryBlock& destBlock,
+                                     ssize_t maxNumBytesToRead = -1);
+
+  //==============================================================================
+  /** Returns the offset of the next byte that will be read from the stream.
+      @see setPosition
+  */
+  virtual int64 getPosition() = 0;
+
+  /** Tries to move the current read position of the stream.
+
+      The position is an absolute number of bytes from the stream's start.
+
+      Some streams might not be able to do this, in which case they should do
+      nothing and return false. Others might be able to manage it by resetting
+      themselves and skipping to the correct position, although this is
+      obviously a bit slow.
+
+      @returns  true if the stream manages to reposition itself correctly
+      @see getPosition
+  */
+  virtual bool setPosition(int64 newPosition) = 0;
+
+  /** Reads and discards a number of bytes from the stream.
+
+      Some input streams might implement this efficiently, but the base
+      class will just keep reading data until the requisite number of bytes
+      have been done.
+  */
+  virtual void skipNextBytes(int64 numBytesToSkip);
+
+ protected:
+  //==============================================================================
+  InputStream() {}
+};
+}
+#endif  // INPUTSTREAM_H_INCLUDED

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MQClient.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MQClient.cpp b/rocketmq-cpp/src/common/MQClient.cpp
new file mode 100755
index 0000000..25d84de
--- /dev/null
+++ b/rocketmq-cpp/src/common/MQClient.cpp
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MQClient.h"
+#include "Logging.h"
+#include "MQClientFactory.h"
+#include "MQClientManager.h"
+#include "TopicPublishInfo.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+#define METAQCPP_VERSION "1.0.0"
+#define BUILD_DATE "08-08-2017"
+// Version/build-date banner string; to display it: strings bin/librocketmq.so | grep VERSION
+const char *metaq_build_time =
+    "VERSION: " METAQCPP_VERSION ", BUILD DATE: " BUILD_DATE " ";
+
+//<!************************************************************************
+// Sets the client defaults: name-server address from the NAMESRV_ADDR
+// environment variable (empty when unset), instance name "DEFAULT", no
+// factory, state CREATE_JUST, one pull thread per hardware thread, a 3000
+// connect timeout, a try-lock timeout of 3 and an empty unit name.
+MQClient::MQClient() {
+  const char *addr = getenv("NAMESRV_ADDR");
+  m_namesrvAddr = (addr != NULL) ? addr : "";
+
+  m_instanceName = "DEFAULT";
+  m_clientFactory = NULL;
+  m_serviceState = CREATE_JUST;
+
+  // hardware_concurrency() returns 0 when the thread count cannot be
+  // determined; fall back to one pull thread instead of configuring none.
+  const unsigned hardwareThreads = boost::thread::hardware_concurrency();
+  m_pullThreadNum = (hardwareThreads > 0) ? hardwareThreads : 1;
+
+  m_tcpConnectTimeout = 3000;        // 3s
+  m_tcpTransportTryLockTimeout = 3;  // 3s
+  m_unitName = "";
+}
+
+MQClient::~MQClient() {
+  // No explicit cleanup is performed here.
+}
+
+string MQClient::getMQClientId() const {
+  // Layout of the id: "<process id>-<local address>@<instance name>".
+  const string localAddress = UtilAll::getLocalAddress();
+  const string pid = UtilAll::to_string(getpid());
+
+  string clientId(pid);
+  clientId += "-";
+  clientId += localAddress;
+  clientId += "@";
+  clientId += m_instanceName;
+  return clientId;
+}
+
+//<!groupName;
+// Returns a reference to the stored group name.
+const string &MQClient::getGroupName() const {
+  return this->m_GroupName;
+}
+
+// Stores a copy of the given group name.
+void MQClient::setGroupName(const string &groupname) {
+  this->m_GroupName = groupname;
+}
+
+// Returns a reference to the stored name-server address.
+const string &MQClient::getNamesrvAddr() const {
+  return this->m_namesrvAddr;
+}
+
+// Replaces the stored name-server address (initially taken from NAMESRV_ADDR).
+void MQClient::setNamesrvAddr(const string &namesrvAddr) {
+  this->m_namesrvAddr = namesrvAddr;
+}
+
+// Returns a reference to the stored name-server domain.
+const string &MQClient::getNamesrvDomain() const {
+  return this->m_namesrvDomain;
+}
+
+// Stores a copy of the given name-server domain.
+void MQClient::setNamesrvDomain(const string &namesrvDomain) {
+  this->m_namesrvDomain = namesrvDomain;
+}
+
+// Returns a reference to the instance name ("DEFAULT" unless changed).
+const string &MQClient::getInstanceName() const {
+  return this->m_instanceName;
+}
+
+// Stores the instance name, which forms the last part of getMQClientId().
+void MQClient::setInstanceName(const string &instanceName) {
+  this->m_instanceName = instanceName;
+}
+
+// Asks the client factory to create `newTopic` with `queueNum` queues, passing
+// `key` and this client's session credentials. An MQException raised by the
+// factory is logged and not rethrown.
+void MQClient::createTopic(const string &key, const string &newTopic,
+                           int queueNum) {
+  try {
+    getFactory()->createTopic(key, newTopic, queueNum, m_SessionCredentials);
+  } catch (MQException &e) {
+    // Log through an explicit "%s" so the exception text is never interpreted
+    // as a printf-style format string (it may contain '%').
+    LOG_ERROR("%s", e.what());
+  }
+}
+
+// Forwards to the client factory together with this client's credentials.
+int64 MQClient::earliestMsgStoreTime(const MQMessageQueue &mq) {
+  MQClientFactory *factory = getFactory();
+  return factory->earliestMsgStoreTime(mq, m_SessionCredentials);
+}
+
+// Forwards the query unchanged to the client factory, adding this client's
+// session credentials.
+QueryResult MQClient::queryMessage(const string &topic, const string &key,
+                                   int maxNum, int64 begin, int64 end) {
+  MQClientFactory *factory = getFactory();
+  return factory->queryMessage(topic, key, maxNum, begin, end,
+                               m_SessionCredentials);
+}
+
+// Forwards to the client factory together with this client's credentials.
+int64 MQClient::minOffset(const MQMessageQueue &mq) {
+  MQClientFactory *factory = getFactory();
+  return factory->minOffset(mq, m_SessionCredentials);
+}
+
+// Forwards to the client factory together with this client's credentials.
+int64 MQClient::maxOffset(const MQMessageQueue &mq) {
+  MQClientFactory *factory = getFactory();
+  return factory->maxOffset(mq, m_SessionCredentials);
+}
+
+// Forwards the queue and timestamp to the client factory together with this
+// client's credentials.
+int64 MQClient::searchOffset(const MQMessageQueue &mq, uint64_t timestamp) {
+  MQClientFactory *factory = getFactory();
+  return factory->searchOffset(mq, timestamp, m_SessionCredentials);
+}
+
+// Forwards the message id to the client factory together with this client's
+// credentials and returns whatever pointer the factory yields.
+MQMessageExt *MQClient::viewMessage(const string &msgId) {
+  MQClientFactory *factory = getFactory();
+  return factory->viewMessage(msgId, m_SessionCredentials);
+}
+
+// Returns the message-queue list of the publish info the factory finds for
+// `topic`; raises MQClientException (via THROW_MQEXCEPTION) when none is found.
+vector<MQMessageQueue> MQClient::getTopicMessageQueueInfo(const string &topic) {
+  boost::weak_ptr<TopicPublishInfo> weakInfo(
+      getFactory()->tryToFindTopicPublishInfo(topic, m_SessionCredentials));
+  boost::shared_ptr<TopicPublishInfo> info(weakInfo.lock());
+
+  if (!info) {
+    THROW_MQEXCEPTION(
+        MQClientException,
+        "could not find MessageQueue Info of topic: [" + topic + "].", -1);
+  }
+
+  return info->getMessageQueueList();
+}
+
+// Requests a factory from MQClientManager when none is set yet, then logs the
+// group name, client id, instance name and name-server address.
+void MQClient::start() {
+  MQClientFactory *existing = getFactory();
+  if (existing == NULL) {
+    m_clientFactory = MQClientManager::getInstance()->getMQClientFactory(
+        getMQClientId(), m_pullThreadNum, m_tcpConnectTimeout,
+        m_tcpTransportTryLockTimeout, m_unitName);
+  }
+
+  const string clientId = getMQClientId();
+  LOG_INFO(
+      "MQClient start,groupname:%s,clientID:%s,instanceName:%s,"
+      "nameserveraddr:%s",
+      getGroupName().c_str(), clientId.c_str(), getInstanceName().c_str(),
+      getNamesrvAddr().c_str());
+}
+
+// Clears the factory pointer; nothing is deleted or otherwise released here.
+void MQClient::shutdown() {
+  m_clientFactory = NULL;
+}
+
+// Returns the current factory pointer, which is NULL after construction and
+// after shutdown().
+MQClientFactory *MQClient::getFactory() const {
+  return this->m_clientFactory;
+}
+
+// True only while the service state equals RUNNING.
+bool MQClient::isServiceStateOk() {
+  const bool running = (m_serviceState == RUNNING);
+  return running;
+}
+
+// Passes the requested level straight to the ALOG_ADAPTER logger.
+void MQClient::setMetaqLogLevel(elogLevel inputLevel) {
+  const elogLevel requested = inputLevel;
+  ALOG_ADAPTER.setLogLevel(requested);
+}
+
+// Passes the log file count and per-file size straight to ALOG_ADAPTER.
+void MQClient::setMetaqLogFileSizeAndNum(int fileNum, long perFileSize) {
+  const int count = fileNum;
+  const long sizeEach = perFileSize;
+  ALOG_ADAPTER.setLogFileNumAndSize(count, sizeEach);
+}
+
+// Only ever raises the pull thread count: a value that is not larger than the
+// current one is ignored.
+void MQClient::setTcpTransportPullThreadNum(int num) {
+  if (num <= m_pullThreadNum) return;
+
+  m_pullThreadNum = num;
+}
+
+// Returns the pull thread count currently configured.
+const int MQClient::getTcpTransportPullThreadNum() const {
+  const int configured = m_pullThreadNum;
+  return configured;
+}
+
+// Stores the connect timeout exactly as given (the constructor default is
+// 3000, commented as 3s).
+void MQClient::setTcpTransportConnectTimeout(uint64_t timeout) {
+  this->m_tcpConnectTimeout = timeout;
+}
+// Returns the stored connect timeout.
+const uint64_t MQClient::getTcpTransportConnectTimeout() const {
+  const uint64_t configured = m_tcpConnectTimeout;
+  return configured;
+}
+
+// Raises the input to at least 1000, then stores it divided by 1000
+// (integer division), so the stored value is never below 1.
+void MQClient::setTcpTransportTryLockTimeout(uint64_t timeout) {
+  const uint64_t floored = (timeout < 1000) ? 1000 : timeout;
+  m_tcpTransportTryLockTimeout = floored / 1000;
+}
+// Returns the stored try-lock timeout, i.e. the value after the setter's
+// division by 1000.
+const uint64_t MQClient::getTcpTransportTryLockTimeout() const {
+  const uint64_t configured = m_tcpTransportTryLockTimeout;
+  return configured;
+}
+
+// Stores the unit name, which start() passes on to getMQClientFactory().
+void MQClient::setUnitName(string unitName) {
+  this->m_unitName = unitName;
+}
+// Returns a reference to the stored unit name.
+const string &MQClient::getUnitName() {
+  return this->m_unitName;
+}
+
+// Copies the access key, secret key and auth channel into the credentials
+// object that the factory calls in this class pass along.
+void MQClient::setSessionCredentials(const string &input_accessKey,
+                                     const string &input_secretKey,
+                                     const string &input_onsChannel) {
+  SessionCredentials &credentials = m_SessionCredentials;
+  credentials.setAccessKey(input_accessKey);
+  credentials.setSecretKey(input_secretKey);
+  credentials.setAuthChannel(input_onsChannel);
+}
+
+// Returns a read-only reference to the stored session credentials.
+const SessionCredentials &MQClient::getSessionCredentials() const {
+  return this->m_SessionCredentials;
+}
+
+//<!************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MQVersion.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MQVersion.cpp b/rocketmq-cpp/src/common/MQVersion.cpp
new file mode 100755
index 0000000..015390c
--- /dev/null
+++ b/rocketmq-cpp/src/common/MQVersion.cpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQVersion.h"
+
+namespace rocketmq {
+int MQVersion::s_CurrentVersion = MQVersion::V3_1_8;  // definition of the static member; starts as V3_1_8
+
+//<!************************************************************************
+const char* MQVersion::getVersionDesc(int value) {
+  // No version value is mapped to a description yet (a V1_0_0 case was left
+  // disabled), so every input produces the empty string.
+  (void)value;
+  return "";
+}
+//<!***************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MQVersion.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MQVersion.h b/rocketmq-cpp/src/common/MQVersion.h
new file mode 100755
index 0000000..aa9e9cd
--- /dev/null
+++ b/rocketmq-cpp/src/common/MQVersion.h
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MQVERSION_H__
+#define __MQVERSION_H__
+
+#include <string>
+
+namespace rocketmq {
+//<!***************************************************************************
+class MQVersion {  // version enumeration plus the current-version static
+ public:
+  enum Version {  // no explicit values: enumerators count up from 0 in declaration order
+    V3_0_0_SNAPSHOT,
+    V3_0_0_ALPHA1,
+    V3_0_0_BETA1,
+    V3_0_0_BETA2,
+    V3_0_0_BETA3,
+    V3_0_0_BETA4,
+    V3_0_0_BETA5,
+    V3_0_0_BETA6_SNAPSHOT,
+    V3_0_0_BETA6,
+    V3_0_0_BETA7_SNAPSHOT,
+    V3_0_0_BETA7,
+    V3_0_0_BETA8_SNAPSHOT,
+    V3_0_0_BETA8,
+    V3_0_0_BETA9_SNAPSHOT,
+    V3_0_0_BETA9,
+    V3_0_0_FINAL,
+    V3_0_1_SNAPSHOT,
+    V3_0_1,
+    V3_0_2_SNAPSHOT,
+    V3_0_2,
+    V3_0_3_SNAPSHOT,
+    V3_0_3,
+    V3_0_4_SNAPSHOT,
+    V3_0_4,
+    V3_0_5_SNAPSHOT,
+    V3_0_5,
+    V3_0_6_SNAPSHOT,
+    V3_0_6,
+    V3_0_7_SNAPSHOT,
+    V3_0_7,
+    V3_0_8_SNAPSHOT,
+    V3_0_8,
+    V3_0_9_SNAPSHOT,
+    V3_0_9,
+
+    V3_0_10_SNAPSHOT,
+    V3_0_10,
+
+    V3_0_11_SNAPSHOT,
+    V3_0_11,
+
+    V3_0_12_SNAPSHOT,
+    V3_0_12,
+
+    V3_0_13_SNAPSHOT,
+    V3_0_13,
+
+    V3_0_14_SNAPSHOT,
+    V3_0_14,
+
+    V3_0_15_SNAPSHOT,
+    V3_0_15,
+
+    V3_1_0_SNAPSHOT,
+    V3_1_0,
+
+    V3_1_1_SNAPSHOT,
+    V3_1_1,
+
+    V3_1_2_SNAPSHOT,
+    V3_1_2,
+
+    V3_1_3_SNAPSHOT,
+    V3_1_3,
+
+    V3_1_4_SNAPSHOT,
+    V3_1_4,
+
+    V3_1_5_SNAPSHOT,
+    V3_1_5,
+
+    V3_1_6_SNAPSHOT,
+    V3_1_6,
+
+    V3_1_7_SNAPSHOT,
+    V3_1_7,
+
+    V3_1_8_SNAPSHOT,
+    V3_1_8,
+
+    V3_1_9_SNAPSHOT,
+    V3_1_9,
+
+    V3_2_0_SNAPSHOT,
+    V3_2_0,
+
+    V3_2_1_SNAPSHOT,
+    V3_2_1,
+
+    V3_2_2_SNAPSHOT,
+    V3_2_2,
+
+    V3_2_3_SNAPSHOT,
+    V3_2_3,
+
+    V3_2_4_SNAPSHOT,
+    V3_2_4,
+
+    V3_2_5_SNAPSHOT,
+    V3_2_5,
+
+    V3_2_6_SNAPSHOT,
+    V3_2_6,
+
+    V3_2_7_SNAPSHOT,
+    V3_2_7,
+
+    V3_2_8_SNAPSHOT,
+    V3_2_8,
+
+    V3_2_9_SNAPSHOT,
+    V3_2_9,
+    // NOTE(review): the list has no V3_3_0 / V3_4_0 / V3_5_0 entries - confirm the gaps are intentional.
+    V3_3_1_SNAPSHOT,
+    V3_3_1,
+
+    V3_3_2_SNAPSHOT,
+    V3_3_2,
+
+    V3_3_3_SNAPSHOT,
+    V3_3_3,
+
+    V3_3_4_SNAPSHOT,
+    V3_3_4,
+
+    V3_3_5_SNAPSHOT,
+    V3_3_5,
+
+    V3_3_6_SNAPSHOT,
+    V3_3_6,
+
+    V3_3_7_SNAPSHOT,
+    V3_3_7,
+
+    V3_3_8_SNAPSHOT,
+    V3_3_8,
+
+    V3_3_9_SNAPSHOT,
+    V3_3_9,
+
+    V3_4_1_SNAPSHOT,
+    V3_4_1,
+
+    V3_4_2_SNAPSHOT,
+    V3_4_2,
+
+    V3_4_3_SNAPSHOT,
+    V3_4_3,
+
+    V3_4_4_SNAPSHOT,
+    V3_4_4,
+
+    V3_4_5_SNAPSHOT,
+    V3_4_5,
+
+    V3_4_6_SNAPSHOT,
+    V3_4_6,
+
+    V3_4_7_SNAPSHOT,
+    V3_4_7,
+
+    V3_4_8_SNAPSHOT,
+    V3_4_8,
+
+    V3_4_9_SNAPSHOT,
+    V3_4_9,
+    V3_5_1_SNAPSHOT,
+    V3_5_1,
+
+    V3_5_2_SNAPSHOT,
+    V3_5_2,
+
+    V3_5_3_SNAPSHOT,
+    V3_5_3,
+
+    V3_5_4_SNAPSHOT,
+    V3_5_4,
+
+    V3_5_5_SNAPSHOT,
+    V3_5_5,
+
+    V3_5_6_SNAPSHOT,
+    V3_5_6,
+
+    V3_5_7_SNAPSHOT,
+    V3_5_7,
+
+    V3_5_8_SNAPSHOT,
+    V3_5_8,
+
+    V3_5_9_SNAPSHOT,
+    V3_5_9,
+  };
+  // Takes a version value as int and returns a C string for it (defined in MQVersion.cpp).
+  static const char* getVersionDesc(int value);
+
+ public:
+  static int s_CurrentVersion;  // mutable static, defined in MQVersion.cpp
+};
+
+//<!***************************************************************************
+}  //<!end namespace;
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MemoryInputStream.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MemoryInputStream.cpp b/rocketmq-cpp/src/common/MemoryInputStream.cpp
new file mode 100644
index 0000000..bfbd772
--- /dev/null
+++ b/rocketmq-cpp/src/common/MemoryInputStream.cpp
@@ -0,0 +1,55 @@
+#include "MemoryInputStream.h"
+
+namespace rocketmq {
+// Wraps a raw buffer of sourceDataSize bytes, starting at position 0. When
+// keepInternalCopy is set, the bytes are duplicated by createInternalCopy().
+MemoryInputStream::MemoryInputStream(const void* const sourceData,
+                                     const size_t sourceDataSize,
+                                     const bool keepInternalCopy)
+    : data(sourceData),
+      dataSize(sourceDataSize),
+      position(0),
+      internalCopy(NULL) {
+  if (!keepInternalCopy) return;
+
+  createInternalCopy();
+}
+
+// Wraps the data and size reported by a MemoryBlock, starting at position 0.
+// When keepInternalCopy is set, the bytes are duplicated by
+// createInternalCopy().
+MemoryInputStream::MemoryInputStream(const MemoryBlock& sourceData,
+                                     const bool keepInternalCopy)
+    : data(sourceData.getData()),
+      dataSize(sourceData.getSize()),
+      position(0),
+      internalCopy(NULL) {
+  if (!keepInternalCopy) return;
+
+  createInternalCopy();
+}
+
+// Frees any earlier copy, allocates dataSize bytes, copies the current data
+// into them and repoints `data` at the new copy.
+// NOTE(review): the malloc result is used without a null check - confirm how
+// allocation failure should be handled.
+void MemoryInputStream::createInternalCopy() {
+  std::free(internalCopy);
+
+  char* const copy = static_cast<char*>(std::malloc(dataSize));
+  memcpy(copy, data, dataSize);
+
+  internalCopy = copy;
+  data = internalCopy;
+}
+
+// Frees the internal copy; internalCopy is NULL when no copy was made, and
+// std::free(NULL) does nothing.
+MemoryInputStream::~MemoryInputStream() {
+  std::free(internalCopy);
+}
+
+// The total length is the size of the wrapped data.
+int64 MemoryInputStream::getTotalLength() {
+  const int64 total = (int64)dataSize;
+  return total;
+}
+
+// Copies up to `howMany` bytes from the current position into `buffer` and
+// advances the position by the amount copied.
+//
+// Returns the number of bytes copied, or 0 when `howMany` is not positive or
+// the position is already at (or past) the end of the data.
+int MemoryInputStream::read(void* const buffer, const int howMany) {
+  if (howMany <= 0 || position >= dataSize) return 0;
+
+  // Compare in size_t: narrowing the remaining byte count to int would go
+  // wrong for buffers with more than INT_MAX bytes left and report "nothing
+  // to read" while data is still available.
+  const size_t remaining = dataSize - position;
+  const size_t num = std::min((size_t)howMany, remaining);
+
+  memcpy(buffer, data + position, num);
+  position += num;
+  return (int)num;  // num <= howMany, so this always fits in an int
+}
+
+// Exhausted once the position has reached (or passed) the data size.
+bool MemoryInputStream::isExhausted() {
+  const bool atEnd = (position >= dataSize);
+  return atEnd;
+}
+
+// Moves the read position to `pos`, clamped into [0, dataSize]; always
+// returns true.
+bool MemoryInputStream::setPosition(const int64 pos) {
+  const int64 upper = (int64)dataSize;
+
+  int64 clamped = pos;
+  if (clamped < 0) {
+    clamped = 0;
+  } else if (upper < clamped) {
+    clamped = upper;
+  }
+
+  position = clamped;
+  return true;
+}
+
+// Returns the current read offset from the start of the data.
+int64 MemoryInputStream::getPosition() {
+  const int64 current = (int64)position;
+  return current;
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MemoryInputStream.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MemoryInputStream.h b/rocketmq-cpp/src/common/MemoryInputStream.h
new file mode 100644
index 0000000..a25f079
--- /dev/null
+++ b/rocketmq-cpp/src/common/MemoryInputStream.h
@@ -0,0 +1,80 @@
+#ifndef MEMORYINPUTSTREAM_H_INCLUDED
+#define MEMORYINPUTSTREAM_H_INCLUDED
+
+#include "InputStream.h"
+
+namespace rocketmq {
+//==============================================================================
+/**
+    Allows a block of data to be accessed as a stream.
+
+    This can either be used to refer to a shared block of memory, or can make
+    its own internal copy of the data when the MemoryInputStream is created.
+*/
+class ROCKETMQCLIENT_API MemoryInputStream : public InputStream {
+ public:
+  //==============================================================================
+  /** Creates a MemoryInputStream.
+
+      @param sourceData               the block of data to use as the stream's
+                                      source
+      @param sourceDataSize           the number of bytes in the source data
+                                      block
+      @param keepInternalCopyOfData   if false, the stream will just keep a
+                                      pointer to the source data, so this data
+                                      shouldn't be changed for the lifetime of
+                                      the stream; if this parameter is true,
+                                      the stream will make its own copy of the
+                                      data and use that.
+  */
+  MemoryInputStream(const void* sourceData, size_t sourceDataSize,
+                    bool keepInternalCopyOfData);
+
+  /** Creates a MemoryInputStream.
+
+      @param data                     a block of data to use as the stream's
+                                      source
+      @param keepInternalCopyOfData   if false, the stream will just keep a
+                                      reference to the source data, so this
+                                      data shouldn't be changed for the
+                                      lifetime of the stream; if this parameter
+                                      is true, the stream will make its own
+                                      copy of the data and use that.
+  */
+  MemoryInputStream(const MemoryBlock& data, bool keepInternalCopyOfData);
+
+  /** Destructor. */
+  ~MemoryInputStream();
+
+  /** Returns a pointer to the source data block from which this stream is
+   * reading. */
+  const void* getData() const { return data; }
+
+  /** Returns the number of bytes of source data in the block from which this
+   * stream is reading. */
+  size_t getDataSize() const { return dataSize; }
+
+  //==============================================================================
+  /** Returns the current read offset, in bytes. */
+  int64 getPosition();
+  /** Moves the read offset to pos. */
+  bool setPosition(int64 pos);
+  /** Returns the total size of the source data, in bytes. */
+  int64 getTotalLength();
+  /** Returns true when there is no more data to read. */
+  bool isExhausted();
+  /** Reads up to maxBytesToRead bytes into destBuffer and returns the number
+   * of bytes actually read. */
+  int read(void* destBuffer, int maxBytesToRead);
+
+ private:
+  //==============================================================================
+  // Start of the bytes being read.
+  const void* data;
+  // Total byte count of the source, and the current read offset into it.
+  size_t dataSize, position;
+  // Buffer owned by the stream when an internal copy was requested, else
+  // null.
+  char* internalCopy;
+
+  // Duplicates the source data into internalCopy.
+  void createInternalCopy();
+};
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MemoryOutputStream.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MemoryOutputStream.cpp b/rocketmq-cpp/src/common/MemoryOutputStream.cpp
new file mode 100644
index 0000000..36d9f8c
--- /dev/null
+++ b/rocketmq-cpp/src/common/MemoryOutputStream.cpp
@@ -0,0 +1,148 @@
+#include "MemoryOutputStream.h"
+
+namespace rocketmq {
+// Creates a stream that writes into its own internal block, which is sized to
+// initialSize bytes up front.
+MemoryOutputStream::MemoryOutputStream(const size_t initialSize)
+    : blockToUse(&internalBlock),
+      externalData(NULL),
+      position(0),
+      size(0),
+      availableSize(0) {
+  const bool initialiseToZero = false;
+  internalBlock.setSize(initialSize, initialiseToZero);
+}
+
+// Creates a stream that writes into a caller-supplied MemoryBlock. When
+// appending, writing starts at the block's current end; otherwise it starts
+// at offset 0 and overwrites the existing content.
+MemoryOutputStream::MemoryOutputStream(MemoryBlock& memoryBlockToWriteTo,
+                                       const bool appendToExistingBlockContent)
+    : blockToUse(&memoryBlockToWriteTo),
+      externalData(NULL),
+      position(0),
+      size(0),
+      availableSize(0) {
+  if (appendToExistingBlockContent) {
+    size = memoryBlockToWriteTo.getSize();
+    position = size;
+  }
+}
+
+// Creates a stream that writes into a fixed-size, caller-owned buffer of
+// destBufferSize bytes; no MemoryBlock is involved in this mode.
+MemoryOutputStream::MemoryOutputStream(void* destBuffer, size_t destBufferSize)
+    : blockToUse(NULL),
+      externalData(destBuffer),
+      position(0),
+      size(0),
+      availableSize(destBufferSize) {
+}
+
+// On destruction a caller-supplied MemoryBlock is trimmed to the written size.
+MemoryOutputStream::~MemoryOutputStream() {
+  trimExternalBlockSize();
+}
+
+// Trims a caller-supplied MemoryBlock so its size matches the data written.
+void MemoryOutputStream::flush() {
+  trimExternalBlockSize();
+}
+
+// Shrinks a caller-supplied MemoryBlock to exactly `size` bytes. Does nothing
+// when writing to the internal block or to a raw fixed-size buffer.
+void MemoryOutputStream::trimExternalBlockSize() {
+  if (blockToUse == NULL || blockToUse == &internalBlock) {
+    return;
+  }
+
+  blockToUse->setSize(size, false);
+}
+
+// Asks the backing MemoryBlock to hold at least bytesToPreallocate + 1 bytes.
+// Has no effect when writing to a raw fixed-size buffer.
+void MemoryOutputStream::preallocate(const size_t bytesToPreallocate) {
+  if (blockToUse == NULL) {
+    return;
+  }
+
+  blockToUse->ensureSize(bytesToPreallocate + 1);
+}
+
+// Discards everything written so far by rewinding both the logical size and
+// the write position to zero; the underlying storage is left untouched.
+void MemoryOutputStream::reset() {
+  size = 0;
+  position = 0;
+}
+
+// Reserves numBytes at the current write position and advances the position
+// (and the logical size, if the write extends the data).
+//
+// When backed by a MemoryBlock the block is grown as needed; when backed by a
+// fixed-size buffer the request fails if it would not fit.
+//
+// @returns a pointer to where the caller should store the bytes, or NULL if a
+//          fixed-size buffer cannot hold them
+char* MemoryOutputStream::prepareToWrite(size_t numBytes) {
+  const size_t storageNeeded = position + numBytes;
+
+  char* data;
+
+  if (blockToUse != NULL) {
+    // Compare in size_t so block sizes above 4 GiB are not truncated.
+    if (storageNeeded >= static_cast<size_t>(blockToUse->getSize())) {
+      // Grow by 50% (capped at 1 MiB of slack) and round to a multiple of 32.
+      // The mask must be as wide as size_t: `~31u` is a 32-bit value that
+      // zero-extends and would clear the upper 32 bits of a 64-bit size.
+      const size_t slack = std::min(storageNeeded / 2, (size_t)(1024 * 1024));
+      blockToUse->ensureSize((storageNeeded + slack + 32) &
+                             ~static_cast<size_t>(31));
+    }
+
+    data = static_cast<char*>(blockToUse->getData());
+  } else {
+    if (storageNeeded > availableSize) return NULL;
+
+    data = static_cast<char*>(externalData);
+  }
+
+  char* const writePointer = data + position;
+  position += numBytes;
+  size = std::max(size, position);
+  return writePointer;
+}
+
+// Appends howMany bytes from buffer at the current write position.
+// Writing zero bytes always succeeds; returns false when no space could be
+// reserved for the data.
+bool MemoryOutputStream::write(const void* const buffer, size_t howMany) {
+  if (howMany == 0) return true;
+
+  char* const target = prepareToWrite(howMany);
+  if (target == NULL) return false;
+
+  memcpy(target, buffer, howMany);
+  return true;
+}
+
+// Writes `byte` howMany times at the current write position using a single
+// memset. Writing zero bytes always succeeds; returns false when no space
+// could be reserved.
+bool MemoryOutputStream::writeRepeatedByte(uint8 byte, size_t howMany) {
+  if (howMany == 0) return true;
+
+  char* const target = prepareToWrite(howMany);
+  if (target == NULL) return false;
+
+  memset(target, byte, howMany);
+  return true;
+}
+
+// Builds a MemoryBlock from the stream's data pointer and written byte count.
+MemoryBlock MemoryOutputStream::getMemoryBlock() const {
+  const size_t numBytes = getDataSize();
+  return MemoryBlock(getData(), numBytes);
+}
+
+// Returns the start of the written data. When backed by a MemoryBlock that
+// has spare room, a zero byte is stored just past the written data first.
+const void* MemoryOutputStream::getData() const {
+  if (blockToUse == NULL) {
+    return externalData;
+  }
+
+  const bool hasRoomForTerminator = (unsigned int)blockToUse->getSize() > size;
+  if (hasRoomForTerminator) {
+    static_cast<char*>(blockToUse->getData())[size] = 0;
+  }
+
+  return blockToUse->getData();
+}
+
+// Moves the write position. Seeking is only allowed within the data already
+// written: positions past the end are rejected, negative ones clamp to 0.
+bool MemoryOutputStream::setPosition(int64 newPosition) {
+  // can't move beyond the end of the stream..
+  if (newPosition > (int64)size) {
+    return false;
+  }
+
+  // ok to seek backwards
+  if (newPosition < 0) {
+    position = 0;
+  } else {
+    position = newPosition;
+  }
+
+  return true;
+}
+
+// Copies data from source into this stream. If the source reports how much
+// data it has left, the request is capped to that amount and the backing
+// block is preallocated before delegating to the generic copy loop.
+int64 MemoryOutputStream::writeFromInputStream(InputStream& source,
+                                               int64 maxNumBytesToWrite) {
+  const int64 availableData = source.getTotalLength() - source.getPosition();
+
+  if (availableData > 0) {
+    const bool wantsEverything = maxNumBytesToWrite < 0;
+    if (wantsEverything || maxNumBytesToWrite > availableData) {
+      maxNumBytesToWrite = availableData;
+    }
+
+    if (blockToUse != NULL) {
+      preallocate(blockToUse->getSize() + (size_t)maxNumBytesToWrite);
+    }
+  }
+
+  return OutputStream::writeFromInputStream(source, maxNumBytesToWrite);
+}
+
+// Writes everything held by streamToRead into stream (nothing is written when
+// it is empty) and returns stream so that calls can be chained.
+OutputStream& operator<<(OutputStream& stream,
+                         const MemoryOutputStream& streamToRead) {
+  const size_t numBytes = streamToRead.getDataSize();
+
+  if (numBytes == 0) {
+    return stream;
+  }
+
+  stream.write(streamToRead.getData(), numBytes);
+  return stream;
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MemoryOutputStream.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MemoryOutputStream.h b/rocketmq-cpp/src/common/MemoryOutputStream.h
new file mode 100644
index 0000000..53fe66c
--- /dev/null
+++ b/rocketmq-cpp/src/common/MemoryOutputStream.h
@@ -0,0 +1,115 @@
+#ifndef MEMORYOUTPUTSTREAM_H_INCLUDED
+#define MEMORYOUTPUTSTREAM_H_INCLUDED
+
+#include "OutputStream.h"
+
+namespace rocketmq {
+//==============================================================================
+/**
+    Writes data to an internal memory buffer, which grows as required.
+
+    The data that was written into the stream can then be accessed later as
+    a contiguous block of memory.
+*/
+class ROCKETMQCLIENT_API MemoryOutputStream : public OutputStream {
+ public:
+  //==============================================================================
+  /** Creates an empty memory stream, ready to be written into.
+      @param initialSize  the initial amount of capacity to allocate for
+                          writing into
+  */
+  MemoryOutputStream(size_t initialSize = 256);
+
+  /** Creates a memory stream for writing into a pre-existing MemoryBlock
+      object.
+
+      Note that the destination block will always be larger than the amount of
+      data that has been written to the stream, because the MemoryOutputStream
+      keeps some spare capacity at its end. To trim the block's size down to
+      fit the actual data, call flush(), or delete the MemoryOutputStream.
+
+      @param memoryBlockToWriteTo             the block into which new data
+                                              will be written.
+      @param appendToExistingBlockContent     if this is true, the contents of
+                                              the block will be kept, and new
+                                              data will be appended to it. If
+                                              false, writing starts at the
+                                              beginning of the block and
+                                              overwrites its existing content.
+  */
+  MemoryOutputStream(MemoryBlock& memoryBlockToWriteTo,
+                     bool appendToExistingBlockContent);
+
+  /** Creates a MemoryOutputStream that will write into a user-supplied,
+      fixed-size block of memory.
+      When using this mode, the stream will write directly into this memory
+      area until it's full, at which point write operations will fail.
+  */
+  MemoryOutputStream(void* destBuffer, size_t destBufferSize);
+
+  /** Destructor.
+      This will free any data that was written to it.
+  */
+  ~MemoryOutputStream();
+
+  //==============================================================================
+  /** Returns a pointer to the data that has been written to the stream.
+      @see getDataSize
+  */
+  const void* getData() const;
+
+  /** Returns the number of bytes of data that have been written to the stream.
+      @see getData
+  */
+  size_t getDataSize() const { return size; }
+
+  /** Resets the stream, clearing any data that has been written to it so far.
+   */
+  void reset();
+
+  /** Increases the internal storage capacity to be able to contain at least
+      the specified amount of data without needing to be resized.
+  */
+  void preallocate(size_t bytesToPreallocate);
+
+  /** Returns a copy of the stream's data as a memory block. */
+  MemoryBlock getMemoryBlock() const;
+
+  //==============================================================================
+  /** If the stream is writing to a user-supplied MemoryBlock, this will trim
+      any excess capacity off the block, so that its length matches the amount
+      of actual data that has been written so far.
+  */
+  void flush();
+
+  /** Writes a block of bytes at the current position.
+      @returns false if the data could not be stored
+  */
+  bool write(const void*, size_t);
+  /** Returns the current write position, in bytes. */
+  int64 getPosition() { return (int64)position; }
+  /** Moves the write position.
+      @returns false if the position could not be changed
+  */
+  bool setPosition(int64);
+  /** Reads data from an input stream and writes it to this stream. */
+  int64 writeFromInputStream(InputStream&, int64 maxNumBytesToWrite);
+  /** Writes the same byte numTimesToRepeat times. */
+  bool writeRepeatedByte(uint8 byte, size_t numTimesToRepeat);
+
+ private:
+  //==============================================================================
+  // Block being written to (the internal one or a user-supplied one); null
+  // when writing into a raw fixed-size buffer.
+  MemoryBlock* const blockToUse;
+  // Storage owned by the stream, used by the size-only constructor.
+  MemoryBlock internalBlock;
+  // User-supplied fixed-size buffer, or null when a MemoryBlock is used.
+  void* externalData;
+  // Write offset, number of bytes written, and capacity of externalData.
+  size_t position, size, availableSize;
+
+  // Shrinks a user-supplied MemoryBlock to the number of bytes written.
+  void trimExternalBlockSize();
+  // Reserves space at the current position and returns where to write, or
+  // null if a fixed-size buffer is full.
+  char* prepareToWrite(size_t);
+};
+
+/** Copies all the data that has been written to a MemoryOutputStream into
+ * another stream.
+ *
+ * @param stream        the stream to write to
+ * @param streamToRead  the memory stream whose data is copied
+ * @returns the destination stream, so calls can be chained
+ */
+OutputStream& operator<<(OutputStream& stream,
+                         const MemoryOutputStream& streamToRead);
+}
+#endif  // MEMORYOUTPUTSTREAM_H_INCLUDED

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MessageSysFlag.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MessageSysFlag.cpp b/rocketmq-cpp/src/common/MessageSysFlag.cpp
new file mode 100755
index 0000000..50a28b4
--- /dev/null
+++ b/rocketmq-cpp/src/common/MessageSysFlag.cpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MessageSysFlag.h"
+
+namespace rocketmq {
+// Single-bit flags occupying bits 0 and 1.
+int MessageSysFlag::CompressedFlag = (0x1 << 0);
+int MessageSysFlag::MultiTagsFlag = (0x1 << 1);
+
+// Transaction type: a two-bit field stored in bits 2-3. The rollback value
+// (0x3 << 2) has both bits set, so it is also used as the field's mask.
+int MessageSysFlag::TransactionNotType = (0x0 << 2);
+int MessageSysFlag::TransactionPreparedType = (0x1 << 2);
+int MessageSysFlag::TransactionCommitType = (0x2 << 2);
+int MessageSysFlag::TransactionRollbackType = (0x3 << 2);
+
+// Extracts the transaction-type bits (bits 2-3) from a system flag value.
+int MessageSysFlag::getTransactionValue(int flag) {
+  // The rollback type has both transaction bits set, so it acts as the mask.
+  const int transactionMask = TransactionRollbackType;
+  return flag & transactionMask;
+}
+
+// Returns flag with its transaction-type bits cleared and then OR-ed with
+// type; all other bits of flag are kept.
+int MessageSysFlag::resetTransactionValue(int flag, int type) {
+  const int withoutTransactionBits = flag & (~TransactionRollbackType);
+  return withoutTransactionBits | type;
+}
+
+//<!***************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MessageSysFlag.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MessageSysFlag.h b/rocketmq-cpp/src/common/MessageSysFlag.h
new file mode 100755
index 0000000..d7f7993
--- /dev/null
+++ b/rocketmq-cpp/src/common/MessageSysFlag.h
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MESSAGESYSFLAG_H__
+#define __MESSAGESYSFLAG_H__
+
+namespace rocketmq {
+//<!************************************************************************
+// Bit flags and helpers for the integer "system flag" value of a message.
+class MessageSysFlag {
+ public:
+  // Returns only the transaction-type bits of flag.
+  static int getTransactionValue(int flag);
+  // Returns flag with its transaction-type bits replaced by type.
+  static int resetTransactionValue(int flag, int type);
+
+ public:
+  // Independent single-bit flags.
+  static int CompressedFlag;
+  static int MultiTagsFlag;
+  // Values of the two-bit transaction-type field.
+  static int TransactionNotType;
+  static int TransactionPreparedType;
+  static int TransactionCommitType;
+  static int TransactionRollbackType;
+};
+
+//<!***************************************************************************
+}  //<!end namespace;
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/NamesrvConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/NamesrvConfig.h b/rocketmq-cpp/src/common/NamesrvConfig.h
new file mode 100755
index 0000000..6c0259f
--- /dev/null
+++ b/rocketmq-cpp/src/common/NamesrvConfig.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __NAMESRVCONFIG_H__
+#define __NAMESRVCONFIG_H__
+
+#include <stdlib.h>
+#include <string>
+namespace rocketmq {
+//<!***************************************************************************
+// Holds two path settings for the name server: the RocketMQ home directory
+// and the KV config path.
+//
+// Strings are spelled std::string so the header does not depend on a
+// using-directive being in effect wherever it is included.
+class NamesrvConfig {
+ public:
+  // Seeds the home directory from the environment variable named by
+  // ROCKETMQ_HOME_ENV (declared outside this header), or "" when it is unset.
+  // The KV config path starts out empty.
+  NamesrvConfig() {
+    m_kvConfigPath = "";
+
+    const char* home = getenv(ROCKETMQ_HOME_ENV.c_str());
+    if (home) {
+      m_rocketmqHome = home;
+    } else {
+      m_rocketmqHome = "";
+    }
+  }
+
+  // Returns the configured RocketMQ home directory (may be empty).
+  const std::string& getRocketmqHome() const { return m_rocketmqHome; }
+
+  // Overrides the RocketMQ home directory.
+  void setRocketmqHome(const std::string& rocketmqHome) {
+    m_rocketmqHome = rocketmqHome;
+  }
+
+  // Returns the configured KV config path (may be empty).
+  const std::string& getKvConfigPath() const { return m_kvConfigPath; }
+
+  // Overrides the KV config path.
+  void setKvConfigPath(const std::string& kvConfigPath) {
+    m_kvConfigPath = kvConfigPath;
+  }
+
+ private:
+  std::string m_rocketmqHome;
+  std::string m_kvConfigPath;
+};
+
+//<!***************************************************************************
+}  //<!end namespace;
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/OutputStream.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/OutputStream.cpp b/rocketmq-cpp/src/common/OutputStream.cpp
new file mode 100644
index 0000000..59d8c0f
--- /dev/null
+++ b/rocketmq-cpp/src/common/OutputStream.cpp
@@ -0,0 +1,129 @@
+#include "OutputStream.h"
+#include <limits>
+
+namespace rocketmq {
+//==============================================================================
+// The base class has no state of its own to initialise.
+OutputStream::OutputStream() {}
+
+// Nothing to release in the base class.
+OutputStream::~OutputStream() {}
+
+//==============================================================================
+// Writes a boolean as one byte: 1 for true, 0 for false.
+bool OutputStream::writeBool(const bool b) {
+  const char encoded = b ? (char)1 : (char)0;
+  return writeByte(encoded);
+}
+
+// Writes a single byte through write().
+bool OutputStream::writeByte(char byte) {
+  return write(&byte, 1);
+}
+
+// Writes `byte` numTimesToRepeat times, one byte at a time, stopping and
+// returning false on the first failed write.
+bool OutputStream::writeRepeatedByte(uint8 byte, size_t numTimesToRepeat) {
+  size_t written = 0;
+
+  while (written < numTimesToRepeat) {
+    if (!writeByte((char)byte)) {
+      return false;
+    }
+    ++written;
+  }
+
+  return true;
+}
+
+// Writes a 16-bit value as two bytes in little-endian order.
+bool OutputStream::writeShort(short value) {
+  const unsigned short raw = (unsigned short)value;
+  const unsigned short littleEndian = ByteOrder::swapIfBigEndian(raw);
+  return write(&littleEndian, 2);
+}
+
+// Writes a 16-bit value as two bytes in big-endian order.
+bool OutputStream::writeShortBigEndian(short value) {
+  const unsigned short raw = (unsigned short)value;
+  const unsigned short bigEndian = ByteOrder::swapIfLittleEndian(raw);
+  return write(&bigEndian, 2);
+}
+
+// Writes a 32-bit value as four bytes in little-endian order.
+bool OutputStream::writeInt(int value) {
+  const unsigned int raw = (unsigned int)value;
+  const unsigned int littleEndian = ByteOrder::swapIfBigEndian(raw);
+  return write(&littleEndian, 4);
+}
+
+// Writes a 32-bit value as four bytes in big-endian order.
+bool OutputStream::writeIntBigEndian(int value) {
+  const unsigned int raw = (unsigned int)value;
+  const unsigned int bigEndian = ByteOrder::swapIfLittleEndian(raw);
+  return write(&bigEndian, 4);
+}
+
+// Writes a variable-length encoding of a 32-bit integer.
+//
+// Layout: one header byte holding the number of magnitude bytes that follow
+// (0-4) with bit 7 set for negative values, then the magnitude in
+// little-endian order with leading zero bytes omitted.
+//
+// @returns false if the write operation fails
+bool OutputStream::writeCompressedInt(int value) {
+  // Take the magnitude in unsigned arithmetic: evaluating `-value` as an int
+  // overflows (undefined behaviour) when value is INT_MIN, whereas unsigned
+  // negation wraps and yields the correct magnitude for every input.
+  unsigned int un = (unsigned int)value;
+  if (value < 0) un = 0u - un;
+
+  uint8 data[5];
+  int num = 0;
+
+  while (un > 0) {
+    data[++num] = (uint8)un;
+    un >>= 8;
+  }
+
+  data[0] = (uint8)num;
+
+  if (value < 0) data[0] |= 0x80;
+
+  return write(data, (size_t)num + 1);
+}
+
+// Writes a 64-bit value as eight bytes in little-endian order.
+bool OutputStream::writeInt64(int64 value) {
+  const uint64 raw = (uint64)value;
+  const uint64 littleEndian = ByteOrder::swapIfBigEndian(raw);
+  return write(&littleEndian, 8);
+}
+
+// Writes a 64-bit value as eight bytes in big-endian order.
+bool OutputStream::writeInt64BigEndian(int64 value) {
+  const uint64 raw = (uint64)value;
+  const uint64 bigEndian = ByteOrder::swapIfLittleEndian(raw);
+  return write(&bigEndian, 8);
+}
+
+// Writes the bit pattern of a float as a little-endian 32-bit int.
+bool OutputStream::writeFloat(float value) {
+  // Reinterpret the float's bits as an int through a union.
+  union {
+    int asInt;
+    float asFloat;
+  } bits;
+
+  bits.asFloat = value;
+  const int encoded = bits.asInt;
+  return writeInt(encoded);
+}
+
+// Writes the bit pattern of a float as a big-endian 32-bit int.
+bool OutputStream::writeFloatBigEndian(float value) {
+  // Reinterpret the float's bits as an int through a union.
+  union {
+    int asInt;
+    float asFloat;
+  } bits;
+
+  bits.asFloat = value;
+  const int encoded = bits.asInt;
+  return writeIntBigEndian(encoded);
+}
+
+// Writes the bit pattern of a double as a little-endian 64-bit int.
+bool OutputStream::writeDouble(double value) {
+  // Reinterpret the double's bits as an int64 through a union.
+  union {
+    int64 asInt;
+    double asDouble;
+  } bits;
+
+  bits.asDouble = value;
+  const int64 encoded = bits.asInt;
+  return writeInt64(encoded);
+}
+
+// Writes the bit pattern of a double as a big-endian 64-bit int.
+bool OutputStream::writeDoubleBigEndian(double value) {
+  // Reinterpret the double's bits as an int64 through a union.
+  union {
+    int64 asInt;
+    double asDouble;
+  } bits;
+
+  bits.asDouble = value;
+  const int64 encoded = bits.asInt;
+  return writeInt64BigEndian(encoded);
+}
+
+// Copies data from source into this stream in chunks of up to 8192 bytes.
+//
+// @param source           the stream to read from
+// @param numBytesToWrite  maximum number of bytes to copy; a negative value
+//                         means "until the source runs out"
+// @returns the number of bytes successfully written to this stream
+int64 OutputStream::writeFromInputStream(InputStream& source,
+                                         int64 numBytesToWrite) {
+  if (numBytesToWrite < 0) numBytesToWrite = std::numeric_limits<int64>::max();
+
+  int64 numWritten = 0;
+
+  while (numBytesToWrite > 0) {
+    char buffer[8192];
+    const int num = source.read(
+        buffer, (int)std::min(numBytesToWrite, (int64)sizeof(buffer)));
+
+    if (num <= 0) break;
+
+    // Stop as soon as the destination rejects a chunk; otherwise the loop
+    // would keep draining the source and report bytes that were never stored.
+    if (!write(buffer, (size_t)num)) break;
+
+    numBytesToWrite -= num;
+    numWritten += num;
+  }
+
+  return numWritten;
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/OutputStream.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/OutputStream.h b/rocketmq-cpp/src/common/OutputStream.h
new file mode 100644
index 0000000..1fb2124
--- /dev/null
+++ b/rocketmq-cpp/src/common/OutputStream.h
@@ -0,0 +1,184 @@
+#ifndef OUTPUTSTREAM_H_INCLUDED
+#define OUTPUTSTREAM_H_INCLUDED
+
+#include "ByteOrder.h"
+#include "InputStream.h"
+namespace rocketmq {
+//==============================================================================
+/**
+    The base class for streams that write data to some kind of destination.
+
+    Input and output streams are used throughout the library - subclasses can
+    override some or all of the virtual functions to implement their behaviour.
+
+    @see InputStream, MemoryOutputStream, FileOutputStream
+*/
+class ROCKETMQCLIENT_API OutputStream {
+ protected:
+  //==============================================================================
+  OutputStream();
+
+ public:
+  /** Destructor.
+
+      Some subclasses might want to do things like call flush() during their
+      destructors.
+  */
+  virtual ~OutputStream();
+
+  //==============================================================================
+  /** If the stream is using a buffer, this will ensure it gets written
+      out to the destination. */
+  virtual void flush() = 0;
+
+  /** Tries to move the stream's output position.
+
+      Not all streams will be able to seek to a new position - this will return
+      false if it fails to work.
+
+      @see getPosition
+  */
+  virtual bool setPosition(int64 newPosition) = 0;
+
+  /** Returns the stream's current position.
+
+      @see setPosition
+  */
+  virtual int64 getPosition() = 0;
+
+  //==============================================================================
+  /** Writes a block of data to the stream.
+
+      When creating a subclass of OutputStream, this is the only write method
+      that needs to be overloaded - the base class has methods for writing other
+      types of data which use this to do the work.
+
+      @param dataToWrite      the source buffer holding the bytes to write.
+                              This must not be null.
+      @param numberOfBytes    the number of bytes to write.
+      @returns false if the write operation fails for some reason
+  */
+  virtual bool write(const void* dataToWrite, size_t numberOfBytes) = 0;
+
+  //==============================================================================
+  /** Writes a single byte to the stream.
+      @returns false if the write operation fails for some reason
+      @see InputStream::readByte
+  */
+  virtual bool writeByte(char byte);
+
+  /** Writes a boolean to the stream as a single byte.
+      This is encoded as a binary byte (not as text) with a value of 1 or 0.
+      @returns false if the write operation fails for some reason
+      @see InputStream::readBool
+  */
+  virtual bool writeBool(bool boolValue);
+
+  /** Writes a 16-bit integer to the stream in a little-endian byte order.
+      This will write two bytes to the stream: (value & 0xff), then
+      (value >> 8).
+      @returns false if the write operation fails for some reason
+      @see InputStream::readShort
+  */
+  virtual bool writeShort(short value);
+
+  /** Writes a 16-bit integer to the stream in a big-endian byte order.
+      This will write two bytes to the stream: (value >> 8), then
+      (value & 0xff).
+      @returns false if the write operation fails for some reason
+      @see InputStream::readShortBigEndian
+  */
+  virtual bool writeShortBigEndian(short value);
+
+  /** Writes a 32-bit integer to the stream in a little-endian byte order.
+      @returns false if the write operation fails for some reason
+      @see InputStream::readInt
+  */
+  virtual bool writeInt(int value);
+
+  /** Writes a 32-bit integer to the stream in a big-endian byte order.
+      @returns false if the write operation fails for some reason
+      @see InputStream::readIntBigEndian
+  */
+  virtual bool writeIntBigEndian(int value);
+
+  /** Writes a 64-bit integer to the stream in a little-endian byte order.
+      @returns false if the write operation fails for some reason
+      @see InputStream::readInt64
+  */
+  virtual bool writeInt64(int64 value);
+
+  /** Writes a 64-bit integer to the stream in a big-endian byte order.
+      @returns false if the write operation fails for some reason
+      @see InputStream::readInt64BigEndian
+  */
+  virtual bool writeInt64BigEndian(int64 value);
+
+  /** Writes a 32-bit floating point value to the stream in a binary format.
+      The binary 32-bit encoding of the float is written as a little-endian int.
+      @returns false if the write operation fails for some reason
+      @see InputStream::readFloat
+  */
+  virtual bool writeFloat(float value);
+
+  /** Writes a 32-bit floating point value to the stream in a binary format.
+      The binary 32-bit encoding of the float is written as a big-endian int.
+      @returns false if the write operation fails for some reason
+      @see InputStream::readFloatBigEndian
+  */
+  virtual bool writeFloatBigEndian(float value);
+
+  /** Writes a 64-bit floating point value to the stream in a binary format.
+      The eight raw bytes of the double value are written out as a
+      little-endian 64-bit int.
+      @returns false if the write operation fails for some reason
+      @see InputStream::readDouble
+  */
+  virtual bool writeDouble(double value);
+
+  /** Writes a 64-bit floating point value to the stream in a binary format.
+      The eight raw bytes of the double value are written out as a big-endian
+      64-bit int.
+      @see InputStream::readDoubleBigEndian
+      @returns false if the write operation fails for some reason
+  */
+  virtual bool writeDoubleBigEndian(double value);
+
+  /** Writes a byte to the output stream a given number of times.
+      @returns false if the write operation fails for some reason
+  */
+  virtual bool writeRepeatedByte(uint8 byte, size_t numTimesToRepeat);
+
+  /** Writes a condensed binary encoding of a 32-bit integer.
+
+      If you're storing a lot of integers which are unlikely to have very large
+      values, this can save a lot of space, because values under 0xff will only
+      take up 2 bytes, under 0xffff only 3 bytes, etc.
+
+      The format used is: number of significant bytes + up to 4 bytes in
+      little-endian order.
+
+      @returns false if the write operation fails for some reason
+      @see InputStream::readCompressedInt
+  */
+  virtual bool writeCompressedInt(int value);
+
+  /** Reads data from an input stream and writes it to this stream.
+
+      @param source               the stream to read from
+      @param maxNumBytesToWrite   the number of bytes to read from the stream
+                                  (if this is less than zero, it will keep
+                                  reading until the input is exhausted)
+      @returns the number of bytes written
+  */
+  virtual int64 writeFromInputStream(InputStream& source,
+                                     int64 maxNumBytesToWrite);
+};
+}
+
+#endif  // OUTPUTSTREAM_H_INCLUDED

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/PermName.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/PermName.cpp b/rocketmq-cpp/src/common/PermName.cpp
new file mode 100755
index 0000000..7f168a4
--- /dev/null
+++ b/rocketmq-cpp/src/common/PermName.cpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PermName.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+// Permission bits; each constant occupies its own bit so they can be OR-ed
+// together into a single mask.
+int PermName::PERM_PRIORITY = 0x1 << 3;
+int PermName::PERM_READ = 0x1 << 2;
+int PermName::PERM_WRITE = 0x1 << 1;
+int PermName::PERM_INHERIT = 0x1 << 0;
+
+// True when the read bit is set in perm.
+bool PermName::isReadable(int perm) {
+  const int readBits = perm & PERM_READ;
+  return readBits == PERM_READ;
+}
+
+// True when the write bit is set in perm.
+bool PermName::isWriteable(int perm) {
+  const int writeBits = perm & PERM_WRITE;
+  return writeBits == PERM_WRITE;
+}
+
+// True when the inherit bit is set in perm.
+bool PermName::isInherited(int perm) {
+  const int inheritBits = perm & PERM_INHERIT;
+  return inheritBits == PERM_INHERIT;
+}
+
+// Renders a permission mask as a three-character string: 'R' at index 0 if
+// readable, 'W' at index 1 if writeable, 'X' at index 2 if inherited, and '-'
+// for every flag that is not set (e.g. PERM_READ | PERM_WRITE gives "RW-").
+string PermName::perm2String(int perm) {
+  string pm("---");
+
+  // Overwrite exactly one character per flag. std::string::replace(pos, len,
+  // str) takes a length, not an end index, so the previous replace(1, 2, "W")
+  // also removed the trailing '-' and shortened the result to two characters.
+  if (isReadable(perm)) {
+    pm[0] = 'R';
+  }
+
+  if (isWriteable(perm)) {
+    pm[1] = 'W';
+  }
+
+  if (isInherited(perm)) {
+    pm[2] = 'X';
+  }
+
+  return pm;
+}
+
+//<!***************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/PermName.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/PermName.h b/rocketmq-cpp/src/common/PermName.h
new file mode 100755
index 0000000..6556382
--- /dev/null
+++ b/rocketmq-cpp/src/common/PermName.h
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PERMNAME_H__
+#define __PERMNAME_H__
+
+#include <string>
+
+namespace rocketmq {
+//<!***************************************************************************
+// Static-only helper: permission bit masks plus the predicates and the
+// formatter that interpret a permission value.
+class PermName {
+ public:
+  // Each predicate tests perm against the corresponding mask below.
+  static bool isReadable(int perm);
+  static bool isWriteable(int perm);
+  static bool isInherited(int perm);
+
+  // Builds the textual form of perm.
+  static std::string perm2String(int perm);
+
+  // Bit masks; their values are assigned in PermName.cpp.
+  static int PERM_PRIORITY;
+  static int PERM_READ;
+  static int PERM_WRITE;
+  static int PERM_INHERIT;
+};
+
+//<!***************************************************************************
+}  //<!end namespace;
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/PullSysFlag.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/PullSysFlag.cpp b/rocketmq-cpp/src/common/PullSysFlag.cpp
new file mode 100755
index 0000000..6d68457
--- /dev/null
+++ b/rocketmq-cpp/src/common/PullSysFlag.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PullSysFlag.h"
+
+namespace rocketmq {
+//<!************************************************************************
+// Pull system-flag bit masks; every mask occupies exactly one distinct bit.
+int PullSysFlag::FLAG_COMMIT_OFFSET = 0x1;  // bit 0
+int PullSysFlag::FLAG_SUSPEND = 0x2;        // bit 1
+int PullSysFlag::FLAG_SUBSCRIPTION = 0x4;   // bit 2
+int PullSysFlag::FLAG_CLASS_FILTER = 0x8;   // bit 3
+
+// Combines the four options into one flag word: each argument that is true
+// contributes its FLAG_* mask, each argument that is false contributes
+// nothing.
+int PullSysFlag::buildSysFlag(bool commitOffset, bool suspend,
+                              bool subscription, bool classFilter) {
+  int sysFlag = 0;
+  sysFlag |= commitOffset ? FLAG_COMMIT_OFFSET : 0;
+  sysFlag |= suspend ? FLAG_SUSPEND : 0;
+  sysFlag |= subscription ? FLAG_SUBSCRIPTION : 0;
+  sysFlag |= classFilter ? FLAG_CLASS_FILTER : 0;
+  return sysFlag;
+}
+
+// Returns sysFlag with the FLAG_COMMIT_OFFSET bits cleared; all other bits
+// are passed through untouched.
+int PullSysFlag::clearCommitOffsetFlag(int sysFlag) {
+  const int keepMask = ~FLAG_COMMIT_OFFSET;
+  return sysFlag & keepMask;
+}
+
+// Returns true when every bit of FLAG_COMMIT_OFFSET is set in sysFlag.
+bool PullSysFlag::hasCommitOffsetFlag(int sysFlag) {
+  const int masked = sysFlag & FLAG_COMMIT_OFFSET;
+  return masked == FLAG_COMMIT_OFFSET;
+}
+
+// Returns true when every bit of FLAG_SUSPEND is set in sysFlag.
+bool PullSysFlag::hasSuspendFlag(int sysFlag) {
+  const int masked = sysFlag & FLAG_SUSPEND;
+  return masked == FLAG_SUSPEND;
+}
+
+// Returns true when every bit of FLAG_SUBSCRIPTION is set in sysFlag.
+bool PullSysFlag::hasSubscriptionFlag(int sysFlag) {
+  const int masked = sysFlag & FLAG_SUBSCRIPTION;
+  return masked == FLAG_SUBSCRIPTION;
+}
+
+// Returns true when every bit of FLAG_CLASS_FILTER is set in sysFlag.
+bool PullSysFlag::hasClassFilterFlag(int sysFlag) {
+  const int masked = sysFlag & FLAG_CLASS_FILTER;
+  return masked == FLAG_CLASS_FILTER;
+}
+
+//<!***************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/PullSysFlag.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/PullSysFlag.h b/rocketmq-cpp/src/common/PullSysFlag.h
new file mode 100755
index 0000000..c809772
--- /dev/null
+++ b/rocketmq-cpp/src/common/PullSysFlag.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PULLSYSFLAG_H__
+#define __PULLSYSFLAG_H__
+namespace rocketmq {
+//<!************************************************************************
+// Static-only helper that packs pull options into an int flag word and
+// queries individual bits of such a word.
+class PullSysFlag {
+ public:
+  // Packs the four options into one flag word.
+  static int buildSysFlag(bool commitOffset, bool suspend, bool subscription,
+                          bool classFilter);
+
+  // Returns sysFlag without the commit-offset bit.
+  static int clearCommitOffsetFlag(int sysFlag);
+
+  // Single-bit queries on a flag word.
+  static bool hasCommitOffsetFlag(int sysFlag);
+  static bool hasSuspendFlag(int sysFlag);
+  static bool hasSubscriptionFlag(int sysFlag);
+  static bool hasClassFilterFlag(int sysFlag);
+
+ private:
+  // Bit masks; their values are assigned in PullSysFlag.cpp.
+  static int FLAG_COMMIT_OFFSET;
+  static int FLAG_SUSPEND;
+  static int FLAG_SUBSCRIPTION;
+  static int FLAG_CLASS_FILTER;
+};
+
+//<!***************************************************************************
+}  //<!end namespace;
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/ServiceState.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/ServiceState.h b/rocketmq-cpp/src/common/ServiceState.h
new file mode 100755
index 0000000..a8ae792
--- /dev/null
+++ b/rocketmq-cpp/src/common/ServiceState.h
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SERVICESTATE_H__
+#define __SERVICESTATE_H__
+namespace rocketmq {
+//<!***************************************************************************
+// States a service can be in. The values are spelled out and equal the
+// implicit numbering (0..3) of a plain enumerator list.
+enum ServiceState {
+  CREATE_JUST = 0,
+  RUNNING = 1,
+  SHUTDOWN_ALREADY = 2,
+  START_FAILED = 3
+};
+
+//<!***************************************************************************
+}  //<!end namespace;
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/SubscriptionGroupConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/SubscriptionGroupConfig.h b/rocketmq-cpp/src/common/SubscriptionGroupConfig.h
new file mode 100755
index 0000000..9a54140
--- /dev/null
+++ b/rocketmq-cpp/src/common/SubscriptionGroupConfig.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SUBSCRIPTIONGROUPCONFIG_H__
+#define __SUBSCRIPTIONGROUPCONFIG_H__
+
+#include <string>
+
+namespace rocketmq {
+//<!***************************************************************************
+// Plain settings holder for one subscription group; every field is public.
+class SubscriptionGroupConfig {
+ public:
+  // Stores groupName and gives every other field its default value.
+  //
+  // std::string is spelled out because this header only includes <string>
+  // and must not depend on a using-directive leaked by another header.
+  // NOTE(review): MASTER_ID is not declared by anything this header
+  // includes; it presumably comes from a header included earlier by the
+  // translation unit (UtilAll.h?) - confirm and include it here.
+  SubscriptionGroupConfig(const std::string& groupName)
+      : groupName(groupName),
+        consumeEnable(true),
+        consumeFromMinEnable(true),
+        consumeBroadcastEnable(true),
+        retryQueueNums(1),
+        retryMaxTimes(5),
+        brokerId(MASTER_ID),
+        whichBrokerWhenConsumeSlowly(1) {}
+
+  std::string groupName;
+  bool consumeEnable;           // defaults to true
+  bool consumeFromMinEnable;    // defaults to true
+  bool consumeBroadcastEnable;  // defaults to true
+  int retryQueueNums;           // defaults to 1
+  int retryMaxTimes;            // defaults to 5
+  int brokerId;                 // defaults to MASTER_ID
+  int whichBrokerWhenConsumeSlowly;  // defaults to 1
+};
+
+//<!***************************************************************************
+}  //<!end namespace;
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/TopAddressing.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/TopAddressing.cpp b/rocketmq-cpp/src/common/TopAddressing.cpp
new file mode 100644
index 0000000..3a1742d
--- /dev/null
+++ b/rocketmq-cpp/src/common/TopAddressing.cpp
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TopAddressing.h"
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <vector>
+#include "UtilAll.h"
+#include "sync_http_client.h"
+#include "url.h"
+
+namespace rocketmq {
+// Stores unitName; fetchNSAddr() appends it to the lookup address when it is
+// not empty.
+TopAddressing::TopAddressing(string unitName)
+    : m_unitName(unitName) {
+}
+
+// Nothing to release explicitly; the members clean up after themselves.
+TopAddressing::~TopAddressing() {
+}
+
+// Character-level check for a dotted-number string.
+// Returns 0 when sValue consists solely of the characters '0'..'9' and '.'
+// (an empty string therefore also yields 0), and -1 when sValue is NULL or
+// contains any other character. The number and position of the dots are not
+// validated.
+int TopAddressing::IsIPAddr(const char* sValue) {
+  if (sValue == NULL) {
+    return -1;
+  }
+
+  for (const char* cursor = sValue; *cursor != '\0'; ++cursor) {
+    const bool isDigit = (*cursor >= '0' && *cursor <= '9');
+    if (!isDigit && *cursor != '.') {
+      return -1;
+    }
+  }
+  return 0;
+}
+
+// Rebuilds m_addrs from a ';'-separated address list while holding
+// m_addrLock. The stored list is cleared only when the split produced at
+// least one token; each token is trimmed and then appended if it is not
+// already stored and UtilAll::SplitURL accepts it.
+void TopAddressing::updateNameServerAddressList(const string& adds) {
+  boost::lock_guard<boost::mutex> lock(m_addrLock);
+
+  vector<string> tokens;
+  UtilAll::Split(tokens, adds, ";");
+  if (!tokens.empty()) {
+    m_addrs.clear();
+  }
+
+  for (vector<string>::iterator it = tokens.begin(); it != tokens.end();
+       ++it) {
+    string addr = *it;
+    UtilAll::Trim(addr);
+
+    // Skip entries that are already stored.
+    if (find(m_addrs.begin(), m_addrs.end(), addr) != m_addrs.end()) {
+      continue;
+    }
+
+    // Host and port are only parsed to validate the entry.
+    string hostName;
+    short portNumber;
+    if (!UtilAll::SplitURL(addr, hostName, portNumber)) {
+      continue;
+    }
+
+    LOG_INFO("updateNameServerAddressList:%s", addr.c_str());
+    m_addrs.push_back(addr);
+  }
+}
+
+// Fetches the name-server list over HTTP.
+// The lookup address is NSDomain, or WS_ADDR when NSDomain is empty; a
+// non-empty unit name is appended as "-<unit>?nofix=1". On success the
+// response is passed through clearNewLine(), handed to
+// updateNameServerAddressList() when non-empty, and returned. On failure, or
+// when the cleaned response is empty, an error is logged and an empty string
+// is returned.
+string TopAddressing::fetchNSAddr(const string& NSDomain) {
+  LOG_DEBUG("fetchNSAddr begin");
+
+  string nsAddr = NSDomain.empty() ? WS_ADDR : NSDomain;
+  if (!m_unitName.empty()) {
+    nsAddr = nsAddr + "-" + m_unitName + "?nofix=1";
+    LOG_INFO("NSAddr is:%s", nsAddr.c_str());
+  }
+
+  std::string tmp_nameservers;
+  std::string nameservers;
+  Url url_s(nsAddr);
+  LOG_INFO("fetchNSAddr protocol: %s, port: %s, host:%s, path:%s, ",
+           url_s.protocol_.c_str(), url_s.port_.c_str(), url_s.host_.c_str(),
+           url_s.path_.c_str());
+
+  // Request failed: report it and hand back the still-empty result.
+  if (!SyncfetchNsAddr(url_s, tmp_nameservers)) {
+    LOG_ERROR(
+        "fetchNSAddr with domain failed, connect failure or wrong response");
+    return nameservers;
+  }
+
+  nameservers = clearNewLine(tmp_nameservers);
+  if (!nameservers.empty()) {
+    updateNameServerAddressList(nameservers);
+  } else {
+    LOG_ERROR("fetchNSAddr with domain is empty");
+  }
+
+  return nameservers;
+}
+
+// Returns the part of str that precedes its first line terminator ('\r' or
+// '\n'), or all of str when it contains neither.
+//
+// The cut is made at whichever terminator occurs first. Searching the whole
+// string for '\r' before looking for '\n' (as was done previously) returned
+// text that still contained a '\n' for input such as "a\nb\r".
+string TopAddressing::clearNewLine(const string& str) {
+  const size_t index = str.find_first_of("\r\n");
+  if (index == string::npos) {
+    return str;
+  }
+  return str.substr(0, index);
+}
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/TopAddressing.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/TopAddressing.h b/rocketmq-cpp/src/common/TopAddressing.h
new file mode 100755
index 0000000..a850023
--- /dev/null
+++ b/rocketmq-cpp/src/common/TopAddressing.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TOPADDRESSING_H__
+#define __TOPADDRESSING_H__
+
+#include <sys/time.h>
+#include <boost/thread/thread.hpp>
+#include <list>
+#include <map>
+#include <string>
+#include "Logging.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+// Looks up name-server addresses over HTTP and keeps the validated list.
+class TopAddressing {
+ public:
+  TopAddressing(string unitName);
+  virtual ~TopAddressing();
+
+  // Fetches, stores and returns the name-server list for NSDomain.
+  string fetchNSAddr(const string& NSDomain);
+
+ private:
+  // Character-level check: 0 for digits-and-dots input, -1 otherwise.
+  int IsIPAddr(const char* sValue);
+  // Rebuilds m_addrs from a ';'-separated list.
+  void updateNameServerAddressList(const string& adds);
+  // Cuts str at a line terminator.
+  string clearNewLine(const string& str);
+
+  boost::mutex m_addrLock;  // held while m_addrs is rebuilt
+  list<string> m_addrs;
+  string m_unitName;
+};
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/TopicConfig.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/TopicConfig.cpp b/rocketmq-cpp/src/common/TopicConfig.cpp
new file mode 100755
index 0000000..e0e1b4d
--- /dev/null
+++ b/rocketmq-cpp/src/common/TopicConfig.cpp
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TopicConfig.h"
+#include <stdlib.h>
+#include <sstream>
+#include "PermName.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+// Queue counts used by the constructors that are not given explicit counts.
+int TopicConfig::DefaultReadQueueNums = 16;
+int TopicConfig::DefaultWriteQueueNums = 16;
+// Delimiter that encode() writes between fields.
+string TopicConfig::SEPARATOR(" ");
+
+// Default configuration: empty topic name, default queue counts, read+write
+// permission and SINGLE_TAG filtering.
+TopicConfig::TopicConfig()
+    : m_topicName(),
+      m_readQueueNums(DefaultReadQueueNums),
+      m_writeQueueNums(DefaultWriteQueueNums),
+      m_perm(PermName::PERM_READ | PermName::PERM_WRITE),
+      m_topicFilterType(SINGLE_TAG) {
+}
+
+// Named topic with default queue counts, read+write permission and
+// SINGLE_TAG filtering.
+TopicConfig::TopicConfig(const string& topicName)
+    : m_topicName(topicName),
+      m_readQueueNums(DefaultReadQueueNums),
+      m_writeQueueNums(DefaultWriteQueueNums),
+      m_perm(PermName::PERM_READ | PermName::PERM_WRITE),
+      m_topicFilterType(SINGLE_TAG) {
+}
+
+// Fully specified topic; only the filter type is fixed (SINGLE_TAG).
+TopicConfig::TopicConfig(const string& topicName, int readQueueNums,
+                         int writeQueueNums, int perm)
+    : m_topicName(topicName),
+      m_readQueueNums(readQueueNums),
+      m_writeQueueNums(writeQueueNums),
+      m_perm(perm),
+      m_topicFilterType(SINGLE_TAG) {
+}
+
+// Nothing to release explicitly.
+TopicConfig::~TopicConfig() {
+}
+
+// Serialises the configuration as five SEPARATOR-delimited fields: topic
+// name, read queue count, write queue count, permission and filter type (the
+// latter written as its integer value).
+string TopicConfig::encode() {
+  stringstream out;
+  out << m_topicName << SEPARATOR;
+  out << m_readQueueNums << SEPARATOR;
+  out << m_writeQueueNums << SEPARATOR;
+  out << m_perm << SEPARATOR;
+  out << m_topicFilterType;
+  return out.str();
+}
+
+/**
+ * Parses the whitespace-separated form produced by encode().
+ *
+ * Expected fields, in order: topic name, read queue count, write queue
+ * count, permission, filter type (as an integer).
+ *
+ * @param in encoded configuration
+ * @return true when all five fields were read and the filter type is a
+ *         known TopicFilterType; false otherwise, in which case this object
+ *         is left unchanged.
+ */
+bool TopicConfig::decode(const string& in) {
+  stringstream ss(in);
+
+  // Parse into locals first so that malformed input cannot leave the object
+  // half-updated.
+  string topicName;
+  int readQueueNums = 0;
+  int writeQueueNums = 0;
+  int perm = 0;
+  int type = 0;
+  ss >> topicName >> readQueueNums >> writeQueueNums >> perm >> type;
+
+  // A missing or non-numeric field sets failbit; previously this was ignored
+  // and the function reported success regardless.
+  if (ss.fail()) {
+    return false;
+  }
+
+  // Only cast values that name an enumerator; converting an arbitrary int
+  // to the enum is not well defined.
+  if (type != SINGLE_TAG && type != MULTI_TAG) {
+    return false;
+  }
+
+  m_topicName = topicName;
+  m_readQueueNums = readQueueNums;
+  m_writeQueueNums = writeQueueNums;
+  m_perm = perm;
+  m_topicFilterType = static_cast<TopicFilterType>(type);
+
+  return true;
+}
+
+// Returns a reference to the stored topic name.
+const string& TopicConfig::getTopicName() {
+  return this->m_topicName;
+}
+
+// Replaces the stored topic name.
+void TopicConfig::setTopicName(const string& topicName) {
+  this->m_topicName = topicName;
+}
+
+// Returns the stored read queue count.
+int TopicConfig::getReadQueueNums() {
+  return this->m_readQueueNums;
+}
+
+// Replaces the stored read queue count.
+void TopicConfig::setReadQueueNums(int readQueueNums) {
+  this->m_readQueueNums = readQueueNums;
+}
+
+// Returns the stored write queue count.
+int TopicConfig::getWriteQueueNums() {
+  return this->m_writeQueueNums;
+}
+
+// Replaces the stored write queue count.
+void TopicConfig::setWriteQueueNums(int writeQueueNums) {
+  this->m_writeQueueNums = writeQueueNums;
+}
+
+// Returns the stored permission bits.
+int TopicConfig::getPerm() {
+  return this->m_perm;
+}
+
+// Replaces the stored permission bits.
+void TopicConfig::setPerm(int perm) {
+  this->m_perm = perm;
+}
+
+// Returns the stored filter type.
+TopicFilterType TopicConfig::getTopicFilterType() {
+  return this->m_topicFilterType;
+}
+
+// Replaces the stored filter type.
+void TopicConfig::setTopicFilterType(TopicFilterType topicFilterType) {
+  this->m_topicFilterType = topicFilterType;
+}
+//<!***************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/TopicConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/TopicConfig.h b/rocketmq-cpp/src/common/TopicConfig.h
new file mode 100755
index 0000000..0e7c17a
--- /dev/null
+++ b/rocketmq-cpp/src/common/TopicConfig.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TOPICCONFIG_H__
+#define __TOPICCONFIG_H__
+
+#include <string>
+#include "TopicFilterType.h"
+#include "UtilAll.h"
+namespace rocketmq {
+//<!************************************************************************
+// Settings of one topic: name, read/write queue counts, permission bits and
+// filter type, with a text encoding for the whole set.
+class TopicConfig {
+ public:
+  // Queue counts used when a constructor is not given explicit ones.
+  static int DefaultReadQueueNums;
+  static int DefaultWriteQueueNums;
+
+  TopicConfig();
+  TopicConfig(const string& topicName);
+  TopicConfig(const string& topicName, int readQueueNums, int writeQueueNums,
+              int perm);
+  ~TopicConfig();
+
+  // Text (de)serialisation of all five fields.
+  string encode();
+  bool decode(const string& in);
+
+  // Plain accessors.
+  const string& getTopicName();
+  void setTopicName(const string& topicName);
+
+  int getReadQueueNums();
+  void setReadQueueNums(int readQueueNums);
+
+  int getWriteQueueNums();
+  void setWriteQueueNums(int writeQueueNums);
+
+  int getPerm();
+  void setPerm(int perm);
+
+  TopicFilterType getTopicFilterType();
+  void setTopicFilterType(TopicFilterType topicFilterType);
+
+ private:
+  // Field delimiter written by encode().
+  static string SEPARATOR;
+
+  string m_topicName;
+  int m_readQueueNums;
+  int m_writeQueueNums;
+  int m_perm;
+  TopicFilterType m_topicFilterType;
+};
+//<!***************************************************************************
+}  //<!end namespace;
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/TopicFilterType.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/TopicFilterType.h b/rocketmq-cpp/src/common/TopicFilterType.h
new file mode 100755
index 0000000..9055003
--- /dev/null
+++ b/rocketmq-cpp/src/common/TopicFilterType.h
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TOPICFILTERTYPE_H__
+#define __TOPICFILTERTYPE_H__
+
+namespace rocketmq {
+//<!***************************************************************************
+// Tag filtering mode of a topic. The values are spelled out and equal the
+// implicit numbering (0, 1) of a plain enumerator list.
+enum TopicFilterType {
+  /**
+   * Each message can carry only one tag.
+   */
+  SINGLE_TAG = 0,
+  /**
+   * Not supported yet.
+   */
+  MULTI_TAG = 1
+};
+//<!***************************************************************************
+}  //<!end namespace;
+#endif