You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/08/19 02:49:31 UTC

[rocketmq-ons-cpp] branch master updated: Add C style APIs to support C functionalities (#3)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new eec01ad  Add C style APIs to support C functionalities (#3)
eec01ad is described below

commit eec01ad4be6beba6367f41b688c072443b198476
Author: dinglei <li...@163.com>
AuthorDate: Mon Aug 19 10:49:27 2019 +0800

    Add C style APIs to support C functionalities (#3)
    
    * Support C API
    
    * Add apache license header for CConst.h
---
 CMakeLists.txt                            |   2 +
 src/main/c/include/CBatchMessage.h        |  36 ++++
 src/main/c/include/CCommon.h              |  87 ++++++++
 src/main/c/include/CErrorMessage.h        |  32 +++
 src/main/c/include/CMQException.h         |  40 ++++
 src/main/c/include/CMessage.h             |  42 ++++
 src/main/c/include/CMessageExt.h          |  49 +++++
 src/main/c/include/CMessageQueue.h        |  36 ++++
 src/main/c/include/CProducer.h            |  84 ++++++++
 src/main/c/include/CPullConsumer.h        |  61 ++++++
 src/main/c/include/CPullResult.h          |  48 +++++
 src/main/c/include/CPushConsumer.h        |  63 ++++++
 src/main/c/include/CSendResult.h          |  42 ++++
 src/main/cpp/include/ONSFactory.h         |   6 +
 src/main/cpp/sdk/CMakeLists.txt           |  10 +-
 src/main/cpp/sdk/ONSFactory.cpp           |  28 ++-
 src/main/cpp/sdk/common/UtilAll.h         |   4 +
 src/main/cpp/sdk/extern/CBatchMessage.cpp |  59 ++++++
 src/main/cpp/sdk/extern/CConst.h          |  52 +++++
 src/main/cpp/sdk/extern/CErrorMessage.cpp |  32 +++
 src/main/cpp/sdk/extern/CMessage.cpp      | 102 ++++++++++
 src/main/cpp/sdk/extern/CMessageExt.cpp   | 129 ++++++++++++
 src/main/cpp/sdk/extern/CProducer.cpp     | 327 ++++++++++++++++++++++++++++++
 src/main/cpp/sdk/extern/CPullConsumer.cpp | 144 +++++++++++++
 src/main/cpp/sdk/extern/CPushConsumer.cpp | 271 +++++++++++++++++++++++++
 src/main/cpp/sdk/extern/CSendResult.cpp   |  26 +++
 26 files changed, 1807 insertions(+), 5 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 98b2797..e51dec7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -25,7 +25,9 @@ option(BUILD_DEMOS "Build demos" ON)
 
 include_directories(graalvm_artifacts
         src/main/c/native
+        src/main/c/include
         src/main/cpp/include
+        src/main/cpp/sdk/extern
         src/main/cpp/sdk/common)
 
 find_library(ROCKETMQ_CLIENT_CORE
diff --git a/src/main/c/include/CBatchMessage.h b/src/main/c/include/CBatchMessage.h
new file mode 100644
index 0000000..6409a55
--- /dev/null
+++ b/src/main/c/include/CBatchMessage.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_BATCHMESSAGE_H__
+#define __C_BATCHMESSAGE_H__
+#include "CCommon.h"
+#include "CMessage.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct CBatchMessage CBatchMessage;
+
+ROCKETMQCLIENT_API CBatchMessage* CreateBatchMessage();
+ROCKETMQCLIENT_API int AddMessage(CBatchMessage* batchMsg, CMessage* msg);
+ROCKETMQCLIENT_API int DestroyBatchMessage(CBatchMessage* batchMsg);
+
+#ifdef __cplusplus
+};
+#endif
+#endif  //__C_BATCHMESSAGE_H__
diff --git a/src/main/c/include/CCommon.h b/src/main/c/include/CCommon.h
new file mode 100644
index 0000000..59fcd7b
--- /dev/null
+++ b/src/main/c/include/CCommon.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_COMMON_H__
+#define __C_COMMON_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define MAX_MESSAGE_ID_LENGTH 256
+#define MAX_TOPIC_LENGTH 512
+#define MAX_BROKER_NAME_ID_LENGTH 256
+typedef enum _CStatus_ {
+
+    OK = 0,// Success
+
+    NULL_POINTER = 1,// Failed, null pointer value
+    MALLOC_FAILED = 2,
+    PRODUCER_ERROR_CODE_START = 10,
+    PRODUCER_START_FAILED = 10,
+    PRODUCER_SEND_SYNC_FAILED = 11,
+    PRODUCER_SEND_ONEWAY_FAILED = 12,
+    PRODUCER_SEND_ORDERLY_FAILED = 13,
+    PRODUCER_SEND_ASYNC_FAILED = 14,
+    PRODUCER_SEND_ORDERLYASYNC_FAILED = 15,
+
+    PUSHCONSUMER_ERROR_CODE_START = 20,
+    PUSHCONSUMER_START_FAILED = 20,
+
+    PULLCONSUMER_ERROR_CODE_START = 30,
+    PULLCONSUMER_START_FAILED = 30,
+    PULLCONSUMER_FETCH_MQ_FAILED = 31,
+    PULLCONSUMER_FETCH_MESSAGE_FAILED = 32,
+    NOT_SUPPORT_NOW = -1
+} CStatus;
+
+typedef enum _CLogLevel_ {
+    E_LOG_LEVEL_FATAL = 1,
+    E_LOG_LEVEL_ERROR = 2,
+    E_LOG_LEVEL_WARN = 3,
+    E_LOG_LEVEL_INFO = 4,
+    E_LOG_LEVEL_DEBUG = 5,
+    E_LOG_LEVEL_TRACE = 6,
+    E_LOG_LEVEL_LEVEL_NUM = 7
+} CLogLevel;
+
+#ifdef WIN32
+#ifdef ROCKETMQCLIENT_EXPORTS
+#ifdef _WINDLL
+#define ROCKETMQCLIENT_API __declspec(dllexport)
+#else
+#define ROCKETMQCLIENT_API
+#endif
+#else
+#ifdef ROCKETMQCLIENT_IMPORT
+#define ROCKETMQCLIENT_API __declspec(dllimport)
+#else
+#define ROCKETMQCLIENT_API
+#endif
+#endif
+#else
+#define ROCKETMQCLIENT_API
+#endif
+
+typedef enum _CMessageModel_ {
+    BROADCASTING, CLUSTERING
+} CMessageModel;
+
+#ifdef __cplusplus
+};
+#endif
+#endif  //__C_COMMON_H__
diff --git a/src/main/c/include/CErrorMessage.h b/src/main/c/include/CErrorMessage.h
new file mode 100644
index 0000000..f32479b
--- /dev/null
+++ b/src/main/c/include/CErrorMessage.h
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __CERROR_MESSAGE_H__
+#define __CERROR_MESSAGE_H__
+
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+ROCKETMQCLIENT_API const char* GetLatestErrorMessage();  // Return the last error message
+
+#ifdef __cplusplus
+};
+#endif
+#endif  //__CERROR_MESSAGE_H__
diff --git a/src/main/c/include/CMQException.h b/src/main/c/include/CMQException.h
new file mode 100644
index 0000000..51f9560
--- /dev/null
+++ b/src/main/c/include/CMQException.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __C_MQEXCPTION_H__
+#define __C_MQEXCPTION_H__
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define MAX_EXEPTION_MSG_LENGTH 512
+#define MAX_EXEPTION_FILE_LENGTH 256
+#define MAX_EXEPTION_TYPE_LENGTH 128
+typedef struct _CMQException_ {
+  int error;
+  int line;
+  char file[MAX_EXEPTION_FILE_LENGTH];
+  char msg[MAX_EXEPTION_MSG_LENGTH];
+  char type[MAX_EXEPTION_TYPE_LENGTH];
+
+} CMQException;
+
+#ifdef __cplusplus
+};
+#endif
+#endif
diff --git a/src/main/c/include/CMessage.h b/src/main/c/include/CMessage.h
new file mode 100644
index 0000000..f1b057b
--- /dev/null
+++ b/src/main/c/include/CMessage.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_MESSAGE_H__
+#define __C_MESSAGE_H__
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// typedef struct _CMessage_ CMessage;
+typedef struct CMessage CMessage;
+
+ROCKETMQCLIENT_API CMessage* CreateMessage(const char* topic);
+ROCKETMQCLIENT_API int DestroyMessage(CMessage* msg);
+ROCKETMQCLIENT_API int SetMessageTopic(CMessage* msg, const char* topic);
+ROCKETMQCLIENT_API int SetMessageTags(CMessage* msg, const char* tags);
+ROCKETMQCLIENT_API int SetMessageKeys(CMessage* msg, const char* keys);
+ROCKETMQCLIENT_API int SetMessageBody(CMessage* msg, const char* body);
+ROCKETMQCLIENT_API int SetByteMessageBody(CMessage* msg, const char* body, int len);
+ROCKETMQCLIENT_API int SetMessageProperty(CMessage* msg, const char* key, const char* value);
+ROCKETMQCLIENT_API int SetDelayTimeLevel(CMessage* msg, int level);
+
+#ifdef __cplusplus
+};
+#endif
+#endif  //__C_MESSAGE_H__
diff --git a/src/main/c/include/CMessageExt.h b/src/main/c/include/CMessageExt.h
new file mode 100644
index 0000000..acd85e3
--- /dev/null
+++ b/src/main/c/include/CMessageExt.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_MESSAGE_EXT_H__
+#define __C_MESSAGE_EXT_H__
+
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// typedef struct _CMessageExt_ _CMessageExt;
+typedef struct CMessageExt CMessageExt;
+
+ROCKETMQCLIENT_API const char* GetMessageTopic(CMessageExt* msgExt);
+ROCKETMQCLIENT_API const char* GetMessageTags(CMessageExt* msgExt);
+ROCKETMQCLIENT_API const char* GetMessageKeys(CMessageExt* msgExt);
+ROCKETMQCLIENT_API const char* GetMessageBody(CMessageExt* msgExt);
+ROCKETMQCLIENT_API const char* GetMessageProperty(CMessageExt* msgExt, const char* key);
+ROCKETMQCLIENT_API const char* GetMessageId(CMessageExt* msgExt);
+ROCKETMQCLIENT_API int GetMessageDelayTimeLevel(CMessageExt* msgExt);
+ROCKETMQCLIENT_API int GetMessageQueueId(CMessageExt* msgExt);
+ROCKETMQCLIENT_API int GetMessageReconsumeTimes(CMessageExt* msgExt);
+ROCKETMQCLIENT_API int GetMessageStoreSize(CMessageExt* msgExt);
+ROCKETMQCLIENT_API long long GetMessageBornTimestamp(CMessageExt* msgExt);
+ROCKETMQCLIENT_API long long GetMessageStoreTimestamp(CMessageExt* msgExt);
+ROCKETMQCLIENT_API long long GetMessageQueueOffset(CMessageExt* msgExt);
+ROCKETMQCLIENT_API long long GetMessageCommitLogOffset(CMessageExt* msgExt);
+ROCKETMQCLIENT_API long long GetMessagePreparedTransactionOffset(CMessageExt* msgExt);
+
+#ifdef __cplusplus
+};
+#endif
+#endif  //__C_MESSAGE_EXT_H__
diff --git a/src/main/c/include/CMessageQueue.h b/src/main/c/include/CMessageQueue.h
new file mode 100644
index 0000000..c514f9d
--- /dev/null
+++ b/src/main/c/include/CMessageQueue.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_MESSAGE_QUEUE_H__
+#define __C_MESSAGE_QUEUE_H__
+
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _CMessageQueue_ {
+  char topic[MAX_TOPIC_LENGTH];
+  char brokerName[MAX_BROKER_NAME_ID_LENGTH];
+  int queueId;
+} CMessageQueue;
+
+#ifdef __cplusplus
+};
+#endif
+#endif  //__C_MESSAGE_QUEUE_H__
diff --git a/src/main/c/include/CProducer.h b/src/main/c/include/CProducer.h
new file mode 100644
index 0000000..6b00575
--- /dev/null
+++ b/src/main/c/include/CProducer.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_PRODUCER_H__
+#define __C_PRODUCER_H__
+
+#include "CBatchMessage.h"
+#include "CMessage.h"
+#include "CSendResult.h"
+#include "CMQException.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// typedef struct _CProducer_ _CProducer;
+typedef struct CProducer CProducer;
+typedef int (*QueueSelectorCallback)(int size, CMessage* msg, void* arg);
+typedef void (*CSendSuccessCallback)(CSendResult result);
+typedef void (*CSendExceptionCallback)(CMQException e);
+
+ROCKETMQCLIENT_API CProducer* CreateProducer(const char* groupId);
+ROCKETMQCLIENT_API int DestroyProducer(CProducer* producer);
+ROCKETMQCLIENT_API int StartProducer(CProducer* producer);
+ROCKETMQCLIENT_API int ShutdownProducer(CProducer* producer);
+
+ROCKETMQCLIENT_API int SetProducerNameServerAddress(CProducer* producer, const char* namesrv);
+ROCKETMQCLIENT_API int SetProducerNameServerDomain(CProducer* producer, const char* domain);
+ROCKETMQCLIENT_API int SetProducerGroupName(CProducer* producer, const char* groupName);
+ROCKETMQCLIENT_API int SetProducerInstanceName(CProducer* producer, const char* instanceName);
+ROCKETMQCLIENT_API int SetProducerSessionCredentials(CProducer* producer,
+                                                     const char* accessKey,
+                                                     const char* secretKey,
+                                                     const char* onsChannel);
+ROCKETMQCLIENT_API int SetProducerLogPath(CProducer* producer, const char* logPath);
+ROCKETMQCLIENT_API int SetProducerLogFileNumAndSize(CProducer* producer, int fileNum, long fileSize);
+ROCKETMQCLIENT_API int SetProducerLogLevel(CProducer* producer, CLogLevel level);
+ROCKETMQCLIENT_API int SetProducerSendMsgTimeout(CProducer* producer, int timeout);
+ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer* producer, int level);
+ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer* producer, int size);
+
+ROCKETMQCLIENT_API int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result);
+ROCKETMQCLIENT_API int SendBatchMessage(CProducer* producer, CBatchMessage* msg, CSendResult* result);
+ROCKETMQCLIENT_API int SendMessageAsync(CProducer* producer,
+                                        CMessage* msg,
+                                        CSendSuccessCallback cSendSuccessCallback,
+                                        CSendExceptionCallback cSendExceptionCallback);
+ROCKETMQCLIENT_API int SendMessageOneway(CProducer* producer, CMessage* msg);
+ROCKETMQCLIENT_API int SendMessageOnewayOrderly(CProducer* producer,
+                                                CMessage* msg,
+                                                QueueSelectorCallback selector,
+                                                void* arg);
+ROCKETMQCLIENT_API int SendMessageOrderly(CProducer* producer,
+                                          CMessage* msg,
+                                          QueueSelectorCallback callback,
+                                          void* arg,
+                                          int autoRetryTimes,
+                                          CSendResult* result);
+
+ROCKETMQCLIENT_API int SendMessageOrderlyAsync(CProducer* producer,
+                                               CMessage* msg,
+                                               QueueSelectorCallback callback,
+                                               void* arg,
+                                               CSendSuccessCallback cSendSuccessCallback,
+                                               CSendExceptionCallback cSendExceptionCallback);
+
+#ifdef __cplusplus
+};
+#endif
+#endif  //__C_PRODUCER_H__
\ No newline at end of file
diff --git a/src/main/c/include/CPullConsumer.h b/src/main/c/include/CPullConsumer.h
new file mode 100644
index 0000000..0157108
--- /dev/null
+++ b/src/main/c/include/CPullConsumer.h
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_PULL_CONSUMER_H__
+#define __C_PULL_CONSUMER_H__
+
+#include "CCommon.h"
+#include "CMessageExt.h"
+#include "CMessageQueue.h"
+#include "CPullResult.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct CPullConsumer CPullConsumer;
+
+ROCKETMQCLIENT_API CPullConsumer* CreatePullConsumer(const char* groupId);
+ROCKETMQCLIENT_API int DestroyPullConsumer(CPullConsumer* consumer);
+ROCKETMQCLIENT_API int StartPullConsumer(CPullConsumer* consumer);
+ROCKETMQCLIENT_API int ShutdownPullConsumer(CPullConsumer* consumer);
+ROCKETMQCLIENT_API int SetPullConsumerGroupID(CPullConsumer* consumer, const char* groupId);
+ROCKETMQCLIENT_API const char* GetPullConsumerGroupID(CPullConsumer* consumer);
+ROCKETMQCLIENT_API int SetPullConsumerNameServerAddress(CPullConsumer* consumer, const char* namesrv);
+ROCKETMQCLIENT_API int SetPullConsumerNameServerDomain(CPullConsumer* consumer, const char* domain);
+ROCKETMQCLIENT_API int SetPullConsumerSessionCredentials(CPullConsumer* consumer,
+                                                         const char* accessKey,
+                                                         const char* secretKey,
+                                                         const char* channel);
+ROCKETMQCLIENT_API int SetPullConsumerLogPath(CPullConsumer* consumer, const char* logPath);
+ROCKETMQCLIENT_API int SetPullConsumerLogFileNumAndSize(CPullConsumer* consumer, int fileNum, long fileSize);
+ROCKETMQCLIENT_API int SetPullConsumerLogLevel(CPullConsumer* consumer, CLogLevel level);
+
+ROCKETMQCLIENT_API int FetchSubscriptionMessageQueues(CPullConsumer* consumer,
+                                                      const char* topic,
+                                                      CMessageQueue** mqs,
+                                                      int* size);
+ROCKETMQCLIENT_API int ReleaseSubscriptionMessageQueue(CMessageQueue* mqs);
+
+ROCKETMQCLIENT_API CPullResult
+Pull(CPullConsumer* consumer, const CMessageQueue* mq, const char* subExpression, long long offset, int maxNums);
+ROCKETMQCLIENT_API int ReleasePullResult(CPullResult pullResult);
+
+#ifdef __cplusplus
+};
+#endif
+#endif  //__C_PULL_CONSUMER_H__
diff --git a/src/main/c/include/CPullResult.h b/src/main/c/include/CPullResult.h
new file mode 100644
index 0000000..2ab174f
--- /dev/null
+++ b/src/main/c/include/CPullResult.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_PULL_RESULT_H__
+#define __C_PULL_RESULT_H__
+
+#include "CCommon.h"
+#include "CMessageExt.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+typedef enum E_CPullStatus {
+  E_FOUND,
+  E_NO_NEW_MSG,
+  E_NO_MATCHED_MSG,
+  E_OFFSET_ILLEGAL,
+  E_BROKER_TIMEOUT  // indicates the pull request timed out or a NULL response was received
+} CPullStatus;
+
+typedef struct _CPullResult_ {
+  CPullStatus pullStatus;
+  long long nextBeginOffset;
+  long long minOffset;
+  long long maxOffset;
+  CMessageExt** msgFoundList;
+  int size;
+  void* pData;
+} CPullResult;
+
+#ifdef __cplusplus
+};
+#endif
+#endif  //__C_PULL_RESULT_H__
diff --git a/src/main/c/include/CPushConsumer.h b/src/main/c/include/CPushConsumer.h
new file mode 100644
index 0000000..10b5587
--- /dev/null
+++ b/src/main/c/include/CPushConsumer.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_PUSH_CONSUMER_H__
+#define __C_PUSH_CONSUMER_H__
+
+#include "CMessageExt.h"
+#include "CCommon.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// typedef struct _CConsumer_ _CConsumer;
+typedef struct CPushConsumer CPushConsumer;
+
+typedef enum E_CConsumeStatus { E_CONSUME_SUCCESS = 0, E_RECONSUME_LATER = 1 } CConsumeStatus;
+
+typedef int (*MessageCallBack)(CPushConsumer*, CMessageExt*);
+
+ROCKETMQCLIENT_API CPushConsumer* CreatePushConsumer(const char* groupId);
+ROCKETMQCLIENT_API int DestroyPushConsumer(CPushConsumer* consumer);
+ROCKETMQCLIENT_API int StartPushConsumer(CPushConsumer* consumer);
+ROCKETMQCLIENT_API int ShutdownPushConsumer(CPushConsumer* consumer);
+ROCKETMQCLIENT_API int SetPushConsumerGroupID(CPushConsumer* consumer, const char* groupId);
+ROCKETMQCLIENT_API const char* GetPushConsumerGroupID(CPushConsumer* consumer);
+ROCKETMQCLIENT_API int SetPushConsumerNameServerAddress(CPushConsumer* consumer, const char* namesrv);
+ROCKETMQCLIENT_API int SetPushConsumerNameServerDomain(CPushConsumer* consumer, const char* domain);
+ROCKETMQCLIENT_API int Subscribe(CPushConsumer* consumer, const char* topic, const char* expression);
+ROCKETMQCLIENT_API int RegisterMessageCallbackOrderly(CPushConsumer* consumer, MessageCallBack pCallback);
+ROCKETMQCLIENT_API int RegisterMessageCallback(CPushConsumer* consumer, MessageCallBack pCallback);
+ROCKETMQCLIENT_API int UnregisterMessageCallbackOrderly(CPushConsumer* consumer);
+ROCKETMQCLIENT_API int UnregisterMessageCallback(CPushConsumer* consumer);
+ROCKETMQCLIENT_API int SetPushConsumerThreadCount(CPushConsumer* consumer, int threadCount);
+ROCKETMQCLIENT_API int SetPushConsumerMessageBatchMaxSize(CPushConsumer* consumer, int batchSize);
+ROCKETMQCLIENT_API int SetPushConsumerInstanceName(CPushConsumer* consumer, const char* instanceName);
+ROCKETMQCLIENT_API int SetPushConsumerSessionCredentials(CPushConsumer* consumer,
+                                                         const char* accessKey,
+                                                         const char* secretKey,
+                                                         const char* channel);
+ROCKETMQCLIENT_API int SetPushConsumerLogPath(CPushConsumer* consumer, const char* logPath);
+ROCKETMQCLIENT_API int SetPushConsumerLogFileNumAndSize(CPushConsumer* consumer, int fileNum, long fileSize);
+ROCKETMQCLIENT_API int SetPushConsumerLogLevel(CPushConsumer* consumer, CLogLevel level);
+ROCKETMQCLIENT_API int SetPushConsumerMessageModel(CPushConsumer* consumer, CMessageModel messageModel);
+
+#ifdef __cplusplus
+};
+#endif
+#endif  //__C_PUSH_CONSUMER_H__
diff --git a/src/main/c/include/CSendResult.h b/src/main/c/include/CSendResult.h
new file mode 100644
index 0000000..fa640ca
--- /dev/null
+++ b/src/main/c/include/CSendResult.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_SEND_RESULT_H__
+#define __C_SEND_RESULT_H__
+
+#include "CCommon.h"
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef enum E_CSendStatus_ {
+  E_SEND_OK = 0,
+  E_SEND_FLUSH_DISK_TIMEOUT = 1,
+  E_SEND_FLUSH_SLAVE_TIMEOUT = 2,
+  E_SEND_SLAVE_NOT_AVAILABLE = 3
+} CSendStatus;
+
+typedef struct _SendResult_ {
+  CSendStatus sendStatus;
+  char msgId[MAX_MESSAGE_ID_LENGTH];
+  long long offset;
+} CSendResult;
+
+#ifdef __cplusplus
+};
+#endif
+#endif  //__C_SEND_RESULT_H__
diff --git a/src/main/cpp/include/ONSFactory.h b/src/main/cpp/include/ONSFactory.h
index 2ff00c4..566dc48 100755
--- a/src/main/cpp/include/ONSFactory.h
+++ b/src/main/cpp/include/ONSFactory.h
@@ -68,12 +68,16 @@ namespace ons {
 
         const int getSendMsgTimeout() const;
 
+        const int getSuspendTimeMillis() const;
+
         const int getSendMsgRetryTimes() const;
 
         const int getConsumeThreadNums() const;
 
         const int getMaxMsgCacheSize() const;
 
+        const int getMaxMsgCacheSizeInMiB() const;
+
         const ONSChannel getOnsChannel() const;
 
         const char *getChannel() const;
@@ -108,10 +112,12 @@ namespace ons {
         static const char *BROADCASTING;
         static const char *CLUSTERING;
         static const char *SendMsgTimeoutMillis;
+        static const char *SuspendTimeMillis;
         static const char *NAMESRV_ADDR;
         static const char *ConsumeThreadNums;
         static const char *OnsChannel;
         static const char *MaxMsgCacheSize;
+        static const char *MaxCachedMessageSizeInMiB;
         static const char *OnsTraceSwitch;
         static const char *SendMsgRetryTimes;
         static const char *ConsumerInstanceName;
diff --git a/src/main/cpp/sdk/CMakeLists.txt b/src/main/cpp/sdk/CMakeLists.txt
index 5751821..eac4c39 100644
--- a/src/main/cpp/sdk/CMakeLists.txt
+++ b/src/main/cpp/sdk/CMakeLists.txt
@@ -12,6 +12,14 @@ add_library(${LIBRARY_NAME} SHARED
         TransactionProducerImpl.cpp
         common/UtilAll.cpp
         common/Logger.cpp
-        common/ONSClientAbstract.cpp)
+        common/ONSClientAbstract.cpp
+        extern/CBatchMessage.cpp
+        extern/CErrorMessage.cpp
+        extern/CMessage.cpp
+        extern/CMessageExt.cpp
+        extern/CProducer.cpp
+        extern/CPullConsumer.cpp
+        extern/CPushConsumer.cpp
+        extern/CSendResult.cpp)
 target_link_libraries(${LIBRARY_NAME} ${ROCKETMQ_CLIENT_CORE})
 set_target_properties(${LIBRARY_NAME} PROPERTIES LIBRARY_OUTPUT_DIRECTORY ${CMAKE_SOURCE_DIR}/lib VERSION ${CLIENT_VERSION} SOVERSION ${CLIENT_VERSION})
\ No newline at end of file
diff --git a/src/main/cpp/sdk/ONSFactory.cpp b/src/main/cpp/sdk/ONSFactory.cpp
index 662df85..bf1686f 100644
--- a/src/main/cpp/sdk/ONSFactory.cpp
+++ b/src/main/cpp/sdk/ONSFactory.cpp
@@ -39,8 +39,10 @@ namespace ons {
     const char *ONSFactoryProperty::BROADCASTING = "BROADCASTING";
     const char *ONSFactoryProperty::CLUSTERING = "CLUSTERING";
     const char *ONSFactoryProperty::SendMsgTimeoutMillis = "SendMsgTimeoutMillis";
+    const char *ONSFactoryProperty::SuspendTimeMillis = "SuspendTimeMillis";
     const char *ONSFactoryProperty::SendMsgRetryTimes = "SendMsgRetryTimes";
     const char *ONSFactoryProperty::MaxMsgCacheSize = "MaxMsgCacheSize";
+    const char *ONSFactoryProperty::MaxCachedMessageSizeInMiB = "MaxCachedMessageSizeInMiB";
     const char *ONSFactoryProperty::ONSAddr =
             "ONSAddr";  // name server domain name
     const char *ONSFactoryProperty::NAMESRV_ADDR =
@@ -219,7 +221,15 @@ namespace ons {
         if (it != m_onsFactoryProperties.end()) {
             return atoi((*it).second.c_str());
         }
-        return 0;
+        return -1;
+    }
+    // Returns the SuspendTimeMillis property converted with atoi, or -1 when the key is absent.
+    const int ONSFactoryProperty::getSuspendTimeMillis() const {
+        map<string, string>::const_iterator entry = m_onsFactoryProperties.find(SuspendTimeMillis);
+        if (entry == m_onsFactoryProperties.end()) {
+            return -1;
+        }
+        return atoi(entry->second.c_str());
     }
 
     const int ONSFactoryProperty::getSendMsgRetryTimes() const {
@@ -228,7 +238,7 @@ namespace ons {
         if (it != m_onsFactoryProperties.end()) {
             return atoi((*it).second.c_str());
         }
-        return 0;
+        return -1;
     }
 
     const int ONSFactoryProperty::getMaxMsgCacheSize() const {
@@ -238,7 +248,17 @@ namespace ons {
             return atoi((*it).second.c_str());
         }
 
-        return 0;
+        return -1;
+    }
+
+    // Looks up the MaxCachedMessageSizeInMiB property and converts it with atoi;
+    // yields -1 when the property has not been set.
+    const int ONSFactoryProperty::getMaxMsgCacheSizeInMiB() const {
+        map<string, string>::const_iterator entry = m_onsFactoryProperties.find(MaxCachedMessageSizeInMiB);
+        if (entry == m_onsFactoryProperties.end()) {
+            return -1;
+        }
+        return atoi(entry->second.c_str());
     }
 
     const ONSChannel ONSFactoryProperty::getOnsChannel() const {
@@ -270,7 +290,7 @@ namespace ons {
             return atoi((*it).second.c_str());
         }
 
-        return 0;
+        return -1;
     }
 
     const char *ONSFactoryProperty::getNameSrvAddr() const {
diff --git a/src/main/cpp/sdk/common/UtilAll.h b/src/main/cpp/sdk/common/UtilAll.h
index dae436b..b49dc02 100644
--- a/src/main/cpp/sdk/common/UtilAll.h
+++ b/src/main/cpp/sdk/common/UtilAll.h
@@ -54,6 +54,10 @@ namespace ons {
             return atoll(str);
         }
 
+        static int str2i(const char *str) {  // Decimal text to int; a NULL string yields 0 (atoi(NULL) is undefined).
+            return str == NULL ? 0 : atoi(str);
+        }
+
         static std::string to_string(const std::map<std::string, std::string> &prop);
 
         static graal_isolate_t *get_isolate() {
diff --git a/src/main/cpp/sdk/extern/CBatchMessage.cpp b/src/main/cpp/sdk/extern/CBatchMessage.cpp
new file mode 100644
index 0000000..8fbedee
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CBatchMessage.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+
+#include "CBatchMessage.h"
+#include "CCommon.h"
+#include "CMessage.h"
+#include "ONSFactory.h"
+
+using std::vector;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+using namespace ons;
+
+// Allocates an empty vector<Message> on the heap and hands it out as an opaque batch handle.
+CBatchMessage* CreateBatchMessage() {
+  return (CBatchMessage*)(new vector<Message>());
+}
+
+// Appends a copy of the message behind `msg` to the batch behind `batchMsg`.
+// Returns NULL_POINTER when either handle is null, OK otherwise.
+int AddMessage(CBatchMessage* batchMsg, CMessage* msg) {
+  if (msg == NULL || batchMsg == NULL) {
+    return NULL_POINTER;
+  }
+  vector<Message>* batch = (vector<Message>*)batchMsg;
+  Message* single = (Message*)msg;
+  batch->push_back(*single);
+  return OK;
+}
+// Frees the vector<Message> behind the handle; a null handle yields NULL_POINTER.
+int DestroyBatchMessage(CBatchMessage* batchMsg) {
+  if (!batchMsg) { return NULL_POINTER; }
+  vector<Message>* batch = (vector<Message>*)batchMsg;
+  delete batch;
+  return OK;
+}
+
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/main/cpp/sdk/extern/CConst.h b/src/main/cpp/sdk/extern/CConst.h
new file mode 100644
index 0000000..a2d510a
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CConst.h
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ROCKETMQ_ONS_CPP_CCONST_H
+#define ROCKETMQ_ONS_CPP_CCONST_H
+// Message property keys used by the C API wrappers. The include guard closes at the end of the file.
+#ifdef __cplusplus
+extern "C" {
+#endif
+#define CAPI_PROPERTY_KEYS "KEYS"
+#define CAPI_PROPERTY_TAGS  "TAGS"
+#define CAPI_PROPERTY_WAIT_STORE_MSG_OK  "WAIT"
+#define CAPI_PROPERTY_DELAY_TIME_LEVEL  "DELAY"
+#define CAPI_PROPERTY_RETRY_TOPIC  "RETRY_TOPIC"
+#define CAPI_PROPERTY_REAL_TOPIC  "REAL_TOPIC"
+#define CAPI_PROPERTY_REAL_QUEUE_ID  "REAL_QID"
+#define CAPI_PROPERTY_TRANSACTION_PREPARED  "TRAN_MSG"
+#define CAPI_PROPERTY_PRODUCER_GROUP  "PGROUP"
+#define CAPI_PROPERTY_MIN_OFFSET  "MIN_OFFSET"
+#define CAPI_PROPERTY_MAX_OFFSET  "MAX_OFFSET"
+#define CAPI_PROPERTY_ORIGIN_MESSAGE_ID  "ORIGIN_MESSAGE_ID"
+#define CAPI_PROPERTY_TRANSFER_FLAG  "TRANSFER_FLAG"
+#define CAPI_PROPERTY_CORRECTION_FLAG  "CORRECTION_FLAG"
+#define CAPI_PROPERTY_MQ2_FLAG  "MQ2_FLAG"
+#define CAPI_PROPERTY_RECONSUME_TIME  "RECONSUME_TIME"
+#define CAPI_PROPERTY_MSG_REGION  "MSG_REGION"
+#define CAPI_PROPERTY_TRACE_SWITCH  "TRACE_ON"
+#define CAPI_PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX  "UNIQ_KEY"
+#define CAPI_PROPERTY_MAX_RECONSUME_TIMES  "MAX_RECONSUME_TIMES"
+#define CAPI_PROPERTY_CONSUME_START_TIMESTAMP  "CONSUME_START_TIME"
+#define CAPI_PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET  "TRAN_PREPARED_QUEUE_OFFSET"
+#define CAPI_PROPERTY_TRANSACTION_CHECK_TIMES  "TRANSACTION_CHECK_TIMES"
+#define CAPI_PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS  "CHECK_IMMUNITY_TIME_IN_SECONDS"
+
+#ifdef __cplusplus
+};
+#endif
+#endif //ROCKETMQ_ONS_CPP_CCONST_H
\ No newline at end of file
diff --git a/src/main/cpp/sdk/extern/CErrorMessage.cpp b/src/main/cpp/sdk/extern/CErrorMessage.cpp
new file mode 100644
index 0000000..aecd721
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CErrorMessage.cpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CErrorMessage.h"
+#include "CCommon.h"
+#include "ONSFactory.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Placeholder: no error text is recorded by this wrapper,
+// so every call reports NULL.
+const char* GetLatestErrorMessage() { return NULL; }
+
+#ifdef __cplusplus
+};
+#endif
\ No newline at end of file
diff --git a/src/main/cpp/sdk/extern/CMessage.cpp b/src/main/cpp/sdk/extern/CMessage.cpp
new file mode 100644
index 0000000..b0400e4
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CMessage.cpp
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CMessage.h"
+#include "CCommon.h"
+#include "ONSFactory.h"
+#include "CConst.h"
+#include "../common/UtilAll.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+using namespace ons;
+
+// Allocates a Message, applies `topic` when one is given, and returns it as an opaque handle.
+CMessage* CreateMessage(const char* topic) {
+  Message* created = new Message();
+  if (topic) { created->setTopic(topic); }
+  CMessage* handle = (CMessage*)created;
+  return handle;
+}
+// Deletes the Message behind the handle; a null handle yields NULL_POINTER.
+int DestroyMessage(CMessage* msg) {
+  if (!msg) { return NULL_POINTER; }
+  Message* doomed = (Message*)msg;
+  delete doomed;
+  return OK;
+}
+// Forwards `topic` to Message::setTopic; NULL_POINTER when the handle is null.
+int SetMessageTopic(CMessage* msg, const char* topic) {
+  if (!msg) { return NULL_POINTER; }
+  Message* target = (Message*)msg;
+  target->setTopic(topic);
+  return OK;
+}
+// Forwards `tags` to Message::setTag; NULL_POINTER when the handle is null.
+int SetMessageTags(CMessage* msg, const char* tags) {
+  if (!msg) { return NULL_POINTER; }
+  Message* target = (Message*)msg;
+  target->setTag(tags);
+  return OK;
+}
+// Forwards `keys` to Message::setKey; NULL_POINTER when the handle is null.
+int SetMessageKeys(CMessage* msg, const char* keys) {
+  if (!msg) { return NULL_POINTER; }
+  Message* target = (Message*)msg;
+  target->setKey(keys);
+  return OK;
+}
+// Sets the body from `body` after passing it through UtilAll::to_string; NULL_POINTER for a null handle.
+int SetMessageBody(CMessage* msg, const char* body) {
+  if (!msg) { return NULL_POINTER; }
+  Message* target = (Message*)msg;
+  target->setMsgBody(UtilAll::to_string(body));
+  return OK;
+}
+// Copies exactly `len` bytes from `body` into the message body (embedded NUL bytes are kept).
+// NULL_POINTER is returned for a null handle, a negative `len`, or a null `body` with len > 0,
+// so std::string is never built from an invalid pointer/length pair.
+int SetByteMessageBody(CMessage* msg, const char* body, int len) {
+  if (msg == NULL || len < 0 || (body == NULL && len > 0)) { return NULL_POINTER; }
+  ((Message*)msg)->setMsgBody(body == NULL ? std::string() : std::string(body, len)); return OK;
+}
+// Stores the key/value pair via Message::putUserProperties; NULL_POINTER when the handle is null.
+int SetMessageProperty(CMessage* msg, const char* key, const char* value) {
+  if (!msg) { return NULL_POINTER; }
+  Message* target = (Message*)msg;
+  target->putUserProperties(key, value);
+  return OK;
+}
+// Stores `level` as decimal text under CAPI_PROPERTY_DELAY_TIME_LEVEL in the user properties.
+int SetDelayTimeLevel(CMessage* msg, int level) {
+  if (!msg) { return NULL_POINTER; }
+  const std::string levelText = std::to_string(level);
+  ((Message*)msg)->putUserProperties(CAPI_PROPERTY_DELAY_TIME_LEVEL, levelText.c_str());
+  return OK;
+}
+// Forwards the value to Message::setStartDeliverTime; NULL_POINTER when the handle is null.
+int SetStartDeliverTime(CMessage* msg, long long level) {
+    if (!msg) { return NULL_POINTER; }
+    Message* target = (Message*)msg;
+    target->setStartDeliverTime(level);
+    return OK;
+}
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/main/cpp/sdk/extern/CMessageExt.cpp b/src/main/cpp/sdk/extern/CMessageExt.cpp
new file mode 100644
index 0000000..9520554
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CMessageExt.cpp
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CMessageExt.h"
+#include "CCommon.h"
+#include "CConst.h"
+#include "ONSFactory.h"
+#include "../common/UtilAll.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+using namespace ons;
+// Returns Message::getTopic() for the wrapped message, or NULL for a null handle.
+const char *GetMessageTopic(CMessageExt *msg) {
+    if (!msg) { return NULL; }
+    Message *wrapped = (Message *) msg;
+    return wrapped->getTopic();
+}
+// Returns Message::getTag() for the wrapped message, or NULL for a null handle.
+const char *GetMessageTags(CMessageExt *msg) {
+    if (!msg) { return NULL; }
+    Message *wrapped = (Message *) msg;
+    return wrapped->getTag();
+}
+// Returns Message::getKey() for the wrapped message, or NULL for a null handle.
+const char *GetMessageKeys(CMessageExt *msg) {
+    if (!msg) { return NULL; }
+    Message *wrapped = (Message *) msg;
+    return wrapped->getKey();
+}
+// Returns Message::getBody() for the wrapped message, or NULL for a null handle.
+const char *GetMessageBody(CMessageExt *msg) {
+    if (!msg) { return NULL; }
+    Message *wrapped = (Message *) msg;
+    return wrapped->getBody();
+}
+// Returns the user property stored under `key`, or NULL for a null handle.
+const char *GetMessageProperty(CMessageExt *msg, const char *key) {
+    if (!msg) { return NULL; }
+    Message *wrapped = (Message *) msg;
+    return wrapped->getUserProperties(key);
+}
+// Returns Message::getMsgID() for the wrapped message, or NULL for a null handle.
+const char *GetMessageId(CMessageExt *msg) {
+    if (!msg) { return NULL; }
+    Message *wrapped = (Message *) msg;
+    return wrapped->getMsgID();
+}
+
+// Reads the CAPI_PROPERTY_DELAY_TIME_LEVEL user property as an int; NULL_POINTER for a null handle.
+int GetMessageDelayTimeLevel(CMessageExt *msg) {
+    if (msg == NULL) { return NULL_POINTER; }
+    const char *level = ((Message *) msg)->getUserProperties(CAPI_PROPERTY_DELAY_TIME_LEVEL);
+    return level == NULL ? 0 : atoi(level);  // atoi(NULL) is undefined, so a missing property reads as 0
+}
+
+// Reads the CAPI_PROPERTY_REAL_QUEUE_ID user property as an int; NULL_POINTER for a null handle.
+int GetMessageQueueId(CMessageExt *msg) {
+    if (msg == NULL) { return NULL_POINTER; }
+    const char *queueId = ((Message *) msg)->getUserProperties(CAPI_PROPERTY_REAL_QUEUE_ID);
+    return queueId == NULL ? 0 : UtilAll::str2i(queueId);  // a missing property reads as 0, never parsed as NULL
+}
+
+// Returns Message::getReconsumeTimes(), or NULL_POINTER for a null handle.
+int GetMessageReconsumeTimes(CMessageExt *msg) {
+    if (!msg) { return NULL_POINTER; }
+    Message *wrapped = (Message *) msg;
+    return wrapped->getReconsumeTimes();
+}
+
+// Placeholder: the store size is not read from the message.
+// NULL_POINTER for a null handle, otherwise a constant 0.
+int GetMessageStoreSize(CMessageExt *msg) {
+    const bool haveHandle = msg != NULL;
+    return haveHandle ? 0 : NULL_POINTER;
+}
+
+// Placeholder: the born timestamp is not read from the message.
+// NULL_POINTER for a null handle, otherwise a constant 0.
+long long GetMessageBornTimestamp(CMessageExt *msg) {
+    const bool haveHandle = msg != NULL;
+    return haveHandle ? 0 : NULL_POINTER;
+}
+
+// Returns Message::getStoreTimestamp(), or NULL_POINTER for a null handle.
+long long GetMessageStoreTimestamp(CMessageExt *msg) {
+    if (!msg) { return NULL_POINTER; }
+    Message *wrapped = (Message *) msg;
+    return wrapped->getStoreTimestamp();
+}
+
+// Returns Message::getQueueOffset(), or NULL_POINTER for a null handle.
+long long GetMessageQueueOffset(CMessageExt *msg) {
+    if (!msg) { return NULL_POINTER; }
+    Message *wrapped = (Message *) msg;
+    return wrapped->getQueueOffset();
+}
+
+// Placeholder: the commit-log offset is not read from the message.
+// NULL_POINTER for a null handle, otherwise a constant 0.
+long long GetMessageCommitLogOffset(CMessageExt *msg) {
+    const bool haveHandle = msg != NULL;
+    return haveHandle ? 0 : NULL_POINTER;
+}
+
+// Reads CAPI_PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET as a long long; NULL_POINTER for a null handle.
+long long GetMessagePreparedTransactionOffset(CMessageExt *msg) {
+    if (msg == NULL) { return NULL_POINTER; }
+    const char *offset = ((Message *) msg)->getUserProperties(CAPI_PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
+    return offset == NULL ? 0 : UtilAll::str2ll(offset);  // a missing property reads as 0, never parsed as NULL
+}
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/main/cpp/sdk/extern/CProducer.cpp b/src/main/cpp/sdk/extern/CProducer.cpp
new file mode 100644
index 0000000..3d225c5
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CProducer.cpp
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CProducer.h"
+#include <string.h>
+#include <typeindex>
+#include <string.h>
+#include <typeinfo>
+#include "CBatchMessage.h"
+#include "CCommon.h"
+#include "CMQException.h"
+#include "CMessage.h"
+#include "CSendResult.h"
+#include "ONSFactory.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+using namespace ons;
+using namespace std;
+
+// Bridges the C++ SendCallbackONS interface to the two plain-C callbacks of the C API.
+class CSendCallback : public SendCallbackONS {
+public:
+    CSendCallback(CSendSuccessCallback cSendSuccessCallback, CSendExceptionCallback cSendExceptionCallback)
+        : m_cSendSuccessCallback(cSendSuccessCallback), m_cSendExceptionCallback(cSendExceptionCallback) {}
+
+    virtual ~CSendCallback() {}
+
+    // Converts the send result to a CSendResult (status E_SEND_OK, offset 0, message id truncated to fit)
+    // and passes it to the success callback when one is set.
+    virtual void onSuccess(SendResultONS &sendResult) {
+        CSendResult result;
+        result.sendStatus = E_SEND_OK;
+        result.offset = 0;
+        strncpy(result.msgId, sendResult.getMessageId(), MAX_MESSAGE_ID_LENGTH - 1);
+        result.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+        if (m_cSendSuccessCallback != NULL) {
+            m_cSendSuccessCallback(result);
+        }
+    }
+
+    // Converts the exception to a CMQException and passes it to the exception callback when one is set.
+    virtual void onException(ONSClientException &e) {
+        CMQException cException;
+        cException.error = e.GetError();
+        cException.line = e.GetError();  // NOTE(review): stores the error code as the line number - confirm intent
+        // strncpy does not terminate on truncation, so both buffers are closed explicitly, as onSuccess does.
+        strncpy(cException.msg, e.GetMsg(), MAX_EXEPTION_MSG_LENGTH - 1);
+        cException.msg[MAX_EXEPTION_MSG_LENGTH - 1] = 0;
+        strncpy(cException.file, e.what(), MAX_EXEPTION_FILE_LENGTH - 1);  // NOTE(review): what() text goes to `file`
+        cException.file[MAX_EXEPTION_FILE_LENGTH - 1] = 0;
+        if (m_cSendExceptionCallback != NULL) {
+            m_cSendExceptionCallback(cException);
+        }
+    }
+
+    // Replace the stored C callbacks.
+    void setM_cSendSuccessCallback(CSendSuccessCallback callback) { m_cSendSuccessCallback = callback; }
+    void setM_cSendExceptionCallback(CSendExceptionCallback callback) { m_cSendExceptionCallback = callback; }
+
+private:
+    CSendSuccessCallback m_cSendSuccessCallback;
+    CSendExceptionCallback m_cSendExceptionCallback;
+};
+
+typedef struct __DefaultProducer__ {  // State behind an opaque CProducer handle.
+    ONSFactoryProperty factoryInfo;  // Properties collected by the Set* calls; passed to createProducer in StartProducer.
+    Producer *innerProducer;  // Assigned in StartProducer from ONSFactory::createProducer.
+    CSendCallback *cSendCallback;  // Allocated in CreateProducer, deleted in DestroyProducer.
+} DefaultProducer;
+
+// Builds the producer state for `groupId` and returns it as an opaque handle (NULL when groupId is NULL).
+// The inner producer itself is only created later, in StartProducer.
+CProducer *CreateProducer(const char *groupId) {
+    if (!groupId) { return NULL; }
+    DefaultProducer *state = new DefaultProducer();
+    state->factoryInfo.setFactoryProperty(ONSFactoryProperty::GroupId, groupId);
+    state->cSendCallback = new CSendCallback(NULL, NULL);
+    return (CProducer *) state;
+}
+// Releases the state allocated by CreateProducer: first the send callback, then the struct itself.
+// NOTE(review): innerProducer is not released here; its ownership is not visible in this file - confirm.
+int DestroyProducer(CProducer *pProducer) {
+    if (!pProducer) {
+        return NULL_POINTER;
+    }
+    DefaultProducer *state = reinterpret_cast<DefaultProducer *>(pProducer);
+    delete state->cSendCallback;  // deleting a null pointer is a no-op, so no guard is needed
+    delete state;
+    return OK;
+}
+// Creates the inner producer from the collected properties and starts it; a std::exception maps to PRODUCER_START_FAILED.
+int StartProducer(CProducer *producer) {
+    if (!producer) { return NULL_POINTER; }
+    DefaultProducer *state = (DefaultProducer *) producer;
+    try {
+        ONSFactoryProperty &properties = state->factoryInfo;
+        state->innerProducer = ONSFactory::getInstance()->createProducer(properties);
+        state->innerProducer->start();
+    } catch (exception &) {
+        return PRODUCER_START_FAILED;
+    }
+    return OK;
+}
+// Shuts down the inner producer. NULL_POINTER is returned for a null handle, and also while the inner
+// producer is still null (it is only assigned in StartProducer) instead of dereferencing it.
+int ShutdownProducer(CProducer *producer) {
+    if (producer == NULL || ((DefaultProducer *) producer)->innerProducer == NULL) { return NULL_POINTER; }
+    ((DefaultProducer *) producer)->innerProducer->shutdown();
+    return OK;
+}
+// Stores `namesrv` under ONSFactoryProperty::NAMESRV_ADDR; NULL_POINTER when the handle is null.
+int SetProducerNameServerAddress(CProducer *producer, const char *namesrv) {
+    if (!producer) { return NULL_POINTER; }
+    DefaultProducer *state = (DefaultProducer *) producer;
+    state->factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, namesrv);
+    return OK;
+}
+// Stores `domain` under ONSFactoryProperty::ONSAddr; NULL_POINTER when the handle is null.
+int SetProducerNameServerDomain(CProducer *producer, const char *domain) {
+    if (!producer) { return NULL_POINTER; }
+    DefaultProducer *state = (DefaultProducer *) producer;
+    state->factoryInfo.setFactoryProperty(ONSFactoryProperty::ONSAddr, domain);
+    return OK;
+}
+// Sends `msg` synchronously and fills `result` (E_SEND_OK, offset 0, message id truncated to fit).
+// NULL_POINTER: a null argument, or an inner producer that is still null (previously dereferenced).
+// PRODUCER_SEND_SYNC_FAILED: the send raised a std::exception.
+int SendMessageSync(CProducer *producer, CMessage *msg, CSendResult *result) {
+    if (producer == NULL || msg == NULL || result == NULL) { return NULL_POINTER; }
+    DefaultProducer *defaultProducer = (DefaultProducer *) producer;
+    if (defaultProducer->innerProducer == NULL) { return NULL_POINTER; }
+    try {
+        SendResultONS sendResult = defaultProducer->innerProducer->send(*(Message *) msg);
+        result->sendStatus = E_SEND_OK;
+        result->offset = 0;
+        strncpy(result->msgId, sendResult.getMessageId(), MAX_MESSAGE_ID_LENGTH - 1);
+        result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+    } catch (exception &) {
+        return PRODUCER_SEND_SYNC_FAILED;
+    }
+    return OK;
+}
+
+// Batch sending is not implemented: after the null checks this always reports NOT_SUPPORT_NOW.
+int SendBatchMessage(CProducer *producer, CBatchMessage *batcMsg, CSendResult *result) {
+    const bool missingArgument = producer == NULL || batcMsg == NULL || result == NULL;
+    if (missingArgument) {
+        return NULL_POINTER;
+    }
+    return NOT_SUPPORT_NOW;
+}
+
+// Sends `msg` asynchronously; completion is reported through the two C callbacks.
+// Only sendAsync() is issued: the earlier blocking send() in front of it delivered every message twice
+// and could throw outside the try block. A null inner producer or callback object yields NULL_POINTER.
+// NOTE(review): the producer's single CSendCallback is re-pointed on every call, so overlapping sends share it.
+int SendMessageAsync(CProducer *producer,
+                     CMessage *msg,
+                     CSendSuccessCallback cSendSuccessCallback,
+                     CSendExceptionCallback cSendExceptionCallback) {
+    if (producer == NULL || msg == NULL || cSendSuccessCallback == NULL || cSendExceptionCallback == NULL) {
+        return NULL_POINTER;
+    }
+    DefaultProducer *defaultProducer = (DefaultProducer *) producer;
+    if (defaultProducer->innerProducer == NULL || defaultProducer->cSendCallback == NULL) { return NULL_POINTER; }
+    try {
+        defaultProducer->cSendCallback->setM_cSendSuccessCallback(cSendSuccessCallback);
+        defaultProducer->cSendCallback->setM_cSendExceptionCallback(cSendExceptionCallback);
+        defaultProducer->innerProducer->sendAsync(*(Message *) msg, defaultProducer->cSendCallback);
+    } catch (exception &e) {
+        // Only an exception whose dynamic type is exactly ONSClientException reaches the exception callback.
+        if (std::type_index(typeid(e)) == std::type_index(typeid(ONSClientException))) {
+            defaultProducer->cSendCallback->onException((ONSClientException &) e);
+        }
+        return PRODUCER_SEND_ASYNC_FAILED;
+    }
+    return OK;
+}
+
+// One-way send. NULL_POINTER for null arguments or an inner producer that is still null
+// (previously dereferenced); PRODUCER_SEND_ONEWAY_FAILED when sendOneway raises a std::exception.
+int SendMessageOneway(CProducer *producer, CMessage *msg) {
+    if (producer == NULL || msg == NULL) { return NULL_POINTER; }
+    DefaultProducer *defaultProducer = (DefaultProducer *) producer;
+    if (defaultProducer->innerProducer == NULL) { return NULL_POINTER; }
+    try {
+        defaultProducer->innerProducer->sendOneway(*(Message *) msg);
+    } catch (exception &) {
+        return PRODUCER_SEND_ONEWAY_FAILED;
+    }
+    return OK;
+}
+
+// Ordered one-way sending is not implemented yet.
+// Returns NULL_POINTER when `producer` or `msg` is null and NOT_SUPPORT_NOW otherwise;
+// `selector` and `arg` are accepted for API compatibility but never read.
+// The unused locals and the empty try/catch around commented-out code are gone;
+// the return values are unchanged.
+int SendMessageOnewayOrderly(CProducer *producer, CMessage *msg, QueueSelectorCallback selector, void *arg) {
+    (void) selector;
+    (void) arg;
+    if (producer == NULL || msg == NULL) {
+        return NULL_POINTER;
+    }
+    return NOT_SUPPORT_NOW;
+}
+
+// Ordered async sending is not implemented: NOT_SUPPORT_NOW once the arguments pass the null checks.
+// `arg` is the only pointer argument that is not checked.
+int SendMessageOrderlyAsync(CProducer *producer,
+                            CMessage *msg,
+                            QueueSelectorCallback callback,
+                            void *arg,
+                            CSendSuccessCallback cSendSuccessCallback,
+                            CSendExceptionCallback cSendExceptionCallback) {
+    const bool haveHandles = producer != NULL && msg != NULL;
+    const bool haveCallbacks = callback != NULL && cSendSuccessCallback != NULL && cSendExceptionCallback != NULL;
+    if (!(haveHandles && haveCallbacks)) { return NULL_POINTER; }
+    return NOT_SUPPORT_NOW;
+}
+
+// Ordered sync sending is not implemented: NOT_SUPPORT_NOW once every pointer argument is non-null.
+int SendMessageOrderly(CProducer *producer,
+                       CMessage *msg,
+                       QueueSelectorCallback callback,
+                       void *arg,
+                       int autoRetryTimes,
+                       CSendResult *result) {
+    const bool complete = producer != NULL && msg != NULL && callback != NULL && arg != NULL && result != NULL;
+    if (!complete) { return NULL_POINTER; }
+    return NOT_SUPPORT_NOW;
+}
+
+// Stores `groupName` under ONSFactoryProperty::GroupId; NULL_POINTER when the handle is null.
+int SetProducerGroupName(CProducer *producer, const char *groupName) {
+    if (!producer) { return NULL_POINTER; }
+    DefaultProducer *state = (DefaultProducer *) producer;
+    state->factoryInfo.setFactoryProperty(ONSFactoryProperty::GroupId, groupName);
+    return OK;
+}
+// Stores `instanceName` in the factory properties; NULL_POINTER when the handle is null.
+// NOTE(review): the value is filed under ConsumeThreadNums, which looks like the wrong key - confirm.
+int SetProducerInstanceName(CProducer *producer, const char *instanceName) {
+    if (!producer) { return NULL_POINTER; }
+    ((DefaultProducer *) producer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumeThreadNums, instanceName);
+    return OK;
+}
+// Stores the access and secret keys in the factory properties; `onsChannel` is accepted but not used.
+int SetProducerSessionCredentials(CProducer *producer,
+                                  const char *accessKey,
+                                  const char *secretKey,
+                                  const char *onsChannel) {
+    if (!producer) { return NULL_POINTER; }
+    DefaultProducer *state = (DefaultProducer *) producer;
+    state->factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, accessKey);
+    state->factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, secretKey);
+    return OK;
+}
+// Stores `logPath` under ONSFactoryProperty::LogPath; NULL_POINTER when the handle is null.
+int SetProducerLogPath(CProducer *producer, const char *logPath) {
+    if (!producer) { return NULL_POINTER; }
+    DefaultProducer *state = (DefaultProducer *) producer;
+    state->factoryInfo.setFactoryProperty(ONSFactoryProperty::LogPath, logPath);
+    return OK;
+}
+
+// No-op placeholder: `fileNum` and `fileSize` are ignored.
+// NULL_POINTER for a null handle, OK otherwise.
+int SetProducerLogFileNumAndSize(CProducer *producer, int fileNum, long fileSize) {
+    const bool haveHandle = producer != NULL;
+    return haveHandle ? OK : NULL_POINTER;
+}
+
+// No-op placeholder: `level` is ignored and nothing is configured.
+// NULL_POINTER for a null handle, OK otherwise.
+int SetProducerLogLevel(CProducer *producer, CLogLevel level) {
+    (void) level;
+    const bool haveHandle = producer != NULL;
+    return haveHandle ? OK : NULL_POINTER;
+}
+
+// Passes `timeout` to ONSFactoryProperty::setSendMsgTimeout; NULL_POINTER when the handle is null.
+int SetProducerSendMsgTimeout(CProducer *producer, int timeout) {
+    if (!producer) { return NULL_POINTER; }
+    DefaultProducer *state = (DefaultProducer *) producer;
+    state->factoryInfo.setSendMsgTimeout(timeout);
+    return OK;
+}
+
+// No-op placeholder: `howmuch` is ignored.
+// NULL_POINTER for a null handle, OK otherwise.
+int SetProducerCompressMsgBodyOverHowmuch(CProducer *producer, int howmuch) {
+    const bool haveHandle = producer != NULL;
+    return haveHandle ? OK : NULL_POINTER;
+}
+
+// No-op placeholder: `level` is ignored.
+// NULL_POINTER for a null handle, OK otherwise.
+int SetProducerCompressLevel(CProducer *producer, int level) {
+    const bool haveHandle = producer != NULL;
+    return haveHandle ? OK : NULL_POINTER;
+}
+
+// No-op placeholder: `size` is ignored.
+// NULL_POINTER for a null handle, OK otherwise.
+int SetProducerMaxMessageSize(CProducer *producer, int size) {
+    const bool haveHandle = producer != NULL;
+    return haveHandle ? OK : NULL_POINTER;
+}
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/main/cpp/sdk/extern/CPullConsumer.cpp b/src/main/cpp/sdk/extern/CPullConsumer.cpp
new file mode 100644
index 0000000..88b9d12
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CPullConsumer.cpp
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CPullConsumer.h"
+#include "CCommon.h"
+#include "CMessageExt.h"
+#include "ONSFactory.h"
+
+using namespace ons;
+using namespace std;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Pull consumers are not implemented in this wrapper:
+// no object is created and NULL is returned for every `groupId`.
+CPullConsumer *CreatePullConsumer(const char *groupId) {
+    (void) groupId;
+    return NULL;
+}
+// Not supported yet: nothing is released.
+// NULL_POINTER for a null handle, NOT_SUPPORT_NOW otherwise.
+int DestroyPullConsumer(CPullConsumer *consumer) {
+    const bool haveHandle = consumer != NULL;
+    return haveHandle ? NOT_SUPPORT_NOW : NULL_POINTER;
+}
+// Not supported yet: nothing is started.
+// NULL_POINTER for a null handle, NOT_SUPPORT_NOW otherwise.
+int StartPullConsumer(CPullConsumer *consumer) {
+    const bool haveHandle = consumer != NULL;
+    return haveHandle ? NOT_SUPPORT_NOW : NULL_POINTER;
+}
+// Not supported yet: nothing is shut down.
+// NULL_POINTER for a null handle, NOT_SUPPORT_NOW otherwise.
+int ShutdownPullConsumer(CPullConsumer *consumer) {
+    const bool haveHandle = consumer != NULL;
+    return haveHandle ? NOT_SUPPORT_NOW : NULL_POINTER;
+}
+// Not supported yet: the group id is not stored.
+// NULL_POINTER when `consumer` or `groupId` is null, NOT_SUPPORT_NOW otherwise.
+int SetPullConsumerGroupID(CPullConsumer *consumer, const char *groupId) {
+    const bool haveBoth = consumer != NULL && groupId != NULL;
+    return haveBoth ? NOT_SUPPORT_NOW : NULL_POINTER;
+}
+// Not supported yet: no group id is tracked,
+// so NULL is returned whether or not `consumer` is null.
+const char *GetPullConsumerGroupID(CPullConsumer *consumer) {
+    (void) consumer;
+    return NULL;
+}
+// Not supported yet: `namesrv` is ignored.
+// NULL_POINTER for a null handle, NOT_SUPPORT_NOW otherwise.
+int SetPullConsumerNameServerAddress(CPullConsumer *consumer, const char *namesrv) {
+    const bool haveHandle = consumer != NULL;
+    return haveHandle ? NOT_SUPPORT_NOW : NULL_POINTER;
+}
+// Not supported yet: `domain` is ignored.
+// NULL_POINTER for a null handle, NOT_SUPPORT_NOW otherwise.
+int SetPullConsumerNameServerDomain(CPullConsumer *consumer, const char *domain) {
+    const bool haveHandle = consumer != NULL;
+    return haveHandle ? NOT_SUPPORT_NOW : NULL_POINTER;
+}
+// Not supported yet: the credentials and channel are ignored.
+// NULL_POINTER for a null handle, NOT_SUPPORT_NOW otherwise.
+int SetPullConsumerSessionCredentials(CPullConsumer *consumer,
+                                      const char *accessKey,
+                                      const char *secretKey,
+                                      const char *channel) {
+    const bool haveHandle = consumer != NULL;
+    return haveHandle ? NOT_SUPPORT_NOW : NULL_POINTER;
+}
+
+// Not supported yet: `logPath` is ignored. NULL_POINTER for a null handle, NOT_SUPPORT_NOW otherwise.
+// TODO: this API should be implemented by the core API.
+int SetPullConsumerLogPath(CPullConsumer *consumer, const char *logPath) {
+    if (!consumer) {
+        return NULL_POINTER;
+    }
+    return NOT_SUPPORT_NOW;
+}
+
+// Not supported yet: `fileNum` and `fileSize` are ignored.
+// NULL_POINTER for a null handle, NOT_SUPPORT_NOW otherwise.
+int SetPullConsumerLogFileNumAndSize(CPullConsumer *consumer, int fileNum, long fileSize) {
+    const bool haveHandle = consumer != NULL;
+    return haveHandle ? NOT_SUPPORT_NOW : NULL_POINTER;
+}
+
+// Not supported yet: `level` is ignored.
+// NULL_POINTER for a null handle, NOT_SUPPORT_NOW otherwise.
+int SetPullConsumerLogLevel(CPullConsumer *consumer, CLogLevel level) {
+    const bool haveHandle = consumer != NULL;
+    return haveHandle ? NOT_SUPPORT_NOW : NULL_POINTER;
+}
+
+// Not implemented: `topic`, `mqs` and `size` are never touched.
+// NULL_POINTER for a null handle, PULLCONSUMER_FETCH_MQ_FAILED otherwise.
+int FetchSubscriptionMessageQueues(CPullConsumer *consumer, const char *topic, CMessageQueue **mqs, int *size) {
+    const bool haveHandle = consumer != NULL;
+    return haveHandle ? PULLCONSUMER_FETCH_MQ_FAILED : NULL_POINTER;
+}
+// Releases the block pointed to by `mqs` with free(); NULL_POINTER when `mqs` is null.
+// Only the by-value copy of the pointer is visible here, so the caller's own pointer
+// is not cleared and must not be reused after this call.
+int ReleaseSubscriptionMessageQueue(CMessageQueue *mqs) {
+    if (!mqs) { return NULL_POINTER; }
+    free(mqs);
+    return OK;
+}
+// Placeholder pull: every argument is ignored and no message is fetched.
+// Returns a zero-filled CPullResult whose pullStatus is E_NO_NEW_MSG.
+CPullResult Pull(CPullConsumer *consumer, const CMessageQueue *mq, const char *subExpression,
+                 long long offset, int maxNums) {
+    // Zero every field first so that only the status carries a value.
+    CPullResult emptyResult;
+    memset(&emptyResult, 0, sizeof(emptyResult));
+    emptyResult.pullStatus = E_NO_NEW_MSG;
+    return emptyResult;
+}
+// Not supported yet; nothing is freed. NULL_POINTER when the result is empty
+// (size 0, or a null msgFoundList or pData), NOT_SUPPORT_NOW otherwise.
+int ReleasePullResult(CPullResult pullResult) {
+    const bool populated = pullResult.size != 0 && pullResult.msgFoundList != NULL && pullResult.pData != NULL;
+    return populated ? NOT_SUPPORT_NOW : NULL_POINTER;
+}
+
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/main/cpp/sdk/extern/CPushConsumer.cpp b/src/main/cpp/sdk/extern/CPushConsumer.cpp
new file mode 100644
index 0000000..bc3d788
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CPushConsumer.cpp
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CPushConsumer.h"
+#include <map>
+#include "CCommon.h"
+#include "CMessageExt.h"
+#include "ONSFactory.h"
+#include "../common/UtilAll.h"
+
+using namespace ons;
+using namespace std;
+
+// Adapter that forwards messages delivered through the MessageListener
+// interface to a C callback registered via the C API.
+class MessageListenerInner : public MessageListener {
+public:
+    // Builds a listener without a callback. Both members are set to NULL so
+    // that consume() never reads an indeterminate pointer; such a listener
+    // answers every message with ReconsumeLater.
+    MessageListenerInner() : m_pMsgReceiveCallback(NULL), m_pconsumer(NULL) {}
+
+    // consumer is handed back to pCallback unchanged on every delivery.
+    MessageListenerInner(CPushConsumer *consumer, MessageCallBack pCallback)
+            : m_pMsgReceiveCallback(pCallback), m_pconsumer(consumer) {}
+
+    ~MessageListenerInner() {}
+
+    // Invokes the C callback for one message.
+    // Returns CommitMessage only when the callback reports E_CONSUME_SUCCESS;
+    // a missing callback or any other result yields ReconsumeLater.
+    Action consume(Message &message, ConsumeContext &context) {
+        if (m_pMsgReceiveCallback == NULL) {
+            return Action::ReconsumeLater;
+        }
+        // The C side receives the address of the Message as an opaque CMessageExt handle.
+        CMessageExt *msg = (CMessageExt *) (&message);
+        if (m_pMsgReceiveCallback(m_pconsumer, msg) != E_CONSUME_SUCCESS) {
+            return Action::ReconsumeLater;
+        }
+        return Action::CommitMessage;
+    }
+
+private:
+    MessageCallBack m_pMsgReceiveCallback;
+    CPushConsumer *m_pconsumer;
+};
+
+// Listener currently registered for each consumer handle. Entries are stored
+// by RegisterMessageCallback() and deleted/erased by UnregisterMessageCallback().
+map<CPushConsumer *, MessageListenerInner *> g_ListenerMap;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+#ifndef CAPI_MAX_SUB_EXPRESS_LEN
+#define CAPI_MAX_SUB_EXPRESS_LEN 512
+#endif
+
+// Concrete object behind the opaque CPushConsumer handle returned by
+// CreatePushConsumer().
+typedef struct __DefaultPushConsumer__ {
+    // Properties collected by the Set*/Subscribe functions and passed to
+    // ONSFactory::createPushConsumer() in StartPushConsumer().
+    ONSFactoryProperty factoryInfo;
+    // Assigned by StartPushConsumer(); used by the shutdown and callback registration functions.
+    PushConsumer *innerConsumer;
+    // Subscription expression copied in by Subscribe(); the buffer is cleared before each copy.
+    char expression[CAPI_MAX_SUB_EXPRESS_LEN];
+} DefaultPushConsumer;
+
+// Allocates a push consumer handle for the given group id.
+// Returns NULL when groupId is NULL. The handle is released with
+// DestroyPushConsumer().
+CPushConsumer *CreatePushConsumer(const char *groupId) {
+    if (groupId != NULL) {
+        DefaultPushConsumer *handle = new DefaultPushConsumer();
+        // Start with an empty subscription expression.
+        memset(handle->expression, 0, CAPI_MAX_SUB_EXPRESS_LEN);
+        handle->factoryInfo.setFactoryProperty(ONSFactoryProperty::GroupId, groupId);
+        return reinterpret_cast<CPushConsumer *>(handle);
+    }
+    return NULL;
+}
+// Deletes the handle allocated by CreatePushConsumer().
+// Returns NULL_POINTER for a NULL handle, otherwise OK. The inner consumer
+// and any registered listener are not touched here.
+int DestroyPushConsumer(CPushConsumer *consumer) {
+    if (consumer != NULL) {
+        DefaultPushConsumer *handle = reinterpret_cast<DefaultPushConsumer *>(consumer);
+        delete handle;
+        return OK;
+    }
+    return NULL_POINTER;
+}
+// Creates the underlying push consumer from the collected factory properties
+// and starts it.
+// Returns NULL_POINTER for a NULL handle, PUSHCONSUMER_START_FAILED when the
+// consumer cannot be created or started, and OK on success.
+int StartPushConsumer(CPushConsumer *consumer) {
+    if (consumer == NULL) {
+        return NULL_POINTER;
+    }
+    DefaultPushConsumer *defaultPushConsumer = (DefaultPushConsumer *) consumer;
+    try {
+        defaultPushConsumer->innerConsumer = ONSFactory::getInstance()->createPushConsumer(
+                defaultPushConsumer->factoryInfo);
+        // Do not dereference a consumer the factory failed to produce.
+        if (defaultPushConsumer->innerConsumer == NULL) {
+            return PUSHCONSUMER_START_FAILED;
+        }
+        defaultPushConsumer->innerConsumer->start();
+    } catch (const exception &) {
+        return PUSHCONSUMER_START_FAILED;
+    } catch (...) {
+        // No C++ exception may propagate through this C entry point.
+        return PUSHCONSUMER_START_FAILED;
+    }
+    return OK;
+}
+// Shuts down the underlying push consumer.
+// Returns NULL_POINTER when the handle is NULL or when the consumer was never
+// started (innerConsumer is only assigned by StartPushConsumer()), otherwise OK.
+int ShutdownPushConsumer(CPushConsumer *consumer) {
+    if (consumer == NULL) {
+        return NULL_POINTER;
+    }
+    DefaultPushConsumer *defaultPushConsumer = (DefaultPushConsumer *) consumer;
+    if (defaultPushConsumer->innerConsumer == NULL) {
+        return NULL_POINTER;
+    }
+    defaultPushConsumer->innerConsumer->shutdown();
+    return OK;
+}
+// Stores groupId as the GroupId factory property.
+// Returns NULL_POINTER when either argument is NULL, otherwise OK.
+int SetPushConsumerGroupID(CPushConsumer *consumer, const char *groupId) {
+    if (consumer != NULL && groupId != NULL) {
+        DefaultPushConsumer *handle = (DefaultPushConsumer *) consumer;
+        handle->factoryInfo.setFactoryProperty(ONSFactoryProperty::GroupId, groupId);
+        return OK;
+    }
+    return NULL_POINTER;
+}
+// Returns the group id held in the handle's factory properties, or NULL when
+// consumer is NULL.
+const char *GetPushConsumerGroupID(CPushConsumer *consumer) {
+    if (consumer != NULL) {
+        DefaultPushConsumer *handle = (DefaultPushConsumer *) consumer;
+        return handle->factoryInfo.getGroupId();
+    }
+    return NULL;
+}
+// Stores namesrv as the NAMESRV_ADDR factory property.
+// Returns NULL_POINTER when either argument is NULL (the same contract as
+// SetPushConsumerGroupID), otherwise OK.
+int SetPushConsumerNameServerAddress(CPushConsumer *consumer, const char *namesrv) {
+    if (consumer == NULL || namesrv == NULL) {
+        return NULL_POINTER;
+    }
+    ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, namesrv);
+    return OK;
+}
+// Stores domain as the ONSAddr factory property.
+// Returns NULL_POINTER when either argument is NULL (the same contract as
+// SetPushConsumerGroupID), otherwise OK.
+int SetPushConsumerNameServerDomain(CPushConsumer *consumer, const char *domain) {
+    if (consumer == NULL || domain == NULL) {
+        return NULL_POINTER;
+    }
+    ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::ONSAddr, domain);
+    return OK;
+}
+// Records the topic and subscription expression on the consumer handle; both
+// are read later by RegisterMessageCallback().
+// Returns NULL_POINTER when any argument is NULL, otherwise OK. Expressions
+// longer than CAPI_MAX_SUB_EXPRESS_LEN - 1 characters are truncated.
+int Subscribe(CPushConsumer *consumer, const char *topic, const char *expression) {
+    // strncpy() with a NULL source is undefined behaviour, so reject NULL inputs up front.
+    if (consumer == NULL || topic == NULL || expression == NULL) {
+        return NULL_POINTER;
+    }
+    DefaultPushConsumer *defaultPushConsumer = (DefaultPushConsumer *) consumer;
+    defaultPushConsumer->factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, topic);
+    // Clearing the whole buffer first keeps the bounded copy NUL-terminated.
+    memset(defaultPushConsumer->expression, 0, CAPI_MAX_SUB_EXPRESS_LEN);
+    strncpy(defaultPushConsumer->expression, expression, CAPI_MAX_SUB_EXPRESS_LEN - 1);
+    return OK;
+}
+
+// Subscribes the inner consumer to the topic/expression recorded by
+// Subscribe(), delivering messages to pCallback through a newly allocated
+// MessageListenerInner that is remembered in g_ListenerMap.
+// Returns NULL_POINTER when an argument is NULL or when the inner consumer
+// does not exist yet, otherwise OK.
+int RegisterMessageCallback(CPushConsumer *consumer, MessageCallBack pCallback) {
+    if (consumer == NULL || pCallback == NULL) {
+        return NULL_POINTER;
+    }
+
+    DefaultPushConsumer *defaultPushConsumer = (DefaultPushConsumer *) consumer;
+    // innerConsumer is only assigned by StartPushConsumer(); fail cleanly
+    // instead of dereferencing NULL when that has not happened yet.
+    if (defaultPushConsumer->innerConsumer == NULL) {
+        return NULL_POINTER;
+    }
+    MessageListenerInner *listenerInner = new MessageListenerInner(consumer, pCallback);
+    defaultPushConsumer->innerConsumer->subscribe(defaultPushConsumer->factoryInfo.getPublishTopics(),
+                                                  defaultPushConsumer->expression, listenerInner);
+    g_ListenerMap[consumer] = listenerInner;
+    return OK;
+}
+
+// Placeholder for orderly message callbacks; nothing is registered.
+// Returns NULL_POINTER when either argument is NULL, otherwise NOT_SUPPORT_NOW.
+int RegisterMessageCallbackOrderly(CPushConsumer *consumer, MessageCallBack pCallback) {
+    const bool argsValid = (consumer != NULL) && (pCallback != NULL);
+    return argsValid ? NOT_SUPPORT_NOW : NULL_POINTER;
+}
+
+// Placeholder counterpart of RegisterMessageCallbackOrderly().
+// Returns NULL_POINTER for a NULL handle, otherwise NOT_SUPPORT_NOW.
+int UnregisterMessageCallbackOrderly(CPushConsumer *consumer) {
+    if (consumer != NULL) {
+        return NOT_SUPPORT_NOW;
+    }
+    return NULL_POINTER;
+}
+
+// Deletes the listener stored for this consumer in g_ListenerMap and removes
+// its map entry. The inner consumer itself is not called.
+// Returns NULL_POINTER for a NULL handle; otherwise OK, whether or not a
+// listener was registered.
+int UnregisterMessageCallback(CPushConsumer *consumer) {
+    if (consumer == NULL) {
+        return NULL_POINTER;
+    }
+    map<CPushConsumer *, MessageListenerInner *>::iterator entry = g_ListenerMap.find(consumer);
+    if (entry == g_ListenerMap.end()) {
+        return OK;
+    }
+    delete entry->second;  // deleting a NULL pointer is a no-op
+    g_ListenerMap.erase(entry);
+    return OK;
+}
+
+// Stores the message model as the MessageModel factory property.
+// BROADCASTING selects broadcasting; CLUSTERING and every other value select
+// clustering. Returns NULL_POINTER for a NULL handle, otherwise OK.
+int SetPushConsumerMessageModel(CPushConsumer *consumer, CMessageModel messageModel) {
+    if (consumer == NULL) {
+        return NULL_POINTER;
+    }
+
+    DefaultPushConsumer *handle = (DefaultPushConsumer *) consumer;
+    if (messageModel == BROADCASTING) {
+        handle->factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel,
+                                               ONSFactoryProperty::BROADCASTING);
+    } else {
+        // Clustering doubles as the fallback for unrecognised values.
+        handle->factoryInfo.setFactoryProperty(ONSFactoryProperty::MessageModel,
+                                               ONSFactoryProperty::CLUSTERING);
+    }
+    return OK;
+}
+// Stores threadCount, formatted as a string, as the ConsumeThreadNums factory
+// property.
+// Returns NULL_POINTER when consumer is NULL or threadCount is not positive
+// (zero was already rejected; a negative count is equally meaningless),
+// otherwise OK.
+int SetPushConsumerThreadCount(CPushConsumer *consumer, int threadCount) {
+    if (consumer == NULL || threadCount <= 0) {
+        return NULL_POINTER;
+    }
+    ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumeThreadNums,
+                                                                       UtilAll::to_string(threadCount).c_str());
+    return OK;
+}
+// Validates its arguments only; batchSize is not stored anywhere.
+// Returns NULL_POINTER when consumer is NULL or batchSize is 0, otherwise OK.
+int SetPushConsumerMessageBatchMaxSize(CPushConsumer *consumer, int batchSize) {
+    const bool rejected = (consumer == NULL) || (batchSize == 0);
+    return rejected ? NULL_POINTER : OK;
+}
+
+// Stores instanceName as the ConsumerInstanceName factory property.
+// Returns NULL_POINTER when either argument is NULL (the same contract as
+// SetPushConsumerGroupID), otherwise OK.
+int SetPushConsumerInstanceName(CPushConsumer *consumer, const char *instanceName) {
+    if (consumer == NULL || instanceName == NULL) {
+        return NULL_POINTER;
+    }
+    ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerInstanceName,
+                                                                       instanceName);
+    return OK;
+}
+
+// Stores accessKey and secretKey as the AccessKey and SecretKey factory
+// properties; channel is accepted but not used.
+// Returns NULL_POINTER when consumer, accessKey or secretKey is NULL,
+// otherwise OK.
+int SetPushConsumerSessionCredentials(CPushConsumer *consumer,
+                                      const char *accessKey,
+                                      const char *secretKey,
+                                      const char *channel) {
+    // Validate both keys before storing either, so a failed call changes nothing.
+    if (consumer == NULL || accessKey == NULL || secretKey == NULL) {
+        return NULL_POINTER;
+    }
+    DefaultPushConsumer *defaultPushConsumer = (DefaultPushConsumer *) consumer;
+    defaultPushConsumer->factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, accessKey);
+    defaultPushConsumer->factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, secretKey);
+    return OK;
+}
+
+// Stores logPath as the LogPath factory property.
+// Returns NULL_POINTER when either argument is NULL (the same contract as
+// SetPushConsumerGroupID), otherwise OK.
+int SetPushConsumerLogPath(CPushConsumer *consumer, const char *logPath) {
+    if (consumer == NULL || logPath == NULL) {
+        return NULL_POINTER;
+    }
+    ((DefaultPushConsumer *) consumer)->factoryInfo.setFactoryProperty(ONSFactoryProperty::LogPath, logPath);
+    return OK;
+}
+
+// Validates the handle only; fileNum and fileSize are not stored.
+// Returns NULL_POINTER for a NULL handle, otherwise OK.
+int SetPushConsumerLogFileNumAndSize(CPushConsumer *consumer, int fileNum, long fileSize) {
+    return (consumer != NULL) ? OK : NULL_POINTER;
+}
+
+// Validates the handle only; level is not stored.
+// Returns NULL_POINTER for a NULL handle, otherwise OK.
+int SetPushConsumerLogLevel(CPushConsumer *consumer, CLogLevel level) {
+    if (consumer != NULL) {
+        return OK;
+    }
+    return NULL_POINTER;
+}
+
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/main/cpp/sdk/extern/CSendResult.cpp b/src/main/cpp/sdk/extern/CSendResult.cpp
new file mode 100644
index 0000000..8d4dc2b
--- /dev/null
+++ b/src/main/cpp/sdk/extern/CSendResult.cpp
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CSendResult.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#ifdef __cplusplus
+};
+#endif