You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:54 UTC
[08/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp
code
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/FilterAPI.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/FilterAPI.h b/rocketmq-cpp/src/common/FilterAPI.h
new file mode 100755
index 0000000..c95f17e
--- /dev/null
+++ b/rocketmq-cpp/src/common/FilterAPI.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __FILTERAPI_H__
+#define __FILTERAPI_H__
+
+#include <string>
+#include "MQClientException.h"
+#include "SubscriptionData.h"
+#include "UtilAll.h"
+namespace rocketmq {
+//<!***************************************************************************
+class FilterAPI {  // Stateless helper: builds a SubscriptionData from a tag expression.
+ public:
+ // Returns a heap-allocated SubscriptionData for `topic`. `subString` is empty/SUB_ALL or tags joined by "||".
+ // Not freed here (original note: deleted by the rebalance code). Throws MQClientException if Split yields nothing.
+ static SubscriptionData* buildSubscriptionData(const string& topic,
+ const string& subString) {
+ SubscriptionData* subscriptionData = new SubscriptionData(topic, subString);
+ if (subString.empty() || subString == SUB_ALL) {
+ subscriptionData->setSubString(SUB_ALL);
+ } else {
+ vector<string> out;
+ UtilAll::Split(out, subString, "||");
+
+ if (out.empty()) {
+ delete subscriptionData;  // the throw below would otherwise leak the object
+ THROW_MQEXCEPTION(MQClientException, "FilterAPI subString split error", -1);
+ }
+
+ for (size_t i = 0; i < out.size(); i++) {
+ string tag = out[i];
+ if (!tag.empty()) {
+ UtilAll::Trim(tag);  // emptiness is re-checked after trimming
+ if (!tag.empty()) {
+ subscriptionData->putTagsSet(tag);
+ subscriptionData->putCodeSet(tag);
+ }
+ }
+ }
+ }
+
+ return subscriptionData;
+ }
+};
+
+//<!***************************************************************************
+} //<!end namespace;
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/InputStream.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/InputStream.cpp b/rocketmq-cpp/src/common/InputStream.cpp
new file mode 100644
index 0000000..4eb5587
--- /dev/null
+++ b/rocketmq-cpp/src/common/InputStream.cpp
@@ -0,0 +1,149 @@
+#include "InputStream.h"
+#include <algorithm>
+#include "MemoryOutputStream.h"
+
+namespace rocketmq {
+int64 InputStream::getNumBytesRemaining() {  // Total length minus the current position.
+ int64 len = getTotalLength();
+
+ if (len >= 0) len -= getPosition();  // a negative total is returned unchanged
+
+ return len;
+}
+
+char InputStream::readByte() {  // Reads a single byte through read().
+ char temp = 0;  // remains 0 when read() stores nothing
+ read(&temp, 1);
+ return temp;
+}
+
+bool InputStream::readBool() { return readByte() != 0; }  // any non-zero byte counts as true
+
+short InputStream::readShort() {  // Reads 2 bytes, decoded with ByteOrder::littleEndianShort.
+ char temp[2];
+
+ if (read(temp, 2) == 2) return (short)ByteOrder::littleEndianShort(temp);
+
+ return 0;  // fewer than 2 bytes were read
+}
+
+short InputStream::readShortBigEndian() {  // Reads 2 bytes, decoded with ByteOrder::bigEndianShort.
+ char temp[2];
+
+ if (read(temp, 2) == 2) return (short)ByteOrder::bigEndianShort(temp);
+
+ return 0;  // fewer than 2 bytes were read
+}
+
+int InputStream::readInt() {  // Reads 4 bytes, decoded with ByteOrder::littleEndianInt.
+ char temp[4];
+
+ if (read(temp, 4) == 4) return (int)ByteOrder::littleEndianInt(temp);
+
+ return 0;  // fewer than 4 bytes were read
+}
+
+int InputStream::readIntBigEndian() {  // Reads 4 bytes, decoded with ByteOrder::bigEndianInt.
+ char temp[4];
+
+ if (read(temp, 4) == 4) return (int)ByteOrder::bigEndianInt(temp);
+
+ return 0;  // fewer than 4 bytes were read
+}
+
+int InputStream::readCompressedInt() {  // Reads a size byte followed by up to 4 little-endian payload bytes.
+ const uint8 sizeByte = (uint8)readByte();
+ if (sizeByte == 0) return 0;  // zero size byte encodes the value 0 (also what an exhausted stream yields)
+
+ const int numBytes = (sizeByte & 0x7f);  // low 7 bits: payload length
+ if (numBytes > 4) {  // payload cannot fit the 4-byte buffer below; treated as invalid
+ return 0;
+ }
+
+ char bytes[4] = {0, 0, 0, 0};  // zero-filled so unread high bytes contribute nothing
+ if (read(bytes, numBytes) != numBytes) return 0;
+
+ const int num = (int)ByteOrder::littleEndianInt(bytes);
+ return (sizeByte >> 7) ? -num : num;  // top bit of the size byte is the sign flag
+}
+
+int64 InputStream::readInt64() {  // Reads 8 bytes as a little-endian 64-bit value (per the header docs).
+ union {  // raw bytes and the integer share storage
+ uint8 asBytes[8];
+ uint64 asInt64;
+ } n;
+
+ if (read(n.asBytes, 8) == 8)
+ return (int64)ByteOrder::swapIfBigEndian(n.asInt64);  // presumably a no-op on little-endian hosts; confirm in ByteOrder.h
+
+ return 0;  // fewer than 8 bytes were read
+}
+
+int64 InputStream::readInt64BigEndian() {  // Reads 8 bytes as a big-endian 64-bit value (per the header docs).
+ union {  // raw bytes and the integer share storage
+ uint8 asBytes[8];
+ uint64 asInt64;
+ } n;
+
+ if (read(n.asBytes, 8) == 8)
+ return (int64)ByteOrder::swapIfLittleEndian(n.asInt64);  // presumably a no-op on big-endian hosts; confirm in ByteOrder.h
+
+ return 0;  // fewer than 8 bytes were read
+}
+
+float InputStream::readFloat() {  // Reinterprets the bits returned by readInt() as a float.
+ // the union below relies on these types being the same size...
+ union {
+ int32 asInt;
+ float asFloat;
+ } n;
+ n.asInt = (int32)readInt();  // readInt() returns 0 on a short read, giving 0.0f here
+ return n.asFloat;  // NOTE(review): reads the inactive union member (type punning)
+}
+
+float InputStream::readFloatBigEndian() {  // Reinterprets the bits returned by readIntBigEndian() as a float.
+ union {  // relies on int32 and float having the same size
+ int32 asInt;
+ float asFloat;
+ } n;
+ n.asInt = (int32)readIntBigEndian();
+ return n.asFloat;  // NOTE(review): reads the inactive union member (type punning)
+}
+
+double InputStream::readDouble() {  // Reinterprets the bits returned by readInt64() as a double.
+ union {  // relies on int64 and double having the same size
+ int64 asInt;
+ double asDouble;
+ } n;
+ n.asInt = readInt64();
+ return n.asDouble;  // NOTE(review): reads the inactive union member (type punning)
+}
+
+double InputStream::readDoubleBigEndian() {  // Reinterprets the bits returned by readInt64BigEndian() as a double.
+ union {  // relies on int64 and double having the same size
+ int64 asInt;
+ double asDouble;
+ } n;
+ n.asInt = readInt64BigEndian();
+ return n.asDouble;  // NOTE(review): reads the inactive union member (type punning)
+}
+
+size_t InputStream::readIntoMemoryBlock(MemoryBlock& block, ssize_t numBytes) {  // Copies from this stream into `block`.
+ MemoryOutputStream mo(block, true);  // true = append to the block's existing content
+ return (size_t)mo.writeFromInputStream(*this, numBytes);  // result of writeFromInputStream, cast to size_t
+}
+
+//==============================================================================
+void InputStream::skipNextBytes(int64 numBytesToSkip) {  // Reads and discards up to numBytesToSkip bytes via read().
+ if (numBytesToSkip <= 0) return;  // nothing to skip
+ const int skipBufferSize = (int)std::min(numBytesToSkip, (int64)16384);  // scratch buffer capped at 16 KiB
+ char* temp = static_cast<char*>(std::malloc(skipBufferSize * sizeof(char)));
+ if (!temp) return;  // allocation failed: never hand read() a null destination
+ while (numBytesToSkip > 0 && !isExhausted()) {
+ const int got = read(temp, (int)std::min(numBytesToSkip, (int64)skipBufferSize));
+ if (got <= 0) break;  // no progress: stop instead of looping forever (or counting backwards)
+ numBytesToSkip -= got;
+ }
+ std::free(temp);
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/InputStream.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/InputStream.h b/rocketmq-cpp/src/common/InputStream.h
new file mode 100644
index 0000000..c987be7
--- /dev/null
+++ b/rocketmq-cpp/src/common/InputStream.h
@@ -0,0 +1,241 @@
+#ifndef INPUTSTREAM_H_INCLUDED
+#define INPUTSTREAM_H_INCLUDED
+
+#include "ByteOrder.h"
+#include "dataBlock.h"
+//==============================================================================
+/** The base class for streams that read data.
+
+ Input and output streams are used throughout the library - subclasses can
+ override
+ some or all of the virtual functions to implement their behaviour.
+
+ @see OutputStream, MemoryInputStream, BufferedInputStream, FileInputStream
+*/
+namespace rocketmq {
+class ROCKETMQCLIENT_API InputStream {  // Abstract reader: subclasses supply read(), positioning and length queries.
+ public:
+ /** Destructor. */
+ virtual ~InputStream() {}
+
+ //==============================================================================
+ /** Returns the total number of bytes available for reading in this stream.
+
+ Note that this is the number of bytes available from the start of the
+ stream, not from the current position.
+
+ If the size of the stream isn't actually known, this will return -1.
+
+ @see getNumBytesRemaining
+ */
+ virtual int64 getTotalLength() = 0;
+
+ /** Returns the number of bytes available for reading, or a negative value if
+ the remaining length is not known.
+ @see getTotalLength
+ */
+ int64 getNumBytesRemaining();
+
+ /** Returns true if the stream has no more data to read. */
+ virtual bool isExhausted() = 0;
+
+ //==============================================================================
+ /** Reads some data from the stream into a memory buffer.
+
+ This is the only read method that subclasses actually need to implement,
+ as the
+ InputStream base class implements the other read methods in terms of this
+ one (although
+ it's often more efficient for subclasses to implement them directly).
+
+ @param destBuffer the destination buffer for the data. This must not
+ be null.
+ @param maxBytesToRead the maximum number of bytes to read - make sure
+ the
+ memory block passed in is big enough to contain
+ this
+ many bytes. This value must not be negative.
+
+ @returns the actual number of bytes that were read, which may be less
+ than
+ maxBytesToRead if the stream is exhausted before it gets that
+ far
+ */
+ virtual int read(void* destBuffer, int maxBytesToRead) = 0;
+
+ /** Reads a byte from the stream.
+ If the stream is exhausted, this will return zero.
+ @see OutputStream::writeByte
+ */
+ virtual char readByte();
+
+ /** Reads a boolean from the stream.
+ The bool is encoded as a single byte - non-zero for true, 0 for false.
+ If the stream is exhausted, this will return false.
+ @see OutputStream::writeBool
+ */
+ virtual bool readBool();
+
+ /** Reads two bytes from the stream as a little-endian 16-bit value.
+ If the next two bytes read are byte1 and byte2, this returns (byte1 |
+ (byte2 << 8)).
+ If the stream is exhausted partway through reading the bytes, this will
+ return zero.
+ @see OutputStream::writeShort, readShortBigEndian
+ */
+ virtual short readShort();
+
+ /** Reads two bytes from the stream as a big-endian 16-bit value.
+ If the next two bytes read are byte1 and byte2, this returns (byte2 |
+ (byte1 << 8)).
+ If the stream is exhausted partway through reading the bytes, this will
+ return zero.
+ @see OutputStream::writeShortBigEndian, readShort
+ */
+ virtual short readShortBigEndian();
+
+ /** Reads four bytes from the stream as a little-endian 32-bit value.
+
+ If the next four bytes are byte1 to byte4, this returns
+ (byte1 | (byte2 << 8) | (byte3 << 16) | (byte4 << 24)).
+
+ If the stream is exhausted partway through reading the bytes, this will
+ return zero.
+
+ @see OutputStream::writeInt, readIntBigEndian
+ */
+ virtual int readInt();
+
+ /** Reads four bytes from the stream as a big-endian 32-bit value.
+
+ If the next four bytes are byte1 to byte4, this returns
+ (byte4 | (byte3 << 8) | (byte2 << 16) | (byte1 << 24)).
+
+ If the stream is exhausted partway through reading the bytes, this will
+ return zero.
+
+ @see OutputStream::writeIntBigEndian, readInt
+ */
+ virtual int readIntBigEndian();
+
+ /** Reads eight bytes from the stream as a little-endian 64-bit value.
+
+ If the next eight bytes are byte1 to byte8, this returns
+ (byte1 | (byte2 << 8) | (byte3 << 16) | (byte4 << 24) | (byte5 << 32) |
+ (byte6 << 40) | (byte7 << 48) | (byte8 << 56)).
+
+ If the stream is exhausted partway through reading the bytes, this will
+ return zero.
+
+ @see OutputStream::writeInt64, readInt64BigEndian
+ */
+ virtual int64 readInt64();
+
+ /** Reads eight bytes from the stream as a big-endian 64-bit value.
+
+ If the next eight bytes are byte1 to byte8, this returns
+ (byte8 | (byte7 << 8) | (byte6 << 16) | (byte5 << 24) | (byte4 << 32) |
+ (byte3 << 40) | (byte2 << 48) | (byte1 << 56)).
+
+ If the stream is exhausted partway through reading the bytes, this will
+ return zero.
+
+ @see OutputStream::writeInt64BigEndian, readInt64
+ */
+ virtual int64 readInt64BigEndian();
+
+ /** Reads four bytes as a 32-bit floating point value.
+ The raw 32-bit encoding of the float is read from the stream as a
+ little-endian int.
+ If the stream is exhausted partway through reading the bytes, this will
+ return zero.
+ @see OutputStream::writeFloat, readDouble
+ */
+ virtual float readFloat();
+
+ /** Reads four bytes as a 32-bit floating point value.
+ The raw 32-bit encoding of the float is read from the stream as a
+ big-endian int.
+ If the stream is exhausted partway through reading the bytes, this will
+ return zero.
+ @see OutputStream::writeFloatBigEndian, readDoubleBigEndian
+ */
+ virtual float readFloatBigEndian();
+
+ /** Reads eight bytes as a 64-bit floating point value.
+ The raw 64-bit encoding of the double is read from the stream as a
+ little-endian int64.
+ If the stream is exhausted partway through reading the bytes, this will
+ return zero.
+ @see OutputStream::writeDouble, readFloat
+ */
+ virtual double readDouble();
+
+ /** Reads eight bytes as a 64-bit floating point value.
+ The raw 64-bit encoding of the double is read from the stream as a
+ big-endian int64.
+ If the stream is exhausted partway through reading the bytes, this will
+ return zero.
+ @see OutputStream::writeDoubleBigEndian, readFloatBigEndian
+ */
+ virtual double readDoubleBigEndian();
+
+ /** Reads an encoded 32-bit number from the stream using a space-saving
+ compressed format.
+ For small values, this is more space-efficient than using readInt() and
+ OutputStream::writeInt()
+ The format used is: number of significant bytes + up to 4 bytes in
+ little-endian order.
+ @see OutputStream::writeCompressedInt()
+ */
+ virtual int readCompressedInt();
+
+ //==============================================================================
+ // Bulk reads into a MemoryBlock.
+ /** Reads from the stream and appends the data to a MemoryBlock.
+
+ @param destBlock the block to append the data onto
+ @param maxNumBytesToRead if this is a positive value, it sets a limit
+ to the number
+ of bytes that will be read - if it's negative,
+ data
+ will be read until the stream is exhausted.
+ @returns the number of bytes that were added to the memory block
+ */
+ virtual size_t readIntoMemoryBlock(MemoryBlock& destBlock,
+ ssize_t maxNumBytesToRead = -1);
+
+ //==============================================================================
+ /** Returns the offset of the next byte that will be read from the stream.
+ @see setPosition
+ */
+ virtual int64 getPosition() = 0;
+
+ /** Tries to move the current read position of the stream.
+
+ The position is an absolute number of bytes from the stream's start.
+
+ Some streams might not be able to do this, in which case they should do
+ nothing and return false. Others might be able to manage it by resetting
+ themselves and skipping to the correct position, although this is
+ obviously a bit slow.
+
+ @returns true if the stream manages to reposition itself correctly
+ @see getPosition
+ */
+ virtual bool setPosition(int64 newPosition) = 0;
+
+ /** Reads and discards a number of bytes from the stream.
+
+ Some input streams might implement this efficiently, but the base
+ class will just keep reading data until the requisite number of bytes
+ have been done.
+ */
+ virtual void skipNextBytes(int64 numBytesToSkip);
+
+ protected:
+ //==============================================================================
+ InputStream() {}  // protected: only subclasses can construct
+};
+}
+#endif // INPUTSTREAM_H_INCLUDED
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MQClient.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MQClient.cpp b/rocketmq-cpp/src/common/MQClient.cpp
new file mode 100755
index 0000000..25d84de
--- /dev/null
+++ b/rocketmq-cpp/src/common/MQClient.cpp
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MQClient.h"
+#include "Logging.h"
+#include "MQClientFactory.h"
+#include "MQClientManager.h"
+#include "TopicPublishInfo.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+#define METAQCPP_VERSION "1.0.0"
+#define BUILD_DATE "08-08-2017"
+// Version/build-date string kept in the binary; to display it: strings bin/librocketmq.so |grep VERSION
+const char *metaq_build_time =
+ "VERSION: " METAQCPP_VERSION ", BUILD DATE: " BUILD_DATE " ";  // adjacent literals are concatenated at compile time
+
+//<!************************************************************************
+MQClient::MQClient() {  // Sets defaults; the name-server address is taken from $NAMESRV_ADDR when set.
+ string NAMESRV_ADDR_ENV = "NAMESRV_ADDR";
+ if (const char *addr = getenv(NAMESRV_ADDR_ENV.c_str()))
+ m_namesrvAddr = addr;
+ else
+ m_namesrvAddr = "";  // variable unset: start with an empty address
+
+ m_instanceName = "DEFAULT";
+ m_clientFactory = NULL;  // assigned later in start()
+ m_serviceState = CREATE_JUST;
+ m_pullThreadNum = boost::thread::hardware_concurrency();  // NOTE(review): Boost documents 0 when the count is unknown
+ m_tcpConnectTimeout = 3000; // 3s, held in milliseconds
+ m_tcpTransportTryLockTimeout = 3; // 3s, held in seconds (see setTcpTransportTryLockTimeout)
+ m_unitName = "";
+}
+
+MQClient::~MQClient() {}  // empty: does not delete m_clientFactory
+
+string MQClient::getMQClientId() const {  // Builds "<pid>-<local address>@<instance name>".
+ string clientIP = UtilAll::getLocalAddress();
+ string processId = UtilAll::to_string(getpid());
+ return processId + "-" + clientIP + "@" + m_instanceName;
+}
+
+//<!groupName;
+const string &MQClient::getGroupName() const { return m_GroupName; }  // reference to the member, not a copy
+
+void MQClient::setGroupName(const string &groupname) {  // Stores the group name; no validation here.
+ m_GroupName = groupname;
+}
+
+const string &MQClient::getNamesrvAddr() const { return m_namesrvAddr; }  // reference to the member, not a copy
+
+void MQClient::setNamesrvAddr(const string &namesrvAddr) {  // Replaces the value taken from $NAMESRV_ADDR in the constructor.
+ m_namesrvAddr = namesrvAddr;
+}
+
+const string &MQClient::getNamesrvDomain() const { return m_namesrvDomain; }  // reference to the member, not a copy
+
+void MQClient::setNamesrvDomain(const string &namesrvDomain) {  // Stores the name-server domain; no validation here.
+ m_namesrvDomain = namesrvDomain;
+}
+
+const string &MQClient::getInstanceName() const { return m_instanceName; }  // "DEFAULT" unless setInstanceName was called
+
+void MQClient::setInstanceName(const string &instanceName) {  // Also changes the suffix produced by getMQClientId().
+ m_instanceName = instanceName;
+}
+
+void MQClient::createTopic(const string &key, const string &newTopic,  // Forwards to the factory; failures are only logged.
+ int queueNum) {
+ try {
+ getFactory()->createTopic(key, newTopic, queueNum, m_SessionCredentials);  // NOTE(review): factory is NULL before start()
+ } catch (MQException &e) {
+ LOG_ERROR("%s", e.what());  // never use exception text as the format string: a '%' in it would be interpreted
+ }
+}
+
+int64 MQClient::earliestMsgStoreTime(const MQMessageQueue &mq) {  // Forwards to the factory with this client's credentials.
+ return getFactory()->earliestMsgStoreTime(mq, m_SessionCredentials);  // NOTE(review): factory is NULL before start()
+}
+
+QueryResult MQClient::queryMessage(const string &topic, const string &key,  // Forwards to the factory with this client's credentials.
+ int maxNum, int64 begin, int64 end) {
+ return getFactory()->queryMessage(topic, key, maxNum, begin, end,  // NOTE(review): factory is NULL before start()
+ m_SessionCredentials);
+}
+
+int64 MQClient::minOffset(const MQMessageQueue &mq) {  // Forwards to the factory with this client's credentials.
+ return getFactory()->minOffset(mq, m_SessionCredentials);  // NOTE(review): factory is NULL before start()
+}
+
+int64 MQClient::maxOffset(const MQMessageQueue &mq) {  // Forwards to the factory with this client's credentials.
+ return getFactory()->maxOffset(mq, m_SessionCredentials);  // NOTE(review): factory is NULL before start()
+}
+
+int64 MQClient::searchOffset(const MQMessageQueue &mq, uint64_t timestamp) {  // Forwards to the factory with this client's credentials.
+ return getFactory()->searchOffset(mq, timestamp, m_SessionCredentials);  // NOTE(review): factory is NULL before start()
+}
+
+MQMessageExt *MQClient::viewMessage(const string &msgId) {  // Forwards to the factory; ownership of the result is not visible here.
+ return getFactory()->viewMessage(msgId, m_SessionCredentials);  // NOTE(review): factory is NULL before start()
+}
+
+vector<MQMessageQueue> MQClient::getTopicMessageQueueInfo(const string &topic) {  // Returns the topic's queue list or throws.
+ boost::weak_ptr<TopicPublishInfo> weak_topicPublishInfo(
+ getFactory()->tryToFindTopicPublishInfo(topic, m_SessionCredentials));
+ boost::shared_ptr<TopicPublishInfo> topicPublishInfo(
+ weak_topicPublishInfo.lock());  // lock() gives an empty pointer if the info no longer exists
+ if (topicPublishInfo) {
+ return topicPublishInfo->getMessageQueueList();
+ }
+ THROW_MQEXCEPTION(  // reached only when no publish info was obtained
+ MQClientException,
+ "could not find MessageQueue Info of topic: [" + topic + "].", -1);
+}
+
+void MQClient::start() {  // Obtains the client factory if none is held yet, then logs the configuration.
+ if (getFactory() == NULL) {  // repeated calls keep the factory already held
+ m_clientFactory = MQClientManager::getInstance()->getMQClientFactory(
+ getMQClientId(), m_pullThreadNum, m_tcpConnectTimeout,
+ m_tcpTransportTryLockTimeout, m_unitName);
+ }
+ LOG_INFO(
+ "MQClient "
+ "start,groupname:%s,clientID:%s,instanceName:%s,nameserveraddr:%s",
+ getGroupName().c_str(), getMQClientId().c_str(),
+ getInstanceName().c_str(), getNamesrvAddr().c_str());
+}
+
+void MQClient::shutdown() { m_clientFactory = NULL; }  // only drops the pointer; the factory itself is not destroyed here
+
+MQClientFactory *MQClient::getFactory() const { return m_clientFactory; }  // NULL before start() and after shutdown()
+
+bool MQClient::isServiceStateOk() { return m_serviceState == RUNNING; }  // the state is never set to RUNNING in this file
+
+void MQClient::setMetaqLogLevel(elogLevel inputLevel) {  // Forwards to ALOG_ADAPTER, not to per-client state.
+ ALOG_ADAPTER.setLogLevel(inputLevel);
+}
+
+void MQClient::setMetaqLogFileSizeAndNum(int fileNum, long perFileSize) {  // Forwards to ALOG_ADAPTER; units of perFileSize not visible here.
+ ALOG_ADAPTER.setLogFileNumAndSize(fileNum, perFileSize);
+}
+
+void MQClient::setTcpTransportPullThreadNum(int num) {  // Can only raise the thread count.
+ if (num > m_pullThreadNum) {  // smaller or equal values are silently ignored
+ m_pullThreadNum = num;
+ }
+}
+
+const int MQClient::getTcpTransportPullThreadNum() const {  // Current pull-thread count (hardware_concurrency() by default).
+ return m_pullThreadNum;
+}
+
+void MQClient::setTcpTransportConnectTimeout(uint64_t timeout) {  // Stored as given; the default 3000 is commented as 3s, i.e. ms.
+ m_tcpConnectTimeout = timeout;
+}
+const uint64_t MQClient::getTcpTransportConnectTimeout() const {  // Returns the stored connect timeout unchanged.
+ return m_tcpConnectTimeout;
+}
+
+void MQClient::setTcpTransportTryLockTimeout(uint64_t timeout) {  // Takes a value divided by 1000 before storing (ms in, s stored).
+ if (timeout < 1000) {  // clamp so the stored value is at least 1
+ timeout = 1000;
+ }
+ m_tcpTransportTryLockTimeout = timeout / 1000;  // integer division: the remainder is dropped
+}
+const uint64_t MQClient::getTcpTransportTryLockTimeout() const {  // Returns the stored (already divided) value, not the setter's input.
+ return m_tcpTransportTryLockTimeout;
+}
+
+void MQClient::setUnitName(string unitName) { m_unitName = unitName; }  // parameter is taken by value and copied again
+const string &MQClient::getUnitName() { return m_unitName; }  // non-const, unlike the other getters in this file
+
+void MQClient::setSessionCredentials(const string &input_accessKey,  // Copies the three values into m_SessionCredentials.
+ const string &input_secretKey,
+ const string &input_onsChannel) {
+ m_SessionCredentials.setAccessKey(input_accessKey);
+ m_SessionCredentials.setSecretKey(input_secretKey);
+ m_SessionCredentials.setAuthChannel(input_onsChannel);  // the "onsChannel" argument is stored as the auth channel
+}
+
+const SessionCredentials &MQClient::getSessionCredentials() const {  // Reference to the member passed on every factory call.
+ return m_SessionCredentials;
+}
+
+//<!************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MQVersion.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MQVersion.cpp b/rocketmq-cpp/src/common/MQVersion.cpp
new file mode 100755
index 0000000..015390c
--- /dev/null
+++ b/rocketmq-cpp/src/common/MQVersion.cpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MQVersion.h"
+
+namespace rocketmq {
+int MQVersion::s_CurrentVersion = MQVersion::V3_1_8;  // definition of the static member; stores the enumerator's integer value
+
+//<!************************************************************************
+const char* MQVersion::getVersionDesc(int value) {  // Currently returns "" for every value.
+ switch (value) {  // no active cases: the only mapping is commented out
+ // case V1_0_0:
+ // return "V1_0_0";
+ }
+ return "";
+}
+//<!***************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MQVersion.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MQVersion.h b/rocketmq-cpp/src/common/MQVersion.h
new file mode 100755
index 0000000..aa9e9cd
--- /dev/null
+++ b/rocketmq-cpp/src/common/MQVersion.h
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MQVERSION_H__
+#define __MQVERSION_H__
+
+#include <string>
+
+namespace rocketmq {
+//<!***************************************************************************
+class MQVersion {  // Version enumerators plus the version value this client uses.
+ public:
+ enum Version {  // values are implicit (0, 1, 2, ...): inserting or reordering entries renumbers all later ones
+ V3_0_0_SNAPSHOT,
+ V3_0_0_ALPHA1,
+ V3_0_0_BETA1,
+ V3_0_0_BETA2,
+ V3_0_0_BETA3,
+ V3_0_0_BETA4,
+ V3_0_0_BETA5,
+ V3_0_0_BETA6_SNAPSHOT,
+ V3_0_0_BETA6,
+ V3_0_0_BETA7_SNAPSHOT,
+ V3_0_0_BETA7,
+ V3_0_0_BETA8_SNAPSHOT,
+ V3_0_0_BETA8,
+ V3_0_0_BETA9_SNAPSHOT,
+ V3_0_0_BETA9,
+ V3_0_0_FINAL,
+ V3_0_1_SNAPSHOT,
+ V3_0_1,
+ V3_0_2_SNAPSHOT,
+ V3_0_2,
+ V3_0_3_SNAPSHOT,
+ V3_0_3,
+ V3_0_4_SNAPSHOT,
+ V3_0_4,
+ V3_0_5_SNAPSHOT,
+ V3_0_5,
+ V3_0_6_SNAPSHOT,
+ V3_0_6,
+ V3_0_7_SNAPSHOT,
+ V3_0_7,
+ V3_0_8_SNAPSHOT,
+ V3_0_8,
+ V3_0_9_SNAPSHOT,
+ V3_0_9,
+
+ V3_0_10_SNAPSHOT,
+ V3_0_10,
+
+ V3_0_11_SNAPSHOT,
+ V3_0_11,
+
+ V3_0_12_SNAPSHOT,
+ V3_0_12,
+
+ V3_0_13_SNAPSHOT,
+ V3_0_13,
+
+ V3_0_14_SNAPSHOT,
+ V3_0_14,
+
+ V3_0_15_SNAPSHOT,
+ V3_0_15,
+
+ V3_1_0_SNAPSHOT,
+ V3_1_0,
+
+ V3_1_1_SNAPSHOT,
+ V3_1_1,
+
+ V3_1_2_SNAPSHOT,
+ V3_1_2,
+
+ V3_1_3_SNAPSHOT,
+ V3_1_3,
+
+ V3_1_4_SNAPSHOT,
+ V3_1_4,
+
+ V3_1_5_SNAPSHOT,
+ V3_1_5,
+
+ V3_1_6_SNAPSHOT,
+ V3_1_6,
+
+ V3_1_7_SNAPSHOT,
+ V3_1_7,
+
+ V3_1_8_SNAPSHOT,
+ V3_1_8,
+
+ V3_1_9_SNAPSHOT,
+ V3_1_9,
+
+ V3_2_0_SNAPSHOT,
+ V3_2_0,
+
+ V3_2_1_SNAPSHOT,
+ V3_2_1,
+
+ V3_2_2_SNAPSHOT,
+ V3_2_2,
+
+ V3_2_3_SNAPSHOT,
+ V3_2_3,
+
+ V3_2_4_SNAPSHOT,
+ V3_2_4,
+
+ V3_2_5_SNAPSHOT,
+ V3_2_5,
+
+ V3_2_6_SNAPSHOT,
+ V3_2_6,
+
+ V3_2_7_SNAPSHOT,
+ V3_2_7,
+
+ V3_2_8_SNAPSHOT,
+ V3_2_8,
+
+ V3_2_9_SNAPSHOT,
+ V3_2_9,
+
+ V3_3_1_SNAPSHOT,
+ V3_3_1,
+
+ V3_3_2_SNAPSHOT,
+ V3_3_2,
+
+ V3_3_3_SNAPSHOT,
+ V3_3_3,
+
+ V3_3_4_SNAPSHOT,
+ V3_3_4,
+
+ V3_3_5_SNAPSHOT,
+ V3_3_5,
+
+ V3_3_6_SNAPSHOT,
+ V3_3_6,
+
+ V3_3_7_SNAPSHOT,
+ V3_3_7,
+
+ V3_3_8_SNAPSHOT,
+ V3_3_8,
+
+ V3_3_9_SNAPSHOT,
+ V3_3_9,
+
+ V3_4_1_SNAPSHOT,
+ V3_4_1,
+
+ V3_4_2_SNAPSHOT,
+ V3_4_2,
+
+ V3_4_3_SNAPSHOT,
+ V3_4_3,
+
+ V3_4_4_SNAPSHOT,
+ V3_4_4,
+
+ V3_4_5_SNAPSHOT,
+ V3_4_5,
+
+ V3_4_6_SNAPSHOT,
+ V3_4_6,
+
+ V3_4_7_SNAPSHOT,
+ V3_4_7,
+
+ V3_4_8_SNAPSHOT,
+ V3_4_8,
+
+ V3_4_9_SNAPSHOT,
+ V3_4_9,
+ V3_5_1_SNAPSHOT,
+ V3_5_1,
+
+ V3_5_2_SNAPSHOT,
+ V3_5_2,
+
+ V3_5_3_SNAPSHOT,
+ V3_5_3,
+
+ V3_5_4_SNAPSHOT,
+ V3_5_4,
+
+ V3_5_5_SNAPSHOT,
+ V3_5_5,
+
+ V3_5_6_SNAPSHOT,
+ V3_5_6,
+
+ V3_5_7_SNAPSHOT,
+ V3_5_7,
+
+ V3_5_8_SNAPSHOT,
+ V3_5_8,
+
+ V3_5_9_SNAPSHOT,
+ V3_5_9,
+ };
+
+ static const char* getVersionDesc(int value);  // takes the enumerator's integer value
+
+ public:
+ static int s_CurrentVersion;  // mutable global; initialised to V3_1_8 in MQVersion.cpp
+};
+
+//<!***************************************************************************
+} //<!end namespace;
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MemoryInputStream.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MemoryInputStream.cpp b/rocketmq-cpp/src/common/MemoryInputStream.cpp
new file mode 100644
index 0000000..bfbd772
--- /dev/null
+++ b/rocketmq-cpp/src/common/MemoryInputStream.cpp
@@ -0,0 +1,55 @@
+#include "MemoryInputStream.h"
+
+namespace rocketmq {
+MemoryInputStream::MemoryInputStream(const void* const sourceData,  // Reads from a raw buffer of sourceDataSize bytes.
+ const size_t sourceDataSize,
+ const bool keepInternalCopy)
+ : data(sourceData),
+ dataSize(sourceDataSize),
+ position(0),
+ internalCopy(NULL) {
+ if (keepInternalCopy) createInternalCopy();  // otherwise the caller's buffer is referenced directly
+}
+
+MemoryInputStream::MemoryInputStream(const MemoryBlock& sourceData,  // Reads from the block's getData()/getSize().
+ const bool keepInternalCopy)
+ : data(sourceData.getData()),
+ dataSize(sourceData.getSize()),
+ position(0),
+ internalCopy(NULL) {
+ if (keepInternalCopy) createInternalCopy();  // otherwise the block's storage is referenced directly
+}
+
+void MemoryInputStream::createInternalCopy() {  // Duplicates the source bytes so the stream no longer depends on the caller's buffer.
+ std::free(internalCopy);  // free(NULL) is a no-op, so this is safe on first use
+ internalCopy = static_cast<char*>(std::malloc(dataSize));
+ if (internalCopy != NULL) memcpy(internalCopy, data, dataSize); else dataSize = 0;  // failed alloc: become an empty stream, never memcpy into NULL
+ data = internalCopy;
+}
+
+MemoryInputStream::~MemoryInputStream() { std::free(internalCopy); }  // NULL when no copy was made; free(NULL) is a no-op
+
+int64 MemoryInputStream::getTotalLength() { return (int64)dataSize; }  // size of the whole source, independent of position
+
+int MemoryInputStream::read(void* const buffer, const int howMany) {  // Copies up to howMany bytes to buffer; returns the count copied.
+ if (howMany <= 0 || position >= dataSize) return 0;  // nothing requested, or already at the end
+ const size_t left = dataSize - position;  // kept in size_t: casting this to int truncated for very large blocks
+ const size_t num = std::min((size_t)howMany, left);  // never exceeds howMany, so it fits the int return
+ memcpy(buffer, static_cast<const char*>(data) + position, num);  // arithmetic on void* is non-standard
+ position += num;
+ return (int)num;
+}
+
+bool MemoryInputStream::isExhausted() { return position >= dataSize; }  // true once every source byte has been consumed
+
+bool MemoryInputStream::setPosition(const int64 pos) {  // Moves the cursor, clamping pos to [0, dataSize].
+ if (pos < 0)
+ position = 0;  // negative requests go to the start
+ else
+ position = (int64)dataSize < pos ? (int64)dataSize : pos;  // requests past the end go to the end
+
+ return true;  // always succeeds, even when the value was clamped
+}
+
+int64 MemoryInputStream::getPosition() { return (int64)position; }  // offset of the next byte read() will return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MemoryInputStream.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MemoryInputStream.h b/rocketmq-cpp/src/common/MemoryInputStream.h
new file mode 100644
index 0000000..a25f079
--- /dev/null
+++ b/rocketmq-cpp/src/common/MemoryInputStream.h
@@ -0,0 +1,80 @@
+#ifndef MEMORYINPUTSTREAM_H_INCLUDED
+#define MEMORYINPUTSTREAM_H_INCLUDED
+
+#include "InputStream.h"
+
+namespace rocketmq {
+//==============================================================================
+/**
+ Allows a block of data to be accessed as a stream.
+
+ This can either be used to refer to a shared block of memory, or can make
+ its
+ own internal copy of the data when the MemoryInputStream is created.
+*/
+class ROCKETMQCLIENT_API MemoryInputStream : public InputStream {  // NOTE(review): implicitly copyable although it owns internalCopy; copying one that holds a copy would double-free
+ public:
+ //==============================================================================
+ /** Creates a MemoryInputStream.
+
+ @param sourceData the block of data to use as the stream's
+ source
+ @param sourceDataSize the number of bytes in the source data
+ block
+ @param keepInternalCopyOfData if false, the stream will just keep a
+ pointer to
+ the source data, so this data shouldn't be
+ changed
+ for the lifetime of the stream; if this
+ parameter is
+ true, the stream will make its own copy of
+ the
+ data and use that.
+ */
+ MemoryInputStream(const void* sourceData, size_t sourceDataSize,
+ bool keepInternalCopyOfData);
+
+ /** Creates a MemoryInputStream.
+
+ @param data a block of data to use as the stream's
+ source
+ @param keepInternalCopyOfData if false, the stream will just keep a
+ reference to
+ the source data, so this data shouldn't be
+ changed
+ for the lifetime of the stream; if this
+ parameter is
+ true, the stream will make its own copy of
+ the
+ data and use that.
+ */
+ MemoryInputStream(const MemoryBlock& data, bool keepInternalCopyOfData);
+
+ /** Destructor. */
+ ~MemoryInputStream();
+
+ /** Returns a pointer to the source data block from which this stream is
+ * reading. */
+ const void* getData() const { return data; }
+
+ /** Returns the number of bytes of source data in the block from which this
+ * stream is reading. */
+ size_t getDataSize() const { return dataSize; }
+
+ //==============================================================================
+ int64 getPosition();  // InputStream overrides, implemented in MemoryInputStream.cpp
+ bool setPosition(int64 pos);
+ int64 getTotalLength();
+ bool isExhausted();
+ int read(void* destBuffer, int maxBytesToRead);
+
+ private:
+ //==============================================================================
+ const void* data;  // current source: the caller's buffer, or internalCopy once a copy was made
+ size_t dataSize, position;  // source length and read cursor, both in bytes
+ char* internalCopy;  // malloc'd private copy, or NULL; released with free() in the destructor
+
+ void createInternalCopy();
+};
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MemoryOutputStream.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MemoryOutputStream.cpp b/rocketmq-cpp/src/common/MemoryOutputStream.cpp
new file mode 100644
index 0000000..36d9f8c
--- /dev/null
+++ b/rocketmq-cpp/src/common/MemoryOutputStream.cpp
@@ -0,0 +1,148 @@
+#include "MemoryOutputStream.h"
+
+namespace rocketmq {
+// Builds a stream that writes into its own internal block, which is sized to
+// initialSize bytes up front.
+MemoryOutputStream::MemoryOutputStream(const size_t initialSize)
+    : blockToUse(&internalBlock),
+      externalData(NULL),
+      position(0),
+      size(0),
+      availableSize(0)
+{
+  internalBlock.setSize(initialSize, false);
+}
+
+// Builds a stream that writes into a caller-owned MemoryBlock. When
+// appendToExistingBlockContent is set, writing starts at the block's current
+// end; otherwise it starts at offset zero.
+MemoryOutputStream::MemoryOutputStream(MemoryBlock& memoryBlockToWriteTo,
+                                       const bool appendToExistingBlockContent)
+    : blockToUse(&memoryBlockToWriteTo),
+      externalData(NULL),
+      position(0),
+      size(0),
+      availableSize(0)
+{
+  if (!appendToExistingBlockContent) return;
+
+  size = memoryBlockToWriteTo.getSize();
+  position = size;
+}
+
+// Builds a stream over a caller-owned, fixed-size buffer. No MemoryBlock is
+// used in this mode, so capacity is capped at destBufferSize.
+MemoryOutputStream::MemoryOutputStream(void* destBuffer, size_t destBufferSize)
+    : blockToUse(NULL),
+      externalData(destBuffer),
+      position(0),
+      size(0),
+      availableSize(destBufferSize)
+{
+}
+
+// On destruction, a caller-supplied MemoryBlock is shrunk to the written
+// length (same work as flush()).
+MemoryOutputStream::~MemoryOutputStream()
+{
+  trimExternalBlockSize();
+}
+
+// Nothing is buffered elsewhere; flushing only trims a caller-supplied
+// MemoryBlock down to the number of bytes written so far.
+void MemoryOutputStream::flush()
+{
+  trimExternalBlockSize();
+}
+
+// Resizes a caller-supplied MemoryBlock to exactly `size` bytes. Does nothing
+// for the internal block or in fixed-buffer mode (blockToUse == NULL).
+void MemoryOutputStream::trimExternalBlockSize() {
+  const bool usesCallerBlock =
+      (blockToUse != NULL) && (blockToUse != &internalBlock);
+  if (!usesCallerBlock) return;
+
+  blockToUse->setSize(size, false);
+}
+
+// Asks the backing MemoryBlock, if there is one, for room for
+// bytesToPreallocate bytes plus one extra byte. Fixed-buffer streams cannot
+// grow and are left untouched.
+void MemoryOutputStream::preallocate(const size_t bytesToPreallocate) {
+  if (blockToUse == NULL) return;
+
+  blockToUse->ensureSize(bytesToPreallocate + 1);
+}
+
+// Rewinds the stream and discards its logical content. The backing storage
+// itself is neither resized nor cleared here.
+void MemoryOutputStream::reset() {
+  size = 0;
+  position = 0;
+}
+
+// Reserves numBytes at the current write position and returns a pointer to
+// the start of the reserved region. On success `position` is advanced past the
+// region and `size` is raised if the write extends the stream.
+//
+// MemoryBlock mode: the block is grown when the required size reaches its
+// current size. The request adds slack of half the required size (capped at
+// 1 MiB) plus 32 bytes, rounded down to a multiple of 32.
+// Fixed-buffer mode: the request fails when it does not fit in availableSize.
+//
+// @param numBytes  number of bytes the caller is about to write
+// @returns the write pointer, or NULL if the bytes cannot be stored
+char* MemoryOutputStream::prepareToWrite(size_t numBytes) {
+  // position + numBytes must not wrap: a wrapped total would look small
+  // enough to skip the growth step and the caller would write out of bounds.
+  if (numBytes > static_cast<size_t>(-1) - position) return NULL;
+
+  const size_t storageNeeded = position + numBytes;
+
+  char* data;
+
+  if (blockToUse != NULL) {
+    // Compare at size_t width; a cast to unsigned int would truncate large
+    // sizes on 64-bit targets.
+    if (storageNeeded >= static_cast<size_t>(blockToUse->getSize())) {
+      const size_t slack = std::min(storageNeeded / 2, (size_t)(1024 * 1024));
+
+      // The mask is built at size_t width: `~31u` is a 32-bit value and would
+      // zero the upper half of a 64-bit size, under-allocating the block.
+      const size_t alignMask = ~static_cast<size_t>(31);
+      blockToUse->ensureSize((storageNeeded + slack + 32) & alignMask);
+    }
+
+    data = static_cast<char*>(blockToUse->getData());
+  } else {
+    if (storageNeeded > availableSize) return NULL;
+
+    data = static_cast<char*>(externalData);
+  }
+
+  char* const writePointer = data + position;
+  position = storageNeeded;
+  size = std::max(size, position);
+  return writePointer;
+}
+
+// Copies howMany bytes from buffer to the current write position.
+// A zero-length write always succeeds; otherwise the result is false when
+// prepareToWrite() cannot provide space.
+bool MemoryOutputStream::write(const void* const buffer, size_t howMany) {
+  if (howMany != 0) {
+    char* const target = prepareToWrite(howMany);
+    if (target == NULL) return false;
+
+    memcpy(target, buffer, howMany);
+  }
+
+  return true;
+}
+
+// Fills howMany bytes at the current write position with `byte`.
+// A zero-length request always succeeds; otherwise the result is false when
+// prepareToWrite() cannot provide space.
+bool MemoryOutputStream::writeRepeatedByte(uint8 byte, size_t howMany) {
+  if (howMany != 0) {
+    char* const target = prepareToWrite(howMany);
+    if (target == NULL) return false;
+
+    memset(target, byte, howMany);
+  }
+
+  return true;
+}
+
+// Returns a MemoryBlock constructed from the stream's current data pointer
+// and written length.
+MemoryBlock MemoryOutputStream::getMemoryBlock() const {
+  const void* const bytes = getData();
+  const size_t count = getDataSize();
+  return MemoryBlock(bytes, count);
+}
+
+// Returns the start of the written data. In MemoryBlock mode, when the block
+// is larger than the written length, a zero byte is stored just past the data
+// before the pointer is returned.
+const void* MemoryOutputStream::getData() const {
+  if (blockToUse != NULL) {
+    const bool hasSpareByte = (unsigned int)blockToUse->getSize() > size;
+    if (hasSpareByte) static_cast<char*>(blockToUse->getData())[size] = 0;
+
+    return blockToUse->getData();
+  }
+
+  return externalData;
+}
+
+// Moves the write position. Targets past the end of the written data are
+// rejected; negative targets are clamped to the start of the stream.
+bool MemoryOutputStream::setPosition(int64 newPosition) {
+  // can't move beyond the end of the stream..
+  if (newPosition > (int64)size) return false;
+
+  // Any remaining target is at or before the end, so only the lower bound
+  // needs clamping.
+  position = (newPosition < 0) ? 0 : (size_t)newPosition;
+  return true;
+}
+
+// Copies from `source` via the base-class implementation, first growing the
+// backing MemoryBlock (if any) so the copy does not need repeated resizing.
+// A negative maxNumBytesToWrite, or one larger than what the source reports
+// as remaining, is clamped to the remaining amount when that is positive.
+int64 MemoryOutputStream::writeFromInputStream(InputStream& source,
+                                               int64 maxNumBytesToWrite) {
+  const int64 remaining = source.getTotalLength() - source.getPosition();
+
+  if (remaining > 0) {
+    const bool unbounded = maxNumBytesToWrite < 0;
+    if (unbounded || maxNumBytesToWrite > remaining)
+      maxNumBytesToWrite = remaining;
+
+    if (blockToUse != NULL) {
+      preallocate(blockToUse->getSize() + (size_t)maxNumBytesToWrite);
+    }
+  }
+
+  return OutputStream::writeFromInputStream(source, maxNumBytesToWrite);
+}
+
+// Appends everything written to streamToRead onto `stream`; an empty source
+// results in no write call at all.
+OutputStream& operator<<(OutputStream& stream,
+                         const MemoryOutputStream& streamToRead) {
+  const size_t byteCount = streamToRead.getDataSize();
+  if (byteCount == 0) return stream;
+
+  stream.write(streamToRead.getData(), byteCount);
+  return stream;
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MemoryOutputStream.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MemoryOutputStream.h b/rocketmq-cpp/src/common/MemoryOutputStream.h
new file mode 100644
index 0000000..53fe66c
--- /dev/null
+++ b/rocketmq-cpp/src/common/MemoryOutputStream.h
@@ -0,0 +1,115 @@
+#ifndef MEMORYOUTPUTSTREAM_H_INCLUDED
+#define MEMORYOUTPUTSTREAM_H_INCLUDED
+
+#include "OutputStream.h"
+
+namespace rocketmq {
+//==============================================================================
+/**
+ Writes data to an internal memory buffer, which grows as required.
+
+ The data that was written into the stream can then be accessed later as
+ a contiguous block of memory.
+*/
+class ROCKETMQCLIENT_API MemoryOutputStream : public OutputStream {
+ public:
+ //==============================================================================
+ /** Creates an empty memory stream, ready to be written into.
+ The stream writes into its own internal block.
+ @param initialSize the initial amount of capacity to allocate for writing
+ into
+ */
+ MemoryOutputStream(size_t initialSize = 256);
+
+ /** Creates a memory stream for writing into a pre-existing MemoryBlock
+ object.
+
+ Note that the destination block may be larger than the amount of data that
+ has been written to the stream, because the MemoryOutputStream requests
+ some spare capacity when it grows the block. To trim the block's size down
+ to fit the actual data, call flush(), or delete the MemoryOutputStream.
+
+ @param memoryBlockToWriteTo the block into which new data will
+ be written.
+ @param appendToExistingBlockContent if this is true, writing starts at
+ the current end of the block, so its
+ existing contents are kept. If false,
+ writing starts at offset zero and the
+ existing contents are overwritten.
+ */
+ MemoryOutputStream(MemoryBlock& memoryBlockToWriteTo,
+ bool appendToExistingBlockContent);
+
+ /** Creates a MemoryOutputStream that will write into a user-supplied,
+ fixed-size block of memory.
+ When using this mode, the stream will write directly into this memory area
+ until it's full, at which point write operations will fail.
+ */
+ MemoryOutputStream(void* destBuffer, size_t destBufferSize);
+
+ /** Destructor.
+ If the stream is writing to a user-supplied MemoryBlock, the block is
+ trimmed to the amount of data written, exactly as flush() does.
+ */
+ ~MemoryOutputStream();
+
+ //==============================================================================
+ /** Returns a pointer to the data that has been written to the stream.
+ When the stream is backed by a MemoryBlock that is larger than the written
+ data, a zero byte is stored immediately after the data.
+ @see getDataSize
+ */
+ const void* getData() const;
+
+ /** Returns the number of bytes of data that have been written to the stream.
+ @see getData
+ */
+ size_t getDataSize() const { return size; }
+
+ /** Resets the stream: the write position and the data size both go back to
+ zero. The underlying storage is not resized.
+ */
+ void reset();
+
+ /** Increases the storage capacity of the backing MemoryBlock so that it can
+ contain at least the specified amount of data without needing to be
+ resized. Has no effect when writing to a user-supplied fixed-size buffer.
+ */
+ void preallocate(size_t bytesToPreallocate);
+
+ /** Returns a copy of the stream's data as a memory block. */
+ MemoryBlock getMemoryBlock() const;
+
+ //==============================================================================
+ /** If the stream is writing to a user-supplied MemoryBlock, this will trim
+ any excess capacity off the block, so that its length matches the amount of
+ actual data that has been written so far.
+ */
+ void flush();
+
+ bool write(const void*, size_t);  // false if no space could be obtained
+ int64 getPosition() { return (int64)position; }
+ bool setPosition(int64);  // fails for positions past the written data
+ int64 writeFromInputStream(InputStream&, int64 maxNumBytesToWrite);
+ bool writeRepeatedByte(uint8 byte, size_t numTimesToRepeat);
+
+ private:
+ //==============================================================================
+ MemoryBlock* const blockToUse;  // NULL when writing to a fixed buffer
+ MemoryBlock internalBlock;  // storage used by the size_t constructor
+ void* externalData;  // user-supplied fixed buffer, or NULL
+ size_t position, size, availableSize;  // availableSize: fixed-buffer capacity
+
+ void trimExternalBlockSize();
+ char* prepareToWrite(size_t);  // returns NULL when the bytes cannot be stored
+};
+
+/** Copies all the data that has been written to a MemoryOutputStream into
+ * another stream. */
+OutputStream& operator<<(OutputStream& stream,
+ const MemoryOutputStream& streamToRead);
+}
+#endif // MEMORYOUTPUTSTREAM_H_INCLUDED
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MessageSysFlag.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MessageSysFlag.cpp b/rocketmq-cpp/src/common/MessageSysFlag.cpp
new file mode 100755
index 0000000..50a28b4
--- /dev/null
+++ b/rocketmq-cpp/src/common/MessageSysFlag.cpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MessageSysFlag.h"
+
+namespace rocketmq {
+// Single-bit flags occupying bit 0 and bit 1.
+int MessageSysFlag::CompressedFlag = 1 << 0;
+int MessageSysFlag::MultiTagsFlag = 1 << 1;
+
+// Two-bit transaction-type field stored in bits 2-3 (values 0..3).
+int MessageSysFlag::TransactionNotType = 0 << 2;
+int MessageSysFlag::TransactionPreparedType = 1 << 2;
+int MessageSysFlag::TransactionCommitType = 2 << 2;
+int MessageSysFlag::TransactionRollbackType = 3 << 2;
+
+// Extracts the transaction-type bits from `flag`. TransactionRollbackType has
+// both bits of the field set, so it doubles as the field mask.
+int MessageSysFlag::getTransactionValue(int flag) {
+  const int fieldMask = TransactionRollbackType;
+  return flag & fieldMask;
+}
+
+// Returns `flag` with the transaction-type bits cleared and `type` OR-ed in.
+int MessageSysFlag::resetTransactionValue(int flag, int type) {
+  const int withoutTransactionBits = flag & ~TransactionRollbackType;
+  return withoutTransactionBits | type;
+}
+
+//<!***************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/MessageSysFlag.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/MessageSysFlag.h b/rocketmq-cpp/src/common/MessageSysFlag.h
new file mode 100755
index 0000000..d7f7993
--- /dev/null
+++ b/rocketmq-cpp/src/common/MessageSysFlag.h
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MESSAGESYSFLAG_H__
+#define __MESSAGESYSFLAG_H__
+
+namespace rocketmq {
+//<!************************************************************************
+class MessageSysFlag {
+ public:
+  // Flag constants; their values are assigned in MessageSysFlag.cpp.
+  static int CompressedFlag;
+  static int MultiTagsFlag;
+  static int TransactionNotType;
+  static int TransactionPreparedType;
+  static int TransactionCommitType;
+  static int TransactionRollbackType;
+
+  // Returns only the transaction-type bits of `flag`.
+  static int getTransactionValue(int flag);
+
+  // Returns `flag` with its transaction-type bits replaced by `type`.
+  static int resetTransactionValue(int flag, int type);
+};
+
+//<!***************************************************************************
+} //<!end namespace;
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/NamesrvConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/NamesrvConfig.h b/rocketmq-cpp/src/common/NamesrvConfig.h
new file mode 100755
index 0000000..6c0259f
--- /dev/null
+++ b/rocketmq-cpp/src/common/NamesrvConfig.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __NAMESRVCONFIG_H__
+#define __NAMESRVCONFIG_H__
+
+#include <stdlib.h>
+#include <string>
+namespace rocketmq {
+//<!***************************************************************************
+/**
+ * Holds two name-server settings: the RocketMQ home directory and the path of
+ * the KV config file.
+ *
+ * Strings are spelled std::string so this header does not depend on a
+ * using-declaration supplied by some other header.
+ */
+class NamesrvConfig {
+ public:
+  /**
+   * Reads the home directory from the environment variable named by
+   * ROCKETMQ_HOME_ENV. Both settings start out empty (default-constructed);
+   * the home directory stays empty when the variable is not set.
+   */
+  NamesrvConfig() {
+    const char* home = getenv(ROCKETMQ_HOME_ENV.c_str());
+    if (home) {
+      m_rocketmqHome = home;
+    }
+  }
+
+  /** @returns the configured RocketMQ home directory (may be empty). */
+  const std::string& getRocketmqHome() const { return m_rocketmqHome; }
+
+  /** Replaces the RocketMQ home directory. */
+  void setRocketmqHome(const std::string& rocketmqHome) {
+    m_rocketmqHome = rocketmqHome;
+  }
+
+  /** @returns the configured KV config path (may be empty). */
+  const std::string& getKvConfigPath() const { return m_kvConfigPath; }
+
+  /** Replaces the KV config path. */
+  void setKvConfigPath(const std::string& kvConfigPath) {
+    m_kvConfigPath = kvConfigPath;
+  }
+
+ private:
+  std::string m_rocketmqHome;
+  std::string m_kvConfigPath;
+};
+
+//<!***************************************************************************
+} //<!end namespace;
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/OutputStream.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/OutputStream.cpp b/rocketmq-cpp/src/common/OutputStream.cpp
new file mode 100644
index 0000000..59d8c0f
--- /dev/null
+++ b/rocketmq-cpp/src/common/OutputStream.cpp
@@ -0,0 +1,129 @@
+#include "OutputStream.h"
+#include <limits>
+
+namespace rocketmq {
+//==============================================================================
+// Nothing to set up: the base class declares no data members.
+OutputStream::OutputStream()
+{
+}
+
+// Nothing to release at this level; subclasses perform their own cleanup.
+OutputStream::~OutputStream()
+{
+}
+
+//==============================================================================
+// Encodes the flag as a single byte: 1 for true, 0 for false.
+bool OutputStream::writeBool(const bool b) {
+  const char encoded = b ? (char)1 : (char)0;
+  return writeByte(encoded);
+}
+
+// Forwards one byte to write(); returns whatever write() reports.
+bool OutputStream::writeByte(char byte)
+{
+  return write(&byte, 1);
+}
+
+// Generic fallback: emits `byte` one writeByte() call at a time and stops at
+// the first failure.
+bool OutputStream::writeRepeatedByte(uint8 byte, size_t numTimesToRepeat) {
+  size_t remaining = numTimesToRepeat;
+
+  while (remaining > 0) {
+    if (!writeByte((char)byte)) return false;
+    --remaining;
+  }
+
+  return true;
+}
+
+// Writes the two bytes of `value` after ByteOrder::swapIfBigEndian().
+bool OutputStream::writeShort(short value) {
+  const unsigned short raw = (unsigned short)value;
+  const unsigned short wire = ByteOrder::swapIfBigEndian(raw);
+  return write(&wire, 2);
+}
+
+// Writes the two bytes of `value` after ByteOrder::swapIfLittleEndian().
+bool OutputStream::writeShortBigEndian(short value) {
+  const unsigned short raw = (unsigned short)value;
+  const unsigned short wire = ByteOrder::swapIfLittleEndian(raw);
+  return write(&wire, 2);
+}
+
+// Writes the four bytes of `value` after ByteOrder::swapIfBigEndian().
+bool OutputStream::writeInt(int value) {
+  const unsigned int raw = (unsigned int)value;
+  const unsigned int wire = ByteOrder::swapIfBigEndian(raw);
+  return write(&wire, 4);
+}
+
+// Writes the four bytes of `value` after ByteOrder::swapIfLittleEndian().
+bool OutputStream::writeIntBigEndian(int value) {
+  const unsigned int raw = (unsigned int)value;
+  const unsigned int wire = ByteOrder::swapIfLittleEndian(raw);
+  return write(&wire, 4);
+}
+
+// Writes `value` in a variable-length form.
+//
+// Layout: data[0] holds the number of magnitude bytes that follow (0..4), with
+// bit 0x80 set when `value` is negative; data[1..] hold the magnitude, least
+// significant byte first. Zero is encoded as the single byte 0.
+//
+// @returns false if the underlying write() fails
+bool OutputStream::writeCompressedInt(int value) {
+  // Negate in unsigned arithmetic: `-value` on the signed type overflows
+  // (undefined behaviour) when value is the most negative int.
+  unsigned int un =
+      (value < 0) ? (0u - (unsigned int)value) : (unsigned int)value;
+
+  uint8 data[5];
+  int num = 0;
+
+  while (un > 0) {
+    data[++num] = (uint8)un;
+    un >>= 8;
+  }
+
+  data[0] = (uint8)num;
+
+  if (value < 0) data[0] |= 0x80;
+
+  return write(data, (size_t)num + 1);
+}
+
+// Writes the eight bytes of `value` after ByteOrder::swapIfBigEndian().
+bool OutputStream::writeInt64(int64 value) {
+  const uint64 raw = (uint64)value;
+  const uint64 wire = ByteOrder::swapIfBigEndian(raw);
+  return write(&wire, 8);
+}
+
+// Writes the eight bytes of `value` after ByteOrder::swapIfLittleEndian().
+bool OutputStream::writeInt64BigEndian(int64 value) {
+  const uint64 raw = (uint64)value;
+  const uint64 wire = ByteOrder::swapIfLittleEndian(raw);
+  return write(&wire, 8);
+}
+
+// Reinterprets the float's bit pattern as an int through a union and hands
+// it to writeInt().
+bool OutputStream::writeFloat(float value) {
+  union {
+    int asInt;
+    float asFloat;
+  } bits;
+
+  bits.asFloat = value;
+  const int pattern = bits.asInt;
+  return writeInt(pattern);
+}
+
+// Reinterprets the float's bit pattern as an int through a union and hands
+// it to writeIntBigEndian().
+bool OutputStream::writeFloatBigEndian(float value) {
+  union {
+    int asInt;
+    float asFloat;
+  } bits;
+
+  bits.asFloat = value;
+  const int pattern = bits.asInt;
+  return writeIntBigEndian(pattern);
+}
+
+// Reinterprets the double's bit pattern as an int64 through a union and hands
+// it to writeInt64().
+bool OutputStream::writeDouble(double value) {
+  union {
+    int64 asInt;
+    double asDouble;
+  } bits;
+
+  bits.asDouble = value;
+  const int64 pattern = bits.asInt;
+  return writeInt64(pattern);
+}
+
+// Reinterprets the double's bit pattern as an int64 through a union and hands
+// it to writeInt64BigEndian().
+bool OutputStream::writeDoubleBigEndian(double value) {
+  union {
+    int64 asInt;
+    double asDouble;
+  } bits;
+
+  bits.asDouble = value;
+  const int64 pattern = bits.asInt;
+  return writeInt64BigEndian(pattern);
+}
+
+// Copies data from `source` into this stream in chunks of up to 8192 bytes.
+//
+// @param source           the stream to read from
+// @param numBytesToWrite  maximum number of bytes to copy; a negative value
+//                         means "until the source stops returning data"
+// @returns the number of bytes that were read from `source` and accepted by
+//          write(). Copying stops at the first read that returns no data or
+//          the first write() that fails; a chunk rejected by write() is not
+//          counted.
+int64 OutputStream::writeFromInputStream(InputStream& source,
+                                         int64 numBytesToWrite) {
+  if (numBytesToWrite < 0) numBytesToWrite = std::numeric_limits<int64>::max();
+
+  int64 numWritten = 0;
+
+  while (numBytesToWrite > 0) {
+    char buffer[8192];
+    const int num = source.read(
+        buffer, (int)std::min(numBytesToWrite, (int64)sizeof(buffer)));
+
+    if (num <= 0) break;
+
+    // Honour the write result: counting a rejected chunk would report bytes
+    // as written that never reached the destination, and the loop would keep
+    // draining the source into a stream that cannot accept data.
+    if (!write(buffer, (size_t)num)) break;
+
+    numBytesToWrite -= num;
+    numWritten += num;
+  }
+
+  return numWritten;
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/OutputStream.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/OutputStream.h b/rocketmq-cpp/src/common/OutputStream.h
new file mode 100644
index 0000000..1fb2124
--- /dev/null
+++ b/rocketmq-cpp/src/common/OutputStream.h
@@ -0,0 +1,184 @@
+#ifndef OUTPUTSTREAM_H_INCLUDED
+#define OUTPUTSTREAM_H_INCLUDED
+
+#include "ByteOrder.h"
+#include "InputStream.h"
+namespace rocketmq {
+//==============================================================================
+/**
+ The base class for streams that write data to some kind of destination.
+
+ Input and output streams are used throughout the library - subclasses can
+ override
+ some or all of the virtual functions to implement their behaviour.
+
+ @see InputStream, MemoryOutputStream, FileOutputStream
+*/
+class ROCKETMQCLIENT_API OutputStream {
+ protected:
+ //==============================================================================
+ OutputStream();
+
+ public:
+ /** Destructor.
+
+ Some subclasses might want to do things like call flush() during their
+ destructors.
+ */
+ virtual ~OutputStream();
+
+ //==============================================================================
+ /** If the stream is using a buffer, this will ensure it gets written
+ out to the destination. */
+ virtual void flush() = 0;
+
+ /** Tries to move the stream's output position.
+
+ Not all streams will be able to seek to a new position - this will return
+ false if it fails to work.
+
+ @see getPosition
+ */
+ virtual bool setPosition(int64 newPosition) = 0;
+
+ /** Returns the stream's current position.
+
+ @see setPosition
+ */
+ virtual int64 getPosition() = 0;
+
+ //==============================================================================
+ /** Writes a block of data to the stream.
+
+ When creating a subclass of OutputStream, this is the only write method
+ that needs to be overridden - the base class has methods for writing other
+ types of data which use this to do the work.
+
+ @param dataToWrite the source buffer holding the bytes to write. This
+ must not be null.
+ @param numberOfBytes the number of bytes to write.
+ @returns false if the write operation fails for some reason
+ */
+ virtual bool write(const void* dataToWrite, size_t numberOfBytes) = 0;
+
+ //==============================================================================
+ /** Writes a single byte to the stream.
+ @returns false if the write operation fails for some reason
+ @see InputStream::readByte
+ */
+ virtual bool writeByte(char byte);
+
+ /** Writes a boolean to the stream as a single byte.
+ This is encoded as a binary byte (not as text) with a value of 1 or 0.
+ @returns false if the write operation fails for some reason
+ @see InputStream::readBool
+ */
+ virtual bool writeBool(bool boolValue);
+
+ /** Writes a 16-bit integer to the stream in a little-endian byte order.
+ This will write two bytes to the stream: (value & 0xff), then
+ (value >> 8).
+ @returns false if the write operation fails for some reason
+ @see InputStream::readShort
+ */
+ virtual bool writeShort(short value);
+
+ /** Writes a 16-bit integer to the stream in a big-endian byte order.
+ This will write two bytes to the stream: (value >> 8), then
+ (value & 0xff).
+ @returns false if the write operation fails for some reason
+ @see InputStream::readShortBigEndian
+ */
+ virtual bool writeShortBigEndian(short value);
+
+ /** Writes a 32-bit integer to the stream in a little-endian byte order.
+ @returns false if the write operation fails for some reason
+ @see InputStream::readInt
+ */
+ virtual bool writeInt(int value);
+
+ /** Writes a 32-bit integer to the stream in a big-endian byte order.
+ @returns false if the write operation fails for some reason
+ @see InputStream::readIntBigEndian
+ */
+ virtual bool writeIntBigEndian(int value);
+
+ /** Writes a 64-bit integer to the stream in a little-endian byte order.
+ @returns false if the write operation fails for some reason
+ @see InputStream::readInt64
+ */
+ virtual bool writeInt64(int64 value);
+
+ /** Writes a 64-bit integer to the stream in a big-endian byte order.
+ @returns false if the write operation fails for some reason
+ @see InputStream::readInt64BigEndian
+ */
+ virtual bool writeInt64BigEndian(int64 value);
+
+ /** Writes a 32-bit floating point value to the stream in a binary format.
+ The binary 32-bit encoding of the float is written as a little-endian int.
+ @returns false if the write operation fails for some reason
+ @see InputStream::readFloat
+ */
+ virtual bool writeFloat(float value);
+
+ /** Writes a 32-bit floating point value to the stream in a binary format.
+ The binary 32-bit encoding of the float is written as a big-endian int.
+ @returns false if the write operation fails for some reason
+ @see InputStream::readFloatBigEndian
+ */
+ virtual bool writeFloatBigEndian(float value);
+
+ /** Writes a 64-bit floating point value to the stream in a binary format.
+ The eight raw bytes of the double value are written out as a little-endian
+ 64-bit int.
+ @returns false if the write operation fails for some reason
+ @see InputStream::readDouble
+ */
+ virtual bool writeDouble(double value);
+
+ /** Writes a 64-bit floating point value to the stream in a binary format.
+ The eight raw bytes of the double value are written out as a big-endian
+ 64-bit int.
+ @returns false if the write operation fails for some reason
+ @see InputStream::readDoubleBigEndian
+ */
+ virtual bool writeDoubleBigEndian(double value);
+
+ /** Writes a byte to the output stream a given number of times.
+ @returns false if the write operation fails for some reason
+ */
+ virtual bool writeRepeatedByte(uint8 byte, size_t numTimesToRepeat);
+
+ /** Writes a condensed binary encoding of a 32-bit integer.
+
+ If you're storing a lot of integers which are unlikely to have very large
+ values, this can save a lot of space, because values under 0xff will only
+ take up 2 bytes, under 0xffff only 3 bytes, etc.
+
+ The format used is: number of significant bytes + up to 4 bytes in
+ little-endian order.
+
+ @returns false if the write operation fails for some reason
+ @see InputStream::readCompressedInt
+ */
+ virtual bool writeCompressedInt(int value);
+
+ /** Reads data from an input stream and writes it to this stream.
+
+ @param source the stream to read from
+ @param maxNumBytesToWrite the number of bytes to read from the stream
+ (if this is less than zero, it will keep
+ reading until the input is exhausted)
+ @returns the number of bytes written
+ */
+ virtual int64 writeFromInputStream(InputStream& source,
+ int64 maxNumBytesToWrite);
+};
+}
+
+#endif // OUTPUTSTREAM_H_INCLUDED
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/PermName.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/PermName.cpp b/rocketmq-cpp/src/common/PermName.cpp
new file mode 100755
index 0000000..7f168a4
--- /dev/null
+++ b/rocketmq-cpp/src/common/PermName.cpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PermName.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+// Permission bits; a permission value is a bitwise OR of these.
+int PermName::PERM_PRIORITY = 0x1 << 3;
+int PermName::PERM_READ = 0x1 << 2;
+int PermName::PERM_WRITE = 0x1 << 1;
+int PermName::PERM_INHERIT = 0x1 << 0;
+
+// True when the PERM_READ bit is set in `perm`.
+bool PermName::isReadable(int perm) { return (perm & PERM_READ) == PERM_READ; }
+
+// True when the PERM_WRITE bit is set in `perm`.
+bool PermName::isWriteable(int perm) {
+  return (perm & PERM_WRITE) == PERM_WRITE;
+}
+
+// True when the PERM_INHERIT bit is set in `perm`.
+bool PermName::isInherited(int perm) {
+  return (perm & PERM_INHERIT) == PERM_INHERIT;
+}
+
+// Renders `perm` as a fixed three-character string: 'R', 'W' and 'X' in
+// positions 0, 1 and 2 for the read, write and inherit bits, '-' otherwise.
+//
+// Each flag overwrites exactly one character. std::string::replace takes
+// (pos, count), not (start, end); calling it as replace(1, 2, "W") removed two
+// characters and shortened the result (e.g. read+write produced "RW").
+std::string PermName::perm2String(int perm) {
+  std::string pm("---");
+
+  if (isReadable(perm)) {
+    pm[0] = 'R';
+  }
+
+  if (isWriteable(perm)) {
+    pm[1] = 'W';
+  }
+
+  if (isInherited(perm)) {
+    pm[2] = 'X';
+  }
+
+  return pm;
+}
+
+//<!***************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/PermName.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/PermName.h b/rocketmq-cpp/src/common/PermName.h
new file mode 100755
index 0000000..6556382
--- /dev/null
+++ b/rocketmq-cpp/src/common/PermName.h
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PERMNAME_H__
+#define __PERMNAME_H__
+
+#include <string>
+
+namespace rocketmq {
+//<!***************************************************************************
+class PermName {
+ public:
+  // Permission bits; their values are assigned in PermName.cpp.
+  static int PERM_PRIORITY;
+  static int PERM_READ;
+  static int PERM_WRITE;
+  static int PERM_INHERIT;
+
+  // Each predicate tests one permission bit of `perm`.
+  static bool isReadable(int perm);
+  static bool isWriteable(int perm);
+  static bool isInherited(int perm);
+
+  // Returns a textual rendering of the read/write/inherit bits of `perm`.
+  static std::string perm2String(int perm);
+};
+
+//<!***************************************************************************
+} //<!end namespace;
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/PullSysFlag.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/PullSysFlag.cpp b/rocketmq-cpp/src/common/PullSysFlag.cpp
new file mode 100755
index 0000000..6d68457
--- /dev/null
+++ b/rocketmq-cpp/src/common/PullSysFlag.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PullSysFlag.h"
+
+namespace rocketmq {
+//<!************************************************************************
+// One bit per pull option.
+int PullSysFlag::FLAG_COMMIT_OFFSET = 1 << 0;
+int PullSysFlag::FLAG_SUSPEND = 1 << 1;
+int PullSysFlag::FLAG_SUBSCRIPTION = 1 << 2;
+int PullSysFlag::FLAG_CLASS_FILTER = 1 << 3;
+
+// Combines the four options into one flag word: each argument that is true
+// contributes its FLAG_* bit.
+int PullSysFlag::buildSysFlag(bool commitOffset, bool suspend,
+                              bool subscription, bool classFilter) {
+  const int commitBits = commitOffset ? FLAG_COMMIT_OFFSET : 0;
+  const int suspendBits = suspend ? FLAG_SUSPEND : 0;
+  const int subscriptionBits = subscription ? FLAG_SUBSCRIPTION : 0;
+  const int classFilterBits = classFilter ? FLAG_CLASS_FILTER : 0;
+
+  return commitBits | suspendBits | subscriptionBits | classFilterBits;
+}
+
+// Returns sysFlag with the FLAG_COMMIT_OFFSET bit cleared.
+int PullSysFlag::clearCommitOffsetFlag(int sysFlag) {
+  const int keepMask = ~FLAG_COMMIT_OFFSET;
+  return sysFlag & keepMask;
+}
+
+// True when every bit of FLAG_COMMIT_OFFSET is set in sysFlag.
+bool PullSysFlag::hasCommitOffsetFlag(int sysFlag) {
+  const int masked = sysFlag & FLAG_COMMIT_OFFSET;
+  return masked == FLAG_COMMIT_OFFSET;
+}
+
+// True when every bit of FLAG_SUSPEND is set in sysFlag.
+bool PullSysFlag::hasSuspendFlag(int sysFlag) {
+  const int masked = sysFlag & FLAG_SUSPEND;
+  return masked == FLAG_SUSPEND;
+}
+
+// True when every bit of FLAG_SUBSCRIPTION is set in sysFlag.
+bool PullSysFlag::hasSubscriptionFlag(int sysFlag) {
+  const int masked = sysFlag & FLAG_SUBSCRIPTION;
+  return masked == FLAG_SUBSCRIPTION;
+}
+
+// True when every bit of FLAG_CLASS_FILTER is set in sysFlag.
+bool PullSysFlag::hasClassFilterFlag(int sysFlag) {
+  const int masked = sysFlag & FLAG_CLASS_FILTER;
+  return masked == FLAG_CLASS_FILTER;
+}
+
+//<!***************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/PullSysFlag.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/PullSysFlag.h b/rocketmq-cpp/src/common/PullSysFlag.h
new file mode 100755
index 0000000..c809772
--- /dev/null
+++ b/rocketmq-cpp/src/common/PullSysFlag.h
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PULLSYSFLAG_H__
+#define __PULLSYSFLAG_H__
+namespace rocketmq {
+//<!************************************************************************
+class PullSysFlag {  // Static-only helpers that build and query the pull sysFlag bitmask.
+ public:
+ static int buildSysFlag(bool commitOffset, bool suspend, bool subscription,
+ bool classFilter);  // ORs in one FLAG_* bit for each argument that is true
+
+ static int clearCommitOffsetFlag(int sysFlag);  // sysFlag with FLAG_COMMIT_OFFSET cleared
+ static bool hasCommitOffsetFlag(int sysFlag);  // true if FLAG_COMMIT_OFFSET is set
+ static bool hasSuspendFlag(int sysFlag);  // true if FLAG_SUSPEND is set
+ static bool hasSubscriptionFlag(int sysFlag);  // true if FLAG_SUBSCRIPTION is set
+ static bool hasClassFilterFlag(int sysFlag);  // true if FLAG_CLASS_FILTER is set
+
+ private:
+ static int FLAG_COMMIT_OFFSET;  // defined in PullSysFlag.cpp as 0x1 << 0
+ static int FLAG_SUSPEND;  // defined in PullSysFlag.cpp as 0x1 << 1
+ static int FLAG_SUBSCRIPTION;  // defined in PullSysFlag.cpp as 0x1 << 2
+ static int FLAG_CLASS_FILTER;  // defined in PullSysFlag.cpp as 0x1 << 3
+};
+
+//<!***************************************************************************
+} //<!end namespace;
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/ServiceState.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/ServiceState.h b/rocketmq-cpp/src/common/ServiceState.h
new file mode 100755
index 0000000..a8ae792
--- /dev/null
+++ b/rocketmq-cpp/src/common/ServiceState.h
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SERVICESTATE_H__
+#define __SERVICESTATE_H__
+namespace rocketmq {
+//<!***************************************************************************
+enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED };  // implicit values 0..3 in declaration order; presumably service lifecycle states - confirm against users
+
+//<!***************************************************************************
+} //<!end namespace;
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/SubscriptionGroupConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/SubscriptionGroupConfig.h b/rocketmq-cpp/src/common/SubscriptionGroupConfig.h
new file mode 100755
index 0000000..9a54140
--- /dev/null
+++ b/rocketmq-cpp/src/common/SubscriptionGroupConfig.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SUBSCRIPTIONGROUPCONFIG_H__
+#define __SUBSCRIPTIONGROUPCONFIG_H__
+
+#include <string>
+
+namespace rocketmq {
+//<!***************************************************************************
+// Settings record for one subscription group. Every field is public; the
+// constructor stores the group name and gives all other fields fixed defaults.
+class SubscriptionGroupConfig {
+ public:
+  // Defaults: all three enable switches on, one retry queue, five retries,
+  // brokerId = MASTER_ID, whichBrokerWhenConsumeSlowly = 1.
+  SubscriptionGroupConfig(const string& groupName)
+      : groupName(groupName),
+        consumeEnable(true),
+        consumeFromMinEnable(true),
+        consumeBroadcastEnable(true),
+        retryQueueNums(1),
+        retryMaxTimes(5),
+        brokerId(MASTER_ID),
+        whichBrokerWhenConsumeSlowly(1) {}
+
+  string groupName;
+  bool consumeEnable;
+  bool consumeFromMinEnable;
+  bool consumeBroadcastEnable;
+  int retryQueueNums;
+  int retryMaxTimes;
+  int brokerId;
+  int whichBrokerWhenConsumeSlowly;
+};
+
+//<!***************************************************************************
+} //<!end namespace;
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/TopAddressing.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/TopAddressing.cpp b/rocketmq-cpp/src/common/TopAddressing.cpp
new file mode 100644
index 0000000..3a1742d
--- /dev/null
+++ b/rocketmq-cpp/src/common/TopAddressing.cpp
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TopAddressing.h"
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <vector>
+#include "UtilAll.h"
+#include "sync_http_client.h"
+#include "url.h"
+
+namespace rocketmq {
+// Remembers the unit name; fetchNSAddr() appends it to the lookup address when
+// it is non-empty.
+TopAddressing::TopAddressing(string unitName)
+    : m_unitName(unitName) {
+}
+
+// Nothing to release by hand: the destructor body is empty.
+TopAddressing::~TopAddressing() {
+}
+
+// Character-set check only: returns 0 when sValue consists solely of decimal
+// digits and '.' (an empty string also yields 0), and -1 when sValue is NULL
+// or contains any other character. The dotted-quad layout is not verified.
+int TopAddressing::IsIPAddr(const char* sValue) {
+  if (sValue == NULL) {
+    return -1;
+  }
+
+  for (const char* cur = sValue; *cur != '\0'; ++cur) {
+    const bool isDigit = (*cur >= '0' && *cur <= '9');
+    if (!isDigit && *cur != '.') {
+      return -1;
+    }
+  }
+  return 0;
+}
+
+// Rebuilds m_addrs from a ';'-separated address list while holding m_addrLock.
+// The previous list is dropped only when the split produced at least one
+// token. Each token is passed through UtilAll::Trim and appended once, and
+// only if UtilAll::SplitURL accepts it.
+void TopAddressing::updateNameServerAddressList(const string& adds) {
+  boost::lock_guard<boost::mutex> guard(m_addrLock);
+
+  vector<string> tokens;
+  UtilAll::Split(tokens, adds, ";");
+  if (!tokens.empty()) {
+    m_addrs.clear();
+  }
+
+  for (vector<string>::iterator it = tokens.begin(); it != tokens.end(); ++it) {
+    string candidate = *it;
+    UtilAll::Trim(candidate);
+
+    // Skip entries that are already recorded.
+    if (find(m_addrs.begin(), m_addrs.end(), candidate) != m_addrs.end()) {
+      continue;
+    }
+
+    string host;
+    short port;
+    if (!UtilAll::SplitURL(candidate, host, port)) {
+      continue;
+    }
+
+    LOG_INFO("updateNameServerAddressList:%s", candidate.c_str());
+    m_addrs.push_back(candidate);
+  }
+}
+
+// Fetches the name-server address list through SyncfetchNsAddr().
+// The lookup address is NSDomain, or WS_ADDR when NSDomain is empty; when a
+// unit name was configured, "-<unit>?nofix=1" is appended to it. On success
+// the clearNewLine() result is handed to updateNameServerAddressList() and
+// returned; on a failed fetch or an empty result an empty string is returned.
+string TopAddressing::fetchNSAddr(const string& NSDomain) {
+  LOG_DEBUG("fetchNSAddr begin");
+
+  string nsAddr = NSDomain;
+  if (nsAddr.empty()) {
+    nsAddr = WS_ADDR;
+  }
+  if (!m_unitName.empty()) {
+    nsAddr = nsAddr + "-" + m_unitName + "?nofix=1";
+    LOG_INFO("NSAddr is:%s", nsAddr.c_str());
+  }
+
+  Url url_s(nsAddr);
+  LOG_INFO("fetchNSAddr protocol: %s, port: %s, host:%s, path:%s, ",
+           url_s.protocol_.c_str(), url_s.port_.c_str(), url_s.host_.c_str(),
+           url_s.path_.c_str());
+
+  std::string nameservers;
+  std::string response;
+  if (!SyncfetchNsAddr(url_s, response)) {
+    LOG_ERROR(
+        "fetchNSAddr with domain failed, connect failure or wrong response");
+    return nameservers;
+  }
+
+  nameservers = clearNewLine(response);
+  if (nameservers.empty()) {
+    LOG_ERROR("fetchNSAddr with domain is empty");
+    return nameservers;
+  }
+
+  updateNameServerAddressList(nameservers);
+  return nameservers;
+}
+
+// Returns the part of str that precedes its first line-break character
+// ('\r' or '\n'), or all of str when it contains neither.
+string TopAddressing::clearNewLine(const string& str) {
+  // Search for both terminators at once: looking for '\r' first and for '\n'
+  // only as a fallback would leave a '\n' in the result whenever a '\n'
+  // precedes a later '\r'.
+  const size_t lineEnd = str.find_first_of("\r\n");
+  if (lineEnd == string::npos) {
+    return str;
+  }
+  return str.substr(0, lineEnd);
+}
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/TopAddressing.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/TopAddressing.h b/rocketmq-cpp/src/common/TopAddressing.h
new file mode 100755
index 0000000..a850023
--- /dev/null
+++ b/rocketmq-cpp/src/common/TopAddressing.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TOPADDRESSING_H__
+#define __TOPADDRESSING_H__
+
+#include <sys/time.h>
+#include <boost/thread/thread.hpp>
+#include <list>
+#include <map>
+#include <string>
+#include "Logging.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+class TopAddressing {  // Fetches a name-server address list for a lookup domain and keeps the accepted entries.
+ public:
+ TopAddressing(string unitName);  // unitName is appended to the lookup address by fetchNSAddr when non-empty
+ virtual ~TopAddressing();
+
+ public:
+ string fetchNSAddr(const string& NSDomain);  // returns the fetched address list, or an empty string on failure
+
+ private:
+ string clearNewLine(const string& str);  // returns str truncated at a line break
+ void updateNameServerAddressList(const string& adds);  // rebuilds m_addrs from a ';'-separated list
+ int IsIPAddr(const char* sValue);  // 0 if sValue holds only digits and '.', otherwise -1
+
+ private:
+ boost::mutex m_addrLock;  // held by updateNameServerAddressList while it modifies m_addrs
+ list<string> m_addrs;  // addresses accepted by updateNameServerAddressList
+ string m_unitName;  // set in the constructor
+};
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/TopicConfig.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/TopicConfig.cpp b/rocketmq-cpp/src/common/TopicConfig.cpp
new file mode 100755
index 0000000..e0e1b4d
--- /dev/null
+++ b/rocketmq-cpp/src/common/TopicConfig.cpp
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TopicConfig.h"
+#include <stdlib.h>
+#include <sstream>
+#include "PermName.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+int TopicConfig::DefaultReadQueueNums = 16;  // read-queue count used by the constructors that take no queue counts
+int TopicConfig::DefaultWriteQueueNums = 16;  // write-queue count used by those same constructors
+string TopicConfig::SEPARATOR = " ";  // field delimiter written by encode()
+
+// Creates a config with an empty topic name, the default queue counts,
+// read|write permission and the SINGLE_TAG filter type.
+TopicConfig::TopicConfig()
+    : m_topicName(),
+      m_readQueueNums(DefaultReadQueueNums),
+      m_writeQueueNums(DefaultWriteQueueNums),
+      m_perm(PermName::PERM_READ | PermName::PERM_WRITE),
+      m_topicFilterType(SINGLE_TAG) {
+}
+
+// Creates a config for topicName with the default queue counts, read|write
+// permission and the SINGLE_TAG filter type.
+TopicConfig::TopicConfig(const string& topicName)
+    : m_topicName(topicName),
+      m_readQueueNums(DefaultReadQueueNums),
+      m_writeQueueNums(DefaultWriteQueueNums),
+      m_perm(PermName::PERM_READ | PermName::PERM_WRITE),
+      m_topicFilterType(SINGLE_TAG) {
+}
+
+// Creates a config from explicit values for the name, both queue counts and
+// the permission bits; the filter type is always SINGLE_TAG.
+TopicConfig::TopicConfig(const string& topicName, int readQueueNums,
+                         int writeQueueNums, int perm)
+    : m_topicName(topicName),
+      m_readQueueNums(readQueueNums),
+      m_writeQueueNums(writeQueueNums),
+      m_perm(perm),
+      m_topicFilterType(SINGLE_TAG) {
+}
+
+// Nothing to release by hand: the destructor body is empty.
+TopicConfig::~TopicConfig() {
+}
+
+// Serializes the config as five SEPARATOR-delimited fields, in this order:
+// topic name, read queue count, write queue count, permission bits, and the
+// filter type written as its integer value.
+string TopicConfig::encode() {
+  stringstream out;
+  out << m_topicName << SEPARATOR;
+  out << m_readQueueNums << SEPARATOR;
+  out << m_writeQueueNums << SEPARATOR;
+  out << m_perm << SEPARATOR;
+  out << m_topicFilterType;
+  return out.str();
+}
+
+// Parses a string in the layout produced by encode(): topic name, read queue
+// count, write queue count, permission bits and the integer filter type,
+// separated by whitespace.
+// Returns true and updates every field when all five values are present and
+// the filter type names a TopicFilterType enumerator; otherwise returns false
+// and leaves this object unchanged.
+bool TopicConfig::decode(const string& in) {
+  stringstream ss(in);
+
+  // Parse into locals first so that malformed input cannot leave the object
+  // half-updated.
+  string topicName;
+  int readQueueNums = 0;
+  int writeQueueNums = 0;
+  int perm = 0;
+  int type = 0;
+  ss >> topicName >> readQueueNums >> writeQueueNums >> perm >> type;
+  if (ss.fail()) {
+    return false;
+  }
+
+  // An integer that names no enumerator must not be converted to
+  // TopicFilterType, so reject it instead of casting blindly.
+  if (type != SINGLE_TAG && type != MULTI_TAG) {
+    return false;
+  }
+
+  m_topicName = topicName;
+  m_readQueueNums = readQueueNums;
+  m_writeQueueNums = writeQueueNums;
+  m_perm = perm;
+  m_topicFilterType = static_cast<TopicFilterType>(type);
+  return true;
+}
+
+// Returns the stored topic name by const reference.
+const string& TopicConfig::getTopicName() {
+  return m_topicName;
+}
+
+// Replaces the stored topic name.
+void TopicConfig::setTopicName(const string& topicName) {
+  this->m_topicName = topicName;
+}
+
+// Returns the stored read-queue count.
+int TopicConfig::getReadQueueNums() {
+  return m_readQueueNums;
+}
+
+// Replaces the stored read-queue count.
+void TopicConfig::setReadQueueNums(int readQueueNums) {
+  this->m_readQueueNums = readQueueNums;
+}
+
+// Returns the stored write-queue count.
+int TopicConfig::getWriteQueueNums() {
+  return m_writeQueueNums;
+}
+
+// Replaces the stored write-queue count.
+void TopicConfig::setWriteQueueNums(int writeQueueNums) {
+  this->m_writeQueueNums = writeQueueNums;
+}
+
+// Returns the stored permission bits.
+int TopicConfig::getPerm() {
+  return m_perm;
+}
+
+// Replaces the stored permission bits.
+void TopicConfig::setPerm(int perm) {
+  this->m_perm = perm;
+}
+
+// Returns the stored filter type.
+TopicFilterType TopicConfig::getTopicFilterType() {
+  return m_topicFilterType;
+}
+
+// Replaces the stored filter type.
+void TopicConfig::setTopicFilterType(TopicFilterType topicFilterType) {
+  this->m_topicFilterType = topicFilterType;
+}
+//<!***************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/TopicConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/TopicConfig.h b/rocketmq-cpp/src/common/TopicConfig.h
new file mode 100755
index 0000000..0e7c17a
--- /dev/null
+++ b/rocketmq-cpp/src/common/TopicConfig.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TOPICCONFIG_H__
+#define __TOPICCONFIG_H__
+
+#include <string>
+#include "TopicFilterType.h"
+#include "UtilAll.h"
+namespace rocketmq {
+//<!************************************************************************
+class TopicConfig {  // Per-topic settings: name, read/write queue counts, permission bits and filter type.
+ public:
+ TopicConfig();  // empty name, default queue counts, read|write permission, SINGLE_TAG
+ TopicConfig(const string& topicName);  // same defaults, with the given name
+ TopicConfig(const string& topicName, int readQueueNums, int writeQueueNums,
+ int perm);  // explicit name, queue counts and permission; filter type is SINGLE_TAG
+ ~TopicConfig();
+
+ string encode();  // fields joined by SEPARATOR: name, read nums, write nums, perm, filter type as int
+ bool decode(const string& in);  // reads whitespace-separated fields in the order encode() writes them
+ const string& getTopicName();
+ void setTopicName(const string& topicName);
+ int getReadQueueNums();
+ void setReadQueueNums(int readQueueNums);
+ int getWriteQueueNums();
+ void setWriteQueueNums(int writeQueueNums);
+ int getPerm();
+ void setPerm(int perm);
+ TopicFilterType getTopicFilterType();
+ void setTopicFilterType(TopicFilterType topicFilterType);
+
+ public:
+ static int DefaultReadQueueNums;  // defined in TopicConfig.cpp as 16
+ static int DefaultWriteQueueNums;  // defined in TopicConfig.cpp as 16
+
+ private:
+ static string SEPARATOR;  // defined in TopicConfig.cpp as a single space
+
+ string m_topicName;
+ int m_readQueueNums;
+ int m_writeQueueNums;
+ int m_perm;  // the default constructors set PermName::PERM_READ | PermName::PERM_WRITE
+ TopicFilterType m_topicFilterType;
+};
+//<!***************************************************************************
+} //<!end namespace;
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/common/TopicFilterType.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/common/TopicFilterType.h b/rocketmq-cpp/src/common/TopicFilterType.h
new file mode 100755
index 0000000..9055003
--- /dev/null
+++ b/rocketmq-cpp/src/common/TopicFilterType.h
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TOPICFILTERTYPE_H__
+#define __TOPICFILTERTYPE_H__
+
+namespace rocketmq {
+//<!***************************************************************************
+enum TopicFilterType {  // Filter mode stored in TopicConfig::m_topicFilterType.
+ /**
+ * Each message can carry only one tag.
+ */
+ SINGLE_TAG,
+ /**
+ * Not supported yet.
+ */
+ MULTI_TAG
+};
+//<!***************************************************************************
+} //<!end namespace;
+#endif