You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/08/19 02:49:31 UTC
[rocketmq-ons-cpp] branch master updated: Add C style APIs to
support C functionalities (#3)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new eec01ad Add C style APIs to support C functionalities (#3)
eec01ad is described below
commit eec01ad4be6beba6367f41b688c072443b198476
Author: dinglei <li...@163.com>
AuthorDate: Mon Aug 19 10:49:27 2019 +0800
Add C style APIs to support C functionalities (#3)
* Support C API
* Add apache license header for CConst.h
---
CMakeLists.txt | 2 +
src/main/c/include/CBatchMessage.h | 36 ++++
src/main/c/include/CCommon.h | 87 ++++++++
src/main/c/include/CErrorMessage.h | 32 +++
src/main/c/include/CMQException.h | 40 ++++
src/main/c/include/CMessage.h | 42 ++++
src/main/c/include/CMessageExt.h | 49 +++++
src/main/c/include/CMessageQueue.h | 36 ++++
src/main/c/include/CProducer.h | 84 ++++++++
src/main/c/include/CPullConsumer.h | 61 ++++++
src/main/c/include/CPullResult.h | 48 +++++
src/main/c/include/CPushConsumer.h | 63 ++++++
src/main/c/include/CSendResult.h | 42 ++++
src/main/cpp/include/ONSFactory.h | 6 +
src/main/cpp/sdk/CMakeLists.txt | 10 +-
src/main/cpp/sdk/ONSFactory.cpp | 28 ++-
src/main/cpp/sdk/common/UtilAll.h | 4 +
src/main/cpp/sdk/extern/CBatchMessage.cpp | 59 ++++++
src/main/cpp/sdk/extern/CConst.h | 52 +++++
src/main/cpp/sdk/extern/CErrorMessage.cpp | 32 +++
src/main/cpp/sdk/extern/CMessage.cpp | 102 ++++++++++
src/main/cpp/sdk/extern/CMessageExt.cpp | 129 ++++++++++++
src/main/cpp/sdk/extern/CProducer.cpp | 327 ++++++++++++++++++++++++++++++
src/main/cpp/sdk/extern/CPullConsumer.cpp | 144 +++++++++++++
src/main/cpp/sdk/extern/CPushConsumer.cpp | 271 +++++++++++++++++++++++++
src/main/cpp/sdk/extern/CSendResult.cpp | 26 +++
26 files changed, 1807 insertions(+), 5 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 98b2797..e51dec7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -25,7 +25,9 @@ option(BUILD_DEMOS "Build demos" ON)
include_directories(graalvm_artifacts
src/main/c/native
+ src/main/c/include
src/main/cpp/include
+ src/main/cpp/sdk/extern
src/main/cpp/sdk/common)
find_library(ROCKETMQ_CLIENT_CORE
diff --git a/src/main/c/include/CBatchMessage.h b/src/main/c/include/CBatchMessage.h
new file mode 100644
index 0000000..6409a55
--- /dev/null
+++ b/src/main/c/include/CBatchMessage.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_BATCHMESSAGE_H__
+#define __C_BATCHMESSAGE_H__
+#include "CCommon.h"
+#include "CMessage.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct CBatchMessage CBatchMessage;
+
+ROCKETMQCLIENT_API CBatchMessage* CreateBatchMessage();
+ROCKETMQCLIENT_API int AddMessage(CBatchMessage* batchMsg, CMessage* msg);
+ROCKETMQCLIENT_API int DestroyBatchMessage(CBatchMessage* batchMsg);
+
+#ifdef __cplusplus
+};
+#endif
+#endif //__C_BATCHMESSAGE_H__
diff --git a/src/main/c/include/CCommon.h b/src/main/c/include/CCommon.h
new file mode 100644
index 0000000..59fcd7b
--- /dev/null
+++ b/src/main/c/include/CCommon.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_COMMON_H__
+#define __C_COMMON_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define MAX_MESSAGE_ID_LENGTH 256
+#define MAX_TOPIC_LENGTH 512
+#define MAX_BROKER_NAME_ID_LENGTH 256
+typedef enum _CStatus_ {
+
+ OK = 0,// Success
+
+ NULL_POINTER = 1,// Failed, null pointer value
+ MALLOC_FAILED = 2,
+ PRODUCER_ERROR_CODE_START = 10,
+ PRODUCER_START_FAILED = 10,
+ PRODUCER_SEND_SYNC_FAILED = 11,
+ PRODUCER_SEND_ONEWAY_FAILED = 12,
+ PRODUCER_SEND_ORDERLY_FAILED = 13,
+ PRODUCER_SEND_ASYNC_FAILED = 14,
+ PRODUCER_SEND_ORDERLYASYNC_FAILED = 15,
+
+ PUSHCONSUMER_ERROR_CODE_START = 20,
+ PUSHCONSUMER_START_FAILED = 20,
+
+ PULLCONSUMER_ERROR_CODE_START = 30,
+ PULLCONSUMER_START_FAILED = 30,
+ PULLCONSUMER_FETCH_MQ_FAILED = 31,
+ PULLCONSUMER_FETCH_MESSAGE_FAILED = 32,
+ NOT_SUPPORT_NOW = -1
+} CStatus;
+
+typedef enum _CLogLevel_ {
+ E_LOG_LEVEL_FATAL = 1,
+ E_LOG_LEVEL_ERROR = 2,
+ E_LOG_LEVEL_WARN = 3,
+ E_LOG_LEVEL_INFO = 4,
+ E_LOG_LEVEL_DEBUG = 5,
+ E_LOG_LEVEL_TRACE = 6,
+ E_LOG_LEVEL_LEVEL_NUM = 7
+} CLogLevel;
+
+#ifdef WIN32
+#ifdef ROCKETMQCLIENT_EXPORTS
+#ifdef _WINDLL
+#define ROCKETMQCLIENT_API __declspec(dllexport)
+#else
+#define ROCKETMQCLIENT_API
+#endif
+#else
+#ifdef ROCKETMQCLIENT_IMPORT
+#define ROCKETMQCLIENT_API __declspec(dllimport)
+#else
+#define ROCKETMQCLIENT_API
+#endif
+#endif
+#else
+#define ROCKETMQCLIENT_API
+#endif
+
+typedef enum _CMessageModel_ {
+ BROADCASTING, CLUSTERING
+} CMessageModel;
+
+#ifdef __cplusplus
+};
+#endif
+#endif //__C_COMMON_H__
diff --git a/src/main/c/include/CErrorMessage.h b/src/main/c/include/CErrorMessage.h
new file mode 100644
index 0000000..f32479b
--- /dev/null
+++ b/src/main/c/include/CErrorMessage.h
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __CERROR_MESSAGE_H__
+#define __CERROR_MESSAGE_H__
+
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+ROCKETMQCLIENT_API const char* GetLatestErrorMessage(); // Return the last error message
+
+#ifdef __cplusplus
+};
+#endif
+#endif //__CERROR_MESSAGE_H__
diff --git a/src/main/c/include/CMQException.h b/src/main/c/include/CMQException.h
new file mode 100644
index 0000000..51f9560
--- /dev/null
+++ b/src/main/c/include/CMQException.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __C_MQEXCPTION_H__
+#define __C_MQEXCPTION_H__
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define MAX_EXEPTION_MSG_LENGTH 512
+#define MAX_EXEPTION_FILE_LENGTH 256
+#define MAX_EXEPTION_TYPE_LENGTH 128
+typedef struct _CMQException_ {
+ int error;
+ int line;
+ char file[MAX_EXEPTION_FILE_LENGTH];
+ char msg[MAX_EXEPTION_MSG_LENGTH];
+ char type[MAX_EXEPTION_TYPE_LENGTH];
+
+} CMQException;
+
+#ifdef __cplusplus
+};
+#endif
+#endif
diff --git a/src/main/c/include/CMessage.h b/src/main/c/include/CMessage.h
new file mode 100644
index 0000000..f1b057b
--- /dev/null
+++ b/src/main/c/include/CMessage.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_MESSAGE_H__
+#define __C_MESSAGE_H__
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// typedef struct _CMessage_ CMessage;
+typedef struct CMessage CMessage;
+
+ROCKETMQCLIENT_API CMessage* CreateMessage(const char* topic);
+ROCKETMQCLIENT_API int DestroyMessage(CMessage* msg);
+ROCKETMQCLIENT_API int SetMessageTopic(CMessage* msg, const char* topic);
+ROCKETMQCLIENT_API int SetMessageTags(CMessage* msg, const char* tags);
+ROCKETMQCLIENT_API int SetMessageKeys(CMessage* msg, const char* keys);
+ROCKETMQCLIENT_API int SetMessageBody(CMessage* msg, const char* body);
+ROCKETMQCLIENT_API int SetByteMessageBody(CMessage* msg, const char* body, int len);
+ROCKETMQCLIENT_API int SetMessageProperty(CMessage* msg, const char* key, const char* value);
+ROCKETMQCLIENT_API int SetDelayTimeLevel(CMessage* msg, int level);
+
+#ifdef __cplusplus
+};
+#endif
+#endif //__C_MESSAGE_H__
diff --git a/src/main/c/include/CMessageExt.h b/src/main/c/include/CMessageExt.h
new file mode 100644
index 0000000..acd85e3
--- /dev/null
+++ b/src/main/c/include/CMessageExt.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_MESSAGE_EXT_H__
+#define __C_MESSAGE_EXT_H__
+
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// typedef struct _CMessageExt_ _CMessageExt;
+typedef struct CMessageExt CMessageExt;
+
+ROCKETMQCLIENT_API const char* GetMessageTopic(CMessageExt* msgExt);
+ROCKETMQCLIENT_API const char* GetMessageTags(CMessageExt* msgExt);
+ROCKETMQCLIENT_API const char* GetMessageKeys(CMessageExt* msgExt);
+ROCKETMQCLIENT_API const char* GetMessageBody(CMessageExt* msgExt);
+ROCKETMQCLIENT_API const char* GetMessageProperty(CMessageExt* msgExt, const char* key);
+ROCKETMQCLIENT_API const char* GetMessageId(CMessageExt* msgExt);
+ROCKETMQCLIENT_API int GetMessageDelayTimeLevel(CMessageExt* msgExt);
+ROCKETMQCLIENT_API int GetMessageQueueId(CMessageExt* msgExt);
+ROCKETMQCLIENT_API int GetMessageReconsumeTimes(CMessageExt* msgExt);
+ROCKETMQCLIENT_API int GetMessageStoreSize(CMessageExt* msgExt);
+ROCKETMQCLIENT_API long long GetMessageBornTimestamp(CMessageExt* msgExt);
+ROCKETMQCLIENT_API long long GetMessageStoreTimestamp(CMessageExt* msgExt);
+ROCKETMQCLIENT_API long long GetMessageQueueOffset(CMessageExt* msgExt);
+ROCKETMQCLIENT_API long long GetMessageCommitLogOffset(CMessageExt* msgExt);
+ROCKETMQCLIENT_API long long GetMessagePreparedTransactionOffset(CMessageExt* msgExt);
+
+#ifdef __cplusplus
+};
+#endif
+#endif //__C_MESSAGE_EXT_H__
diff --git a/src/main/c/include/CMessageQueue.h b/src/main/c/include/CMessageQueue.h
new file mode 100644
index 0000000..c514f9d
--- /dev/null
+++ b/src/main/c/include/CMessageQueue.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_MESSAGE_QUEUE_H__
+#define __C_MESSAGE_QUEUE_H__
+
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _CMessageQueue_ {
+ char topic[MAX_TOPIC_LENGTH];
+ char brokerName[MAX_BROKER_NAME_ID_LENGTH];
+ int queueId;
+} CMessageQueue;
+
+#ifdef __cplusplus
+};
+#endif
+#endif //__C_MESSAGE_H__
diff --git a/src/main/c/include/CProducer.h b/src/main/c/include/CProducer.h
new file mode 100644
index 0000000..6b00575
--- /dev/null
+++ b/src/main/c/include/CProducer.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_PRODUCER_H__
+#define __C_PRODUCER_H__
+
+#include "CBatchMessage.h"
+#include "CMessage.h"
+#include "CSendResult.h"
+#include "CMQException.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// typedef struct _CProducer_ _CProducer;
+typedef struct CProducer CProducer;
+typedef int (*QueueSelectorCallback)(int size, CMessage* msg, void* arg);
+typedef void (*CSendSuccessCallback)(CSendResult result);
+typedef void (*CSendExceptionCallback)(CMQException e);
+
+ROCKETMQCLIENT_API CProducer* CreateProducer(const char* groupId);
+ROCKETMQCLIENT_API int DestroyProducer(CProducer* producer);
+ROCKETMQCLIENT_API int StartProducer(CProducer* producer);
+ROCKETMQCLIENT_API int ShutdownProducer(CProducer* producer);
+
+ROCKETMQCLIENT_API int SetProducerNameServerAddress(CProducer* producer, const char* namesrv);
+ROCKETMQCLIENT_API int SetProducerNameServerDomain(CProducer* producer, const char* domain);
+ROCKETMQCLIENT_API int SetProducerGroupName(CProducer* producer, const char* groupName);
+ROCKETMQCLIENT_API int SetProducerInstanceName(CProducer* producer, const char* instanceName);
+ROCKETMQCLIENT_API int SetProducerSessionCredentials(CProducer* producer,
+ const char* accessKey,
+ const char* secretKey,
+ const char* onsChannel);
+ROCKETMQCLIENT_API int SetProducerLogPath(CProducer* producer, const char* logPath);
+ROCKETMQCLIENT_API int SetProducerLogFileNumAndSize(CProducer* producer, int fileNum, long fileSize);
+ROCKETMQCLIENT_API int SetProducerLogLevel(CProducer* producer, CLogLevel level);
+ROCKETMQCLIENT_API int SetProducerSendMsgTimeout(CProducer* producer, int timeout);
+ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer* producer, int level);
+ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer* producer, int size);
+
+ROCKETMQCLIENT_API int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result);
+ROCKETMQCLIENT_API int SendBatchMessage(CProducer* producer, CBatchMessage* msg, CSendResult* result);
+ROCKETMQCLIENT_API int SendMessageAsync(CProducer* producer,
+ CMessage* msg,
+ CSendSuccessCallback cSendSuccessCallback,
+ CSendExceptionCallback cSendExceptionCallback);
+ROCKETMQCLIENT_API int SendMessageOneway(CProducer* producer, CMessage* msg);
+ROCKETMQCLIENT_API int SendMessageOnewayOrderly(CProducer* producer,
+ CMessage* msg,
+ QueueSelectorCallback selector,
+ void* arg);
+ROCKETMQCLIENT_API int SendMessageOrderly(CProducer* producer,
+ CMessage* msg,
+ QueueSelectorCallback callback,
+ void* arg,
+ int autoRetryTimes,
+ CSendResult* result);
+
+ROCKETMQCLIENT_API int SendMessageOrderlyAsync(CProducer* producer,
+ CMessage* msg,
+ QueueSelectorCallback callback,
+ void* arg,
+ CSendSuccessCallback cSendSuccessCallback,
+ CSendExceptionCallback cSendExceptionCallback);
+
+#ifdef __cplusplus
+};
+#endif
+#endif //__C_PRODUCER_H__
\ No newline at end of file
diff --git a/src/main/c/include/CPullConsumer.h b/src/main/c/include/CPullConsumer.h
new file mode 100644
index 0000000..0157108
--- /dev/null
+++ b/src/main/c/include/CPullConsumer.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_PULL_CONSUMER_H__
+#define __C_PULL_CONSUMER_H__
+
+#include "CCommon.h"
+#include "CMessageExt.h"
+#include "CMessageQueue.h"
+#include "CPullResult.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct CPullConsumer CPullConsumer;
+
+ROCKETMQCLIENT_API CPullConsumer* CreatePullConsumer(const char* groupId);
+ROCKETMQCLIENT_API int DestroyPullConsumer(CPullConsumer* consumer);
+ROCKETMQCLIENT_API int StartPullConsumer(CPullConsumer* consumer);
+ROCKETMQCLIENT_API int ShutdownPullConsumer(CPullConsumer* consumer);
+ROCKETMQCLIENT_API int SetPullConsumerGroupID(CPullConsumer* consumer, const char* groupId);
+ROCKETMQCLIENT_API const char* GetPullConsumerGroupID(CPullConsumer* consumer);
+ROCKETMQCLIENT_API int SetPullConsumerNameServerAddress(CPullConsumer* consumer, const char* namesrv);
+ROCKETMQCLIENT_API int SetPullConsumerNameServerDomain(CPullConsumer* consumer, const char* domain);
+ROCKETMQCLIENT_API int SetPullConsumerSessionCredentials(CPullConsumer* consumer,
+ const char* accessKey,
+ const char* secretKey,
+ const char* channel);
+ROCKETMQCLIENT_API int SetPullConsumerLogPath(CPullConsumer* consumer, const char* logPath);
+ROCKETMQCLIENT_API int SetPullConsumerLogFileNumAndSize(CPullConsumer* consumer, int fileNum, long fileSize);
+ROCKETMQCLIENT_API int SetPullConsumerLogLevel(CPullConsumer* consumer, CLogLevel level);
+
+ROCKETMQCLIENT_API int FetchSubscriptionMessageQueues(CPullConsumer* consumer,
+ const char* topic,
+ CMessageQueue** mqs,
+ int* size);
+ROCKETMQCLIENT_API int ReleaseSubscriptionMessageQueue(CMessageQueue* mqs);
+
+ROCKETMQCLIENT_API CPullResult
+Pull(CPullConsumer* consumer, const CMessageQueue* mq, const char* subExpression, long long offset, int maxNums);
+ROCKETMQCLIENT_API int ReleasePullResult(CPullResult pullResult);
+
+#ifdef __cplusplus
+};
+#endif
+#endif //__C_PUSH_CONSUMER_H__
diff --git a/src/main/c/include/CPullResult.h b/src/main/c/include/CPullResult.h
new file mode 100644
index 0000000..2ab174f
--- /dev/null
+++ b/src/main/c/include/CPullResult.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_PULL_RESULT_H__
+#define __C_PULL_RESULT_H__
+
+#include "CCommon.h"
+#include "CMessageExt.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+typedef enum E_CPullStatus {
+ E_FOUND,
+ E_NO_NEW_MSG,
+ E_NO_MATCHED_MSG,
+ E_OFFSET_ILLEGAL,
+ E_BROKER_TIMEOUT // indicate pull request timeout or received NULL response
+} CPullStatus;
+
+typedef struct _CPullResult_ {
+ CPullStatus pullStatus;
+ long long nextBeginOffset;
+ long long minOffset;
+ long long maxOffset;
+ CMessageExt** msgFoundList;
+ int size;
+ void* pData;
+} CPullResult;
+
+#ifdef __cplusplus
+};
+#endif
+#endif //__C_PULL_RESULT_H__
diff --git a/src/main/c/include/CPushConsumer.h b/src/main/c/include/CPushConsumer.h
new file mode 100644
index 0000000..10b5587
--- /dev/null
+++ b/src/main/c/include/CPushConsumer.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_PUSH_CONSUMER_H__
+#define __C_PUSH_CONSUMER_H__
+
+#include "CMessageExt.h"
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// typedef struct _CConsumer_ _CConsumer;
+typedef struct CPushConsumer CPushConsumer;
+
+typedef enum E_CConsumeStatus { E_CONSUME_SUCCESS = 0, E_RECONSUME_LATER = 1 } CConsumeStatus;
+
+typedef int (*MessageCallBack)(CPushConsumer*, CMessageExt*);
+
+ROCKETMQCLIENT_API CPushConsumer* CreatePushConsumer(const char* groupId);
+ROCKETMQCLIENT_API int DestroyPushConsumer(CPushConsumer* consumer);
+ROCKETMQCLIENT_API int StartPushConsumer(CPushConsumer* consumer);
+ROCKETMQCLIENT_API int ShutdownPushConsumer(CPushConsumer* consumer);
+ROCKETMQCLIENT_API int SetPushConsumerGroupID(CPushConsumer* consumer, const char* groupId);
+ROCKETMQCLIENT_API const char* GetPushConsumerGroupID(CPushConsumer* consumer);
+ROCKETMQCLIENT_API int SetPushConsumerNameServerAddress(CPushConsumer* consumer, const char* namesrv);
+ROCKETMQCLIENT_API int SetPushConsumerNameServerDomain(CPushConsumer* consumer, const char* domain);
+ROCKETMQCLIENT_API int Subscribe(CPushConsumer* consumer, const char* topic, const char* expression);
+ROCKETMQCLIENT_API int RegisterMessageCallbackOrderly(CPushConsumer* consumer, MessageCallBack pCallback);
+ROCKETMQCLIENT_API int RegisterMessageCallback(CPushConsumer* consumer, MessageCallBack pCallback);
+ROCKETMQCLIENT_API int UnregisterMessageCallbackOrderly(CPushConsumer* consumer);
+ROCKETMQCLIENT_API int UnregisterMessageCallback(CPushConsumer* consumer);
+ROCKETMQCLIENT_API int SetPushConsumerThreadCount(CPushConsumer* consumer, int threadCount);
+ROCKETMQCLIENT_API int SetPushConsumerMessageBatchMaxSize(CPushConsumer* consumer, int batchSize);
+ROCKETMQCLIENT_API int SetPushConsumerInstanceName(CPushConsumer* consumer, const char* instanceName);
+ROCKETMQCLIENT_API int SetPushConsumerSessionCredentials(CPushConsumer* consumer,
+ const char* accessKey,
+ const char* secretKey,
+ const char* channel);
+ROCKETMQCLIENT_API int SetPushConsumerLogPath(CPushConsumer* consumer, const char* logPath);
+ROCKETMQCLIENT_API int SetPushConsumerLogFileNumAndSize(CPushConsumer* consumer, int fileNum, long fileSize);
+ROCKETMQCLIENT_API int SetPushConsumerLogLevel(CPushConsumer* consumer, CLogLevel level);
+ROCKETMQCLIENT_API int SetPushConsumerMessageModel(CPushConsumer* consumer, CMessageModel messageModel);
+
+#ifdef __cplusplus
+};
+#endif
+#endif //__C_PUSH_CONSUMER_H__
diff --git a/src/main/c/include/CSendResult.h b/src/main/c/include/CSendResult.h
new file mode 100644
index 0000000..fa640ca
--- /dev/null
+++ b/src/main/c/include/CSendResult.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_SEND_RESULT_H__
+#define __C_SEND_RESULT_H__
+
+#include "CCommon.h"
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef enum E_CSendStatus_ {
+ E_SEND_OK = 0,
+ E_SEND_FLUSH_DISK_TIMEOUT = 1,
+ E_SEND_FLUSH_SLAVE_TIMEOUT = 2,
+ E_SEND_SLAVE_NOT_AVAILABLE = 3
+} CSendStatus;
+
+typedef struct _SendResult_ {
+ CSendStatus sendStatus;
+ char msgId[MAX_MESSAGE_ID_LENGTH];
+ long long offset;
+} CSendResult;
+
+#ifdef __cplusplus
+};
+#endif
+#endif //__C_PRODUCER_H__
diff --git a/src/main/cpp/include/ONSFactory.h b/src/main/cpp/include/ONSFactory.h
index 2ff00c4..566dc48 100755
--- a/src/main/cpp/include/ONSFactory.h
+++ b/src/main/cpp/include/ONSFactory.h
@@ -68,12 +68,16 @@ namespace ons {
const int getSendMsgTimeout() const;
+ const int getSuspendTimeMillis() const;
+
const int getSendMsgRetryTimes() const;
const int getConsumeThreadNums() const;
const int getMaxMsgCacheSize() const;
+ const int getMaxMsgCacheSizeInMiB() const;
+
const ONSChannel getOnsChannel() const;
const char *getChannel() const;
@@ -108,10 +112,12 @@ namespace ons {
static const char *BROADCASTING;
static const char *CLUSTERING;
static const char *SendMsgTimeoutMillis;
+ static const char *SuspendTimeMillis;
static const char *NAMESRV_ADDR;
static const char *ConsumeThreadNums;
static const char *OnsChannel;
static const char *MaxMsgCacheSize;
+ static const char *MaxCachedMessageSizeInMiB;
static const char *OnsTraceSwitch;
static const char *SendMsgRetryTimes;
static const char *ConsumerInstanceName;
diff --git a/src/main/cpp/sdk/CMakeLists.txt b/src/main/cpp/sdk/CMakeLists.txt
index 5751821..eac4c39 100644
--- a/src/main/cpp/sdk/CMakeLists.txt
+++ b/src/main/cpp/sdk/CMakeLists.txt
@@ -12,6 +12,14 @@ add_library(${LIBRARY_NAME} SHARED
TransactionProducerImpl.cpp
common/UtilAll.cpp
common/Logger.cpp
- common/ONSClientAbstract.cpp)
+ common/ONSClientAbstract.cpp
+ extern/CBatchMessage.cpp
+ extern/CErrorMessage.cpp
+ extern/CMessage.cpp
+ extern/CMessageExt.cpp
+ extern/CProducer.cpp
+ extern/CPullConsumer.cpp
+ extern/CPushConsumer.cpp
+ extern/CSendResult.cpp)
target_link_libraries(${LIBRARY_NAME} ${ROCKETMQ_CLIENT_CORE})
set_target_properties(${LIBRARY_NAME} PROPERTIES LIBRARY_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/lib VERSION ${CLIENT_VERSION} SOVERSION ${CLIENT_VERSION})
\ No newline at end of file
diff --git a/src/main/cpp/sdk/ONSFactory.cpp b/src/main/cpp/sdk/ONSFactory.cpp
index 662df85..bf1686f 100644
--- a/src/main/cpp/sdk/ONSFactory.cpp
+++ b/src/main/cpp/sdk/ONSFactory.cpp
@@ -39,8 +39,10 @@ namespace ons {
const char *ONSFactoryProperty::BROADCASTING = "BROADCASTING";
const char *ONSFactoryProperty::CLUSTERING = "CLUSTERING";
const char *ONSFactoryProperty::SendMsgTimeoutMillis = "SendMsgTimeoutMillis";
+ const char *ONSFactoryProperty::SuspendTimeMillis = "SuspendTimeMillis";
const char *ONSFactoryProperty::SendMsgRetryTimes = "SendMsgRetryTimes";
const char *ONSFactoryProperty::MaxMsgCacheSize = "MaxMsgCacheSize";
+ const char *ONSFactoryProperty::MaxCachedMessageSizeInMiB = "MaxCachedMessageSizeInMiB";
const char *ONSFactoryProperty::ONSAddr =
"ONSAddr"; // name server domain name
const char *ONSFactoryProperty::NAMESRV_ADDR =
@@ -219,7 +221,15 @@ namespace ons {
if (it != m_onsFactoryProperties.end()) {
return atoi((*it).second.c_str());
}
- return 0;
+ return -1;
+ }
+ const int ONSFactoryProperty::getSuspendTimeMillis() const {
+ map<string, string>::const_iterator it =
+ m_onsFactoryProperties.find(SuspendTimeMillis);
+ if (it != m_onsFactoryProperties.end()) {
+ return atoi((*it).second.c_str());
+ }
+ return -1;
}
const int ONSFactoryProperty::getSendMsgRetryTimes() const {
@@ -228,7 +238,7 @@ namespace ons {
if (it != m_onsFactoryProperties.end()) {
return atoi((*it).second.c_str());
}
- return 0;
+ return -1;
}
const int ONSFactoryProperty::getMaxMsgCacheSize() const {
@@ -238,7 +248,17 @@ namespace ons {
return atoi((*it).second.c_str());
}
- return 0;
+ return -1;
+ }
+
+ const int ONSFactoryProperty::getMaxMsgCacheSizeInMiB() const {
+ map<string, string>::const_iterator it =
+ m_onsFactoryProperties.find(MaxCachedMessageSizeInMiB);
+ if (it != m_onsFactoryProperties.end()) {
+ return atoi((*it).second.c_str());
+ }
+
+ return -1;
}
const ONSChannel ONSFactoryProperty::getOnsChannel() const {
@@ -270,7 +290,7 @@ namespace ons {
return atoi((*it).second.c_str());
}
- return 0;
+ return -1;
}
const char *ONSFactoryProperty::getNameSrvAddr() const {
diff --git a/src/main/cpp/sdk/common/UtilAll.h b/src/main/cpp/sdk/common/UtilAll.h
index dae436b..b49dc02 100644
--- a/src/main/cpp/sdk/common/UtilAll.h
+++ b/src/main/cpp/sdk/common/UtilAll.h
@@ -54,6 +54,10 @@ namespace ons {
return atoll(str);
}
+ static int str2i(const char *str) {
+ return atoi(str);
+ }
+
static std::string to_string(const std::map<std::string, std::string> &prop);
static graal_isolate_t *get_isolate() {
diff --git a/src/main/cpp/sdk/extern/CBatchMessage.cpp b/src/main/cpp/sdk/extern/CBatchMessage.cpp
new file mode 100644
index 0000000..8fbedee
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CBatchMessage.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+
+#include "CBatchMessage.h"
+#include "CCommon.h"
+#include "CMessage.h"
+#include "ONSFactory.h"
+
+using std::vector;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+using namespace ons;
+
+CBatchMessage* CreateBatchMessage() {
+ vector<Message>* msgs = new vector<Message>();
+ return (CBatchMessage*)msgs;
+}
+
+int AddMessage(CBatchMessage* batchMsg, CMessage* msg) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ if (batchMsg == NULL) {
+ return NULL_POINTER;
+ }
+ Message* message = (Message*)msg;
+ ((vector<Message>*)batchMsg)->push_back(*message);
+ return OK;
+}
+int DestroyBatchMessage(CBatchMessage* batchMsg) {
+ if (batchMsg == NULL) {
+ return NULL_POINTER;
+ }
+ delete (vector<Message>*)batchMsg;
+ return OK;
+}
+
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/main/cpp/sdk/extern/CConst.h b/src/main/cpp/sdk/extern/CConst.h
new file mode 100644
index 0000000..a2d510a
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CConst.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ROCKETMQ_ONS_CPP_CCONST_H
+#define ROCKETMQ_ONS_CPP_CCONST_H
+
+#endif //ROCKETMQ_ONS_CPP_CCONST_H
+#ifdef __cplusplus
+extern "C" {
+#endif
+#define CAPI_PROPERTY_KEYS "KEYS"
+#define CAPI_PROPERTY_TAGS "TAGS"
+#define CAPI_PROPERTY_WAIT_STORE_MSG_OK "WAIT"
+#define CAPI_PROPERTY_DELAY_TIME_LEVEL "DELAY"
+#define CAPI_PROPERTY_RETRY_TOPIC "RETRY_TOPIC"
+#define CAPI_PROPERTY_REAL_TOPIC "REAL_TOPIC"
+#define CAPI_PROPERTY_REAL_QUEUE_ID "REAL_QID"
+#define CAPI_PROPERTY_TRANSACTION_PREPARED "TRAN_MSG"
+#define CAPI_PROPERTY_PRODUCER_GROUP "PGROUP"
+#define CAPI_PROPERTY_MIN_OFFSET "MIN_OFFSET"
+#define CAPI_PROPERTY_MAX_OFFSET "MAX_OFFSET"
+#define CAPI_PROPERTY_ORIGIN_MESSAGE_ID "ORIGIN_MESSAGE_ID"
+#define CAPI_PROPERTY_TRANSFER_FLAG "TRANSFER_FLAG"
+#define CAPI_PROPERTY_CORRECTION_FLAG "CORRECTION_FLAG"
+#define CAPI_PROPERTY_MQ2_FLAG "MQ2_FLAG"
+#define CAPI_PROPERTY_RECONSUME_TIME "RECONSUME_TIME"
+#define CAPI_PROPERTY_MSG_REGION "MSG_REGION"
+#define CAPI_PROPERTY_TRACE_SWITCH "TRACE_ON"
+#define CAPI_PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX "UNIQ_KEY"
+#define CAPI_PROPERTY_MAX_RECONSUME_TIMES "MAX_RECONSUME_TIMES"
+#define CAPI_PROPERTY_CONSUME_START_TIMESTAMP "CONSUME_START_TIME"
+#define CAPI_PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET "TRAN_PREPARED_QUEUE_OFFSET"
+#define CAPI_PROPERTY_TRANSACTION_CHECK_TIMES "TRANSACTION_CHECK_TIMES"
+#define CAPI_PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS "CHECK_IMMUNITY_TIME_IN_SECONDS"
+
+#ifdef __cplusplus
+};
+#endif
\ No newline at end of file
diff --git a/src/main/cpp/sdk/extern/CErrorMessage.cpp b/src/main/cpp/sdk/extern/CErrorMessage.cpp
new file mode 100644
index 0000000..aecd721
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CErrorMessage.cpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CErrorMessage.h"
+#include "CCommon.h"
+#include "ONSFactory.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+const char* GetLatestErrorMessage() {
+ return NULL;
+}
+
+#ifdef __cplusplus
+};
+#endif
\ No newline at end of file
diff --git a/src/main/cpp/sdk/extern/CMessage.cpp b/src/main/cpp/sdk/extern/CMessage.cpp
new file mode 100644
index 0000000..b0400e4
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CMessage.cpp
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CMessage.h"
+#include "CCommon.h"
+#include "ONSFactory.h"
+#include "CConst.h"
+#include "../common/UtilAll.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+using namespace ons;
+
+CMessage* CreateMessage(const char* topic) {
+ Message* mqMessage = new Message();
+ if (topic != NULL) {
+ mqMessage->setTopic(topic);
+ }
+ return (CMessage*)mqMessage;
+}
+int DestroyMessage(CMessage* msg) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ delete (Message*)msg;
+ return OK;
+}
+int SetMessageTopic(CMessage* msg, const char* topic) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ ((Message*)msg)->setTopic(topic);
+ return OK;
+}
+int SetMessageTags(CMessage* msg, const char* tags) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ ((Message*)msg)->setTag(tags);
+ return OK;
+}
+int SetMessageKeys(CMessage* msg, const char* keys) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ ((Message*)msg)->setKey(keys);
+ return OK;
+}
+int SetMessageBody(CMessage* msg, const char* body) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ ((Message*)msg)->setMsgBody(UtilAll::to_string(body));
+ return OK;
+}
+int SetByteMessageBody(CMessage* msg, const char* body, int len) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ ((Message*)msg)->setMsgBody(std::string(body, len));
+ return OK;
+}
+int SetMessageProperty(CMessage* msg, const char* key, const char* value) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ ((Message*)msg)->putUserProperties(key, value);
+ return OK;
+}
+int SetDelayTimeLevel(CMessage* msg, int level) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ ((Message*)msg)->putUserProperties(CAPI_PROPERTY_DELAY_TIME_LEVEL,std::to_string(level).c_str());
+ return OK;
+}
+int SetStartDeliverTime(CMessage* msg, long long level) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ ((Message*)msg)->setStartDeliverTime(level);
+ return OK;
+}
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/main/cpp/sdk/extern/CMessageExt.cpp b/src/main/cpp/sdk/extern/CMessageExt.cpp
new file mode 100644
index 0000000..9520554
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CMessageExt.cpp
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CMessageExt.h"
+#include "CCommon.h"
+#include "CConst.h"
+#include "ONSFactory.h"
+#include "../common/UtilAll.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+using namespace ons;
+const char *GetMessageTopic(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL;
+ }
+ return ((Message *) msg)->getTopic();
+}
+const char *GetMessageTags(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL;
+ }
+ return ((Message *) msg)->getTag();
+}
+const char *GetMessageKeys(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL;
+ }
+ return ((Message *) msg)->getKey();
+}
+const char *GetMessageBody(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL;
+ }
+ return ((Message *) msg)->getBody();
+}
+const char *GetMessageProperty(CMessageExt *msg, const char *key) {
+ if (msg == NULL) {
+ return NULL;
+ }
+ return ((Message *) msg)->getUserProperties(key);
+}
+const char *GetMessageId(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL;
+ }
+ return ((Message *) msg)->getMsgID();
+}
+
+int GetMessageDelayTimeLevel(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ return atoi(((Message *) msg)->getUserProperties(CAPI_PROPERTY_DELAY_TIME_LEVEL));
+}
+
+int GetMessageQueueId(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ return UtilAll::str2i(((Message *) msg)->getUserProperties(CAPI_PROPERTY_REAL_QUEUE_ID));
+}
+
+int GetMessageReconsumeTimes(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ return ((Message *) msg)->getReconsumeTimes();
+}
+
+int GetMessageStoreSize(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ return 0;
+}
+
+long long GetMessageBornTimestamp(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ return 0;
+}
+
+long long GetMessageStoreTimestamp(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ return ((Message *) msg)->getStoreTimestamp();
+}
+
+long long GetMessageQueueOffset(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ return ((Message *) msg)->getQueueOffset();
+}
+
+long long GetMessageCommitLogOffset(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ return 0;
+}
+
+long long GetMessagePreparedTransactionOffset(CMessageExt *msg) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ return UtilAll::str2ll(((Message *) msg)->getUserProperties(CAPI_PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET));
+}
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/main/cpp/sdk/extern/CProducer.cpp b/src/main/cpp/sdk/extern/CProducer.cpp
new file mode 100644
index 0000000..3d225c5
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CProducer.cpp
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CProducer.h"
+#include <string.h>
+#include <typeindex>
+#include <string.h>
+#include <typeinfo>
+#include "CBatchMessage.h"
+#include "CCommon.h"
+#include "CMQException.h"
+#include "CMessage.h"
+#include "CSendResult.h"
+#include "ONSFactory.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+using namespace ons;
+using namespace std;
+
+class CSendCallback : public SendCallbackONS {
+public:
+ CSendCallback(CSendSuccessCallback cSendSuccessCallback, CSendExceptionCallback cSendExceptionCallback) {
+ m_cSendSuccessCallback = cSendSuccessCallback;
+ m_cSendExceptionCallback = cSendExceptionCallback;
+ }
+
+ virtual ~CSendCallback() {}
+
+ virtual void onSuccess(SendResultONS &sendResult) {
+ CSendResult result;
+ result.sendStatus = E_SEND_OK;
+ result.offset = 0;
+ strncpy(result.msgId, sendResult.getMessageId(), MAX_MESSAGE_ID_LENGTH - 1);
+ result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+ if (m_cSendSuccessCallback != NULL) {
+ m_cSendSuccessCallback(result);
+ }
+ }
+
+ virtual void onException(ONSClientException &e) {
+ CMQException exception;
+ exception.error = e.GetError();
+ exception.line = e.GetError();
+ strncpy(exception.msg, e.GetMsg(), MAX_EXEPTION_MSG_LENGTH - 1);
+ strncpy(exception.file, e.what(), MAX_EXEPTION_FILE_LENGTH - 1);
+ if (m_cSendExceptionCallback != NULL) {
+ m_cSendExceptionCallback(exception);
+ }
+ }
+
+public:
+ void setM_cSendSuccessCallback(CSendSuccessCallback callback) {
+ m_cSendSuccessCallback = callback;
+ }
+
+ void setM_cSendExceptionCallback(CSendExceptionCallback callback) {
+ m_cSendExceptionCallback = callback;
+ }
+
+private:
+ CSendSuccessCallback m_cSendSuccessCallback;
+ CSendExceptionCallback m_cSendExceptionCallback;
+};
+
+typedef struct __DefaultProducer__ {
+ ONSFactoryProperty factoryInfo;
+ Producer *innerProducer;
+ CSendCallback *cSendCallback;
+} DefaultProducer;
+
+CProducer *CreateProducer(const char *groupId) {
+ if (groupId == NULL) {
+ return NULL;
+ }
+ DefaultProducer *defaultMQProducer = new DefaultProducer();
+ defaultMQProducer->factoryInfo.setFactoryProperty(ONSFactoryProperty::GroupId, groupId);
+ defaultMQProducer->cSendCallback = new CSendCallback(NULL, NULL);
+ return (CProducer *) defaultMQProducer;
+}
+int DestroyProducer(CProducer *pProducer) {
+ if (pProducer == NULL) {
+ return NULL_POINTER;
+ }
+ DefaultProducer *defaultProducer = (DefaultProducer *) pProducer;
+ if (defaultProducer->cSendCallback != NULL) {
+ delete defaultProducer->cSendCallback;
+ }
+ delete reinterpret_cast<DefaultProducer *>(pProducer);
+ return OK;
+}
+int StartProducer(CProducer *producer) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ DefaultProducer *defaultProducer = (DefaultProducer *) producer;
+ try {
+ defaultProducer->innerProducer = ONSFactory::getInstance()->createProducer(defaultProducer->factoryInfo);
+ defaultProducer->innerProducer->start();
+ } catch (exception &e) {
+ return PRODUCER_START_FAILED;
+ }
+ return OK;
+}
+int ShutdownProducer(CProducer *producer) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultProducer *) producer)->innerProducer->shutdown();
+ return OK;
+}
+int SetProducerNameServerAddress(CProducer *producer, const char *namesrv) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, namesrv);
+ return OK;
+}
+int SetProducerNameServerDomain(CProducer *producer, const char *domain) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::ONSAddr, domain);
+ return OK;
+}
+int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) {
+ // CSendResult sendResult;
+ if (producer == NULL || msg == NULL || result == NULL) {
+ return NULL_POINTER;
+ }
+ try {
+ DefaultProducer *defaultProducer = (DefaultProducer *) producer;
+ Message *message = (Message *) msg;
+ SendResultONS sendResult = defaultProducer->innerProducer->send(*message);
+ result->sendStatus = E_SEND_OK;
+ result->offset = 0;
+ strncpy(result->msgId, sendResult.getMessageId(), MAX_MESSAGE_ID_LENGTH - 1);
+ result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+ } catch (exception &e) {
+ return PRODUCER_SEND_SYNC_FAILED;
+ }
+ return OK;
+}
+
+int SendBatchMessage(CProducer *producer, CBatchMessage *batcMsg, CSendResult *result) {
+ // CSendResult sendResult;
+ if (producer == NULL || batcMsg == NULL || result == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+
+int SendMessageAsync(CProducer *producer,
+ CMessage *msg,
+ CSendSuccessCallback cSendSuccessCallback,
+ CSendExceptionCallback cSendExceptionCallback) {
+ if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL || cSendExceptionCallback == NULL) {
+ return NULL_POINTER;
+ }
+ DefaultProducer *defaultProducer = (DefaultProducer *) producer;
+ Message *message = (Message *) msg;
+ SendResultONS sendResult = defaultProducer->innerProducer->send(*message);
+
+ try {
+ defaultProducer->cSendCallback->setM_cSendSuccessCallback(cSendSuccessCallback);
+ defaultProducer->cSendCallback->setM_cSendExceptionCallback(cSendExceptionCallback);
+ defaultProducer->innerProducer->sendAsync(*message, defaultProducer->cSendCallback);
+ } catch (exception &e) {
+ if (defaultProducer->cSendCallback != NULL) {
+ if (std::type_index(typeid(e)) == std::type_index(typeid(ONSClientException))) {
+ ONSClientException &mqe = (ONSClientException &) e;
+ defaultProducer->cSendCallback->onException(mqe);
+ }
+ }
+ return PRODUCER_SEND_ASYNC_FAILED;
+ }
+ return OK;
+}
+
+int SendMessageOneway(CProducer *producer, CMessage *msg) {
+ if (producer == NULL || msg == NULL) {
+ return NULL_POINTER;
+ }
+ DefaultProducer *defaultProducer = (DefaultProducer *) producer;
+ Message *message = (Message *) msg;
+ try {
+ defaultProducer->innerProducer->sendOneway(*message);
+ } catch (exception &e) {
+ return PRODUCER_SEND_ONEWAY_FAILED;
+ }
+ return OK;
+}
+
+int SendMessageOnewayOrderly(CProducer *producer, CMessage *msg, QueueSelectorCallback selector, void *arg) {
+ if (producer == NULL || msg == NULL) {
+ return NULL_POINTER;
+ }
+ DefaultProducer *defaultProducer = (DefaultProducer *) producer;
+ Message *message = (Message *) msg;
+ try {
+ //defaultProducer->innerProducer->sendOneway(*message);
+ } catch (exception &e) {
+ return PRODUCER_SEND_ONEWAY_FAILED;
+ }
+ return NOT_SUPPORT_NOW;
+}
+
+int SendMessageOrderlyAsync(CProducer *producer,
+ CMessage *msg,
+ QueueSelectorCallback callback,
+ void *arg,
+ CSendSuccessCallback cSendSuccessCallback,
+ CSendExceptionCallback cSendExceptionCallback) {
+ if (producer == NULL || msg == NULL || callback == NULL || cSendSuccessCallback == NULL ||
+ cSendExceptionCallback == NULL) {
+ return NULL_POINTER;
+ }
+
+ return NOT_SUPPORT_NOW;
+}
+
+int SendMessageOrderly(CProducer *producer,
+ CMessage *msg,
+ QueueSelectorCallback callback,
+ void *arg,
+ int autoRetryTimes,
+ CSendResult *result) {
+ if (producer == NULL || msg == NULL || callback == NULL || arg == NULL || result == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+
+int SetProducerGroupName(CProducer *producer, const char *groupName) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::GroupId, groupName);
+ return OK;
+}
+int SetProducerInstanceName(CProducer *producer, const char *instanceName) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumeThreadNums, instanceName);
+ return OK;
+}
+int SetProducerSessionCredentials(CProducer *producer,
+ const char *accessKey,
+ const char *secretKey,
+ const char *onsChannel) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, accessKey);
+ ((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, secretKey);
+ return OK;
+}
+int SetProducerLogPath(CProducer *producer, const char *logPath) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::LogPath, logPath);
+ return OK;
+}
+
+int SetProducerLogFileNumAndSize(CProducer *producer, int fileNum, long fileSize) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ return OK;
+}
+
+int SetProducerLogLevel(CProducer *producer, CLogLevel level) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ //((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::LogPath, level);
+ return OK;
+}
+
+int SetProducerSendMsgTimeout(CProducer *producer, int timeout) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultProducer *) producer)->factoryInfo.setSendMsgTimeout(timeout);
+ return OK;
+}
+
+int SetProducerCompressMsgBodyOverHowmuch(CProducer *producer, int howmuch) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ return OK;
+}
+
+int SetProducerCompressLevel(CProducer *producer, int level) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ return OK;
+}
+
+int SetProducerMaxMessageSize(CProducer *producer, int size) {
+ if (producer == NULL) {
+ return NULL_POINTER;
+ }
+ return OK;
+}
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/main/cpp/sdk/extern/CPullConsumer.cpp b/src/main/cpp/sdk/extern/CPullConsumer.cpp
new file mode 100644
index 0000000..88b9d12
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CPullConsumer.cpp
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CPullConsumer.h"
+#include "CCommon.h"
+#include "CMessageExt.h"
+#include "ONSFactory.h"
+
+using namespace ons;
+using namespace std;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+CPullConsumer *CreatePullConsumer(const char *groupId) {
+ if (groupId == NULL) {
+ return NULL;
+ }
+ return NULL;
+}
+int DestroyPullConsumer(CPullConsumer *consumer) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+int StartPullConsumer(CPullConsumer *consumer) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+int ShutdownPullConsumer(CPullConsumer *consumer) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId) {
+ if (consumer == NULL || groupId == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+const char *GetPullConsumerGroupID(CPullConsumer *consumer) {
+ if (consumer == NULL) {
+ return NULL;
+ }
+ return NULL;
+}
+int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesrv) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+int SetPullConsumerNameServerDomain(CPullConsumer *consumer, const char *domain) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+int SetPullConsumerSessionCredentials(CPullConsumer *consumer,
+ const char *accessKey,
+ const char *secretKey,
+ const char *channel) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+
+int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ // Todo, This api should be implemented by core api.
+ //((DefaultMQPullConsumer *) consumer)->setInstanceName(instanceName);
+ return NOT_SUPPORT_NOW;
+}
+
+int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+
+int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+
+int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue **mqs, int *size) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ return PULLCONSUMER_FETCH_MQ_FAILED;
+}
+int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs) {
+ if (mqs == NULL) {
+ return NULL_POINTER;
+ }
+ free((void *) mqs);
+ mqs = NULL;
+ return OK;
+}
+CPullResult Pull(CPullConsumer *consumer,
+ const CMessageQueue *mq,
+ const char *subExpression,
+ long long offset,
+ int maxNums) {
+ CPullResult pullResult;
+ memset(&pullResult, 0, sizeof(CPullResult));
+ pullResult.pullStatus = E_NO_NEW_MSG;
+ return pullResult;
+}
+int ReleasePullResult(CPullResult pullResult) {
+ if (pullResult.size == 0 || pullResult.msgFoundList == NULL || pullResult.pData == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/main/cpp/sdk/extern/CPushConsumer.cpp b/src/main/cpp/sdk/extern/CPushConsumer.cpp
new file mode 100644
index 0000000..bc3d788
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CPushConsumer.cpp
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CPushConsumer.h"
+#include <map>
+#include "CCommon.h"
+#include "CMessageExt.h"
+#include "ONSFactory.h"
+#include "../common/UtilAll.h"
+
+using namespace ons;
+using namespace std;
+
+class MessageListenerInner : public MessageListener {
+public:
+ MessageListenerInner() {}
+
+ MessageListenerInner(CPushConsumer *consumer, MessageCallBack pCallback) {
+ m_pconsumer = consumer;
+ m_pMsgReceiveCallback = pCallback;
+ }
+
+ ~MessageListenerInner() {}
+
+ Action consume(Message &message, ConsumeContext &context) {
+ if (m_pMsgReceiveCallback == NULL) {
+ return Action::ReconsumeLater;
+ }
+ CMessageExt *msg = (CMessageExt *) (&message);
+ if (m_pMsgReceiveCallback(m_pconsumer, msg) != E_CONSUME_SUCCESS) {
+ return Action::ReconsumeLater;
+ }
+ return Action::CommitMessage;
+ }
+
+private:
+ MessageCallBack m_pMsgReceiveCallback;
+ CPushConsumer *m_pconsumer;
+};
+
+map<CPushConsumer *, MessageListenerInner *> g_ListenerMap;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+#ifndef CAPI_MAX_SUB_EXPRESS_LEN
+#define CAPI_MAX_SUB_EXPRESS_LEN 512
+#endif
+
+typedef struct __DefaultPushConsumer__ {
+ ONSFactoryProperty factoryInfo;
+ PushConsumer *innerConsumer;
+ char expression[CAPI_MAX_SUB_EXPRESS_LEN];
+} DefaultPushConsumer;
+
+CPushConsumer *CreatePushConsumer(const char *groupId) {
+ if (groupId == NULL) {
+ return NULL;
+ }
+ DefaultPushConsumer *defaultPushConsumer = new DefaultPushConsumer();
+ defaultPushConsumer->factoryInfo.setFactoryProperty(ONSFactoryProperty::GroupId, groupId);
+ memset(defaultPushConsumer->expression, 0, CAPI_MAX_SUB_EXPRESS_LEN);
+ return (CPushConsumer *) defaultPushConsumer;
+}
+int DestroyPushConsumer(CPushConsumer *consumer) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ delete reinterpret_cast<DefaultPushConsumer *>(consumer);
+ return OK;
+}
+int StartPushConsumer(CPushConsumer *consumer) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ DefaultPushConsumer *defaultPushConsumer = (DefaultPushConsumer *) consumer;
+ try {
+ defaultPushConsumer->innerConsumer = ONSFactory::getInstance()->createPushConsumer(
+ defaultPushConsumer->factoryInfo);
+ defaultPushConsumer->innerConsumer->start();
+ } catch (exception &e) {
+ return PULLCONSUMER_START_FAILED;
+ }
+ return OK;
+}
+int ShutdownPushConsumer(CPushConsumer *consumer) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ DefaultPushConsumer *defaultPushConsumer = (DefaultPushConsumer *) consumer;
+ defaultPushConsumer->innerConsumer->shutdown();
+ return OK;
+}
+int SetPushConsumerGroupID(CPushConsumer *consumer, const char *groupId) {
+ if (consumer == NULL || groupId == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::GroupId, groupId);
+ return OK;
+}
+const char *GetPushConsumerGroupID(CPushConsumer *consumer) {
+ if (consumer == NULL) {
+ return NULL;
+ }
+ return ((DefaultPushConsumer *) consumer)->factoryInfo.getGroupId();
+}
+int SetPushConsumerNameServerAddress(CPushConsumer *consumer, const char *namesrv) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, namesrv);
+ return OK;
+}
+int SetPushConsumerNameServerDomain(CPushConsumer *consumer, const char *domain) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::ONSAddr, domain);
+ return OK;
+}
+int Subscribe(CPushConsumer *consumer, const char *topic, const char *expression) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, topic);
+ memset(((DefaultPushConsumer *) consumer)->expression, 0, CAPI_MAX_SUB_EXPRESS_LEN);
+ strncpy(((DefaultPushConsumer *) consumer)->expression, expression, CAPI_MAX_SUB_EXPRESS_LEN - 1);
+ return OK;
+}
+
+int RegisterMessageCallback(CPushConsumer *consumer, MessageCallBack pCallback) {
+ if (consumer == NULL || pCallback == NULL) {
+ return NULL_POINTER;
+ }
+
+ DefaultPushConsumer *defaultPushConsumer = (DefaultPushConsumer *) consumer;
+ MessageListenerInner *listenerInner = new MessageListenerInner(consumer, pCallback);
+ defaultPushConsumer->innerConsumer->subscribe(defaultPushConsumer->factoryInfo.getPublishTopics(),
+ defaultPushConsumer->expression, listenerInner);
+ g_ListenerMap[consumer] = listenerInner;
+ return OK;
+}
+
+int RegisterMessageCallbackOrderly(CPushConsumer *consumer, MessageCallBack pCallback) {
+ if (consumer == NULL || pCallback == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+
+int UnregisterMessageCallbackOrderly(CPushConsumer *consumer) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ return NOT_SUPPORT_NOW;
+}
+
+int UnregisterMessageCallback(CPushConsumer *consumer) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ map<CPushConsumer *, MessageListenerInner *>::iterator iter;
+ iter = g_ListenerMap.find(consumer);
+
+ if (iter != g_ListenerMap.end()) {
+ MessageListenerInner *listenerInner = iter->second;
+ if (listenerInner != NULL) {
+ delete listenerInner;
+ }
+ g_ListenerMap.erase(iter);
+ }
+ return OK;
+}
+
+int SetPushConsumerMessageModel(CPushConsumer *consumer, CMessageModel messageModel) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+
+ DefaultPushConsumer *defaultPushConsumer = (DefaultPushConsumer *) consumer;
+ switch (messageModel) {
+ case BROADCASTING:
+ defaultPushConsumer->factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel,
+ ONSFactoryProperty::BROADCASTING);
+ break;
+ case CLUSTERING:
+ defaultPushConsumer->factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel,
+ ONSFactoryProperty::CLUSTERING);
+ break;
+ default:
+ defaultPushConsumer->factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel,
+ ONSFactoryProperty::CLUSTERING);
+ break;
+ }
+ return OK;
+}
+int SetPushConsumerThreadCount(CPushConsumer *consumer, int threadCount) {
+ if (consumer == NULL || threadCount == 0) {
+ return NULL_POINTER;
+ }
+ ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumeThreadNums,
+ UtilAll::to_string(threadCount).c_str());
+ return OK;
+}
+int SetPushConsumerMessageBatchMaxSize(CPushConsumer *consumer, int batchSize) {
+ if (consumer == NULL || batchSize == 0) {
+ return NULL_POINTER;
+ }
+ return OK;
+}
+
+int SetPushConsumerInstanceName(CPushConsumer *consumer, const char *instanceName) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerInstanceName,
+ instanceName);
+ return OK;
+}
+
+int SetPushConsumerSessionCredentials(CPushConsumer *consumer,
+ const char *accessKey,
+ const char *secretKey,
+ const char *channel) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, accessKey);
+ ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, secretKey);
+ return OK;
+}
+
+int SetPushConsumerLogPath(CPushConsumer *consumer, const char *logPath) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::LogPath, logPath);
+ return OK;
+}
+
+int SetPushConsumerLogFileNumAndSize(CPushConsumer *consumer, int fileNum, long fileSize) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ return OK;
+}
+
+int SetPushConsumerLogLevel(CPushConsumer *consumer, CLogLevel level) {
+ if (consumer == NULL) {
+ return NULL_POINTER;
+ }
+ return OK;
+}
+
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/main/cpp/sdk/extern/CSendResult.cpp b/src/main/cpp/sdk/extern/CSendResult.cpp
new file mode 100644
index 0000000..8d4dc2b
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CSendResult.cpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CSendResult.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#ifdef __cplusplus
+};
+#endif