You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/06 22:42:21 UTC

[GitHub] sijie closed pull request #1736: C API wrapper based on C++ client library

sijie closed pull request #1736: C API wrapper based on C++ client library
URL: https://github.com/apache/incubator-pulsar/pull/1736
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/.gitignore b/pulsar-client-cpp/.gitignore
index c660cd10d7..e14b081c80 100644
--- a/pulsar-client-cpp/.gitignore
+++ b/pulsar-client-cpp/.gitignore
@@ -35,9 +35,13 @@ lib*.so*
 
 # Exclude compiled executables
 /examples/SampleProducer
+/examples/SampleProducerCApi
 /examples/SampleConsumer
+/examples/SampleConsumerCApi
 /examples/SampleAsyncProducer
 /examples/SampleConsumerListener
+/examples/SampleConsumerListenerCApi
+/examples/SampleReaderCApi
 /tests/main
 /perf/perfProducer
 /perf/perfConsumer
diff --git a/pulsar-client-cpp/examples/CMakeLists.txt b/pulsar-client-cpp/examples/CMakeLists.txt
index fe5eb6ec04..6459b601ac 100644
--- a/pulsar-client-cpp/examples/CMakeLists.txt
+++ b/pulsar-client-cpp/examples/CMakeLists.txt
@@ -33,12 +33,36 @@ set(SAMPLE_PRODUCER_SOURCES
   SampleProducer.cc
 )
 
+set(SAMPLE_PRODUCER_C_SOURCES
+    SampleProducerCApi.c
+)
+
+set(SAMPLE_CONSUMER_C_SOURCES
+    SampleConsumerCApi.c
+)
+
+set(SAMPLE_CONSUMER_LISTENER_C_SOURCES
+    SampleConsumerListenerCApi.c
+)
+
+set(SAMPLE_READER_C_SOURCES
+        SampleReaderCApi.c
+)
+
 add_executable(SampleAsyncProducer    ${SAMPLE_ASYNC_PRODUCER_SOURCES})
 add_executable(SampleConsumer         ${SAMPLE_CONSUMER_SOURCES})
 add_executable(SampleConsumerListener ${SAMPLE_CONSUMER_LISTENER_SOURCES})
 add_executable(SampleProducer         ${SAMPLE_PRODUCER_SOURCES})
+add_executable(SampleProducerCApi         ${SAMPLE_PRODUCER_C_SOURCES})
+add_executable(SampleConsumerCApi         ${SAMPLE_CONSUMER_C_SOURCES})
+add_executable(SampleConsumerListenerCApi         ${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
+add_executable(SampleReaderCApi         ${SAMPLE_READER_C_SOURCES})
 
 target_link_libraries(SampleAsyncProducer    ${CLIENT_LIBS})
 target_link_libraries(SampleConsumer         ${CLIENT_LIBS})
 target_link_libraries(SampleConsumerListener ${CLIENT_LIBS})
 target_link_libraries(SampleProducer         ${CLIENT_LIBS})
+target_link_libraries(SampleProducerCApi     ${CLIENT_LIBS})
+target_link_libraries(SampleConsumerCApi     ${CLIENT_LIBS})
+target_link_libraries(SampleConsumerListenerCApi     ${CLIENT_LIBS})
+target_link_libraries(SampleReaderCApi     ${CLIENT_LIBS})
\ No newline at end of file
diff --git a/pulsar-client-cpp/examples/SampleConsumerCApi.c b/pulsar-client-cpp/examples/SampleConsumerCApi.c
new file mode 100644
index 0000000000..97b074d5ef
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleConsumerCApi.c
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <pulsar/c/client.h>
+
+int main() {
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create("pulsar://localhost:6650", conf);
+
+    pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
+    pulsar_consumer_configuration_set_consumer_type(consumer_conf, pulsar_ConsumerShared);
+
+    pulsar_consumer_t *consumer;
+    pulsar_result res = pulsar_client_subscribe(client, "my-topic", "my-subscrition", consumer_conf, &consumer);
+    if (res != pulsar_result_Ok) {
+        printf("Failed to create subscribe to topic: %s\n", pulsar_result_str(res));
+        return 1;
+    }
+
+    for (;;) {
+        pulsar_message_t *message;
+        res = pulsar_consumer_receive(consumer, &message);
+        if (res != pulsar_result_Ok) {
+            printf("Failed to receive message: %s\n", pulsar_result_str(res));
+            return 1;
+        }
+
+        printf("Received message with payload: '%.*s'\n", pulsar_message_get_length(message),
+               pulsar_message_get_data(message));
+
+        pulsar_consumer_acknowledge(consumer, message);
+        pulsar_message_free(message);
+    }
+
+    // Cleanup
+    pulsar_consumer_close(consumer);
+    pulsar_consumer_free(consumer);
+    pulsar_consumer_configuration_free(consumer_conf);
+
+    pulsar_client_close(client);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+}
diff --git a/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
new file mode 100644
index 0000000000..8f3ed0df63
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <pulsar/c/client.h>
+
+static void listener_callback(pulsar_consumer_t* consumer, pulsar_message_t* message) {
+    printf("Received message with payload: '%.*s'\n", pulsar_message_get_length(message),
+           pulsar_message_get_data(message));
+
+    pulsar_consumer_acknowledge(consumer, message);
+    pulsar_message_free(message);
+}
+
+int main() {
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create("pulsar://localhost:6650", conf);
+
+    pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
+    pulsar_consumer_configuration_set_consumer_type(consumer_conf, pulsar_ConsumerShared);
+    pulsar_consumer_configuration_set_message_listener(consumer_conf, listener_callback);
+
+    pulsar_consumer_t *consumer;
+    pulsar_result res = pulsar_client_subscribe(client, "my-topic", "my-subscrition", consumer_conf, &consumer);
+    if (res != pulsar_result_Ok) {
+        printf("Failed to create subscribe to topic: %s\n", pulsar_result_str(res));
+        return 1;
+    }
+
+    // Block main thread
+    fgetc(stdin);
+
+    printf("\nClosing consumer\n");
+
+    // Cleanup
+    pulsar_consumer_close(consumer);
+    pulsar_consumer_free(consumer);
+    pulsar_consumer_configuration_free(consumer_conf);
+
+    pulsar_client_close(client);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+}
diff --git a/pulsar-client-cpp/examples/SampleProducerCApi.c b/pulsar-client-cpp/examples/SampleProducerCApi.c
new file mode 100644
index 0000000000..74853e2d13
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleProducerCApi.c
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/client.h>
+
+#include <stdio.h>
+#include <string.h>
+
+int main() {
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create("pulsar://localhost:6650", conf);
+
+    pulsar_producer_configuration_t* producer_conf = pulsar_producer_configuration_create();
+    pulsar_producer_configuration_set_batching_enabled(producer_conf, 1);
+    pulsar_producer_t *producer;
+
+    pulsar_result err = pulsar_client_create_producer(client, "my-topic", producer_conf, &producer);
+    if (err != pulsar_result_Ok) {
+        printf("Failed to create producer: %s\n", pulsar_result_str(err));
+        return 1;
+    }
+
+    for (int i = 0; i < 10; i++) {
+        const char* data = "my-content";
+        pulsar_message_t* message = pulsar_message_create();
+        pulsar_message_set_content(message, data, strlen(data));
+
+        err = pulsar_producer_send(producer, message);
+        if (err == pulsar_result_Ok) {
+            printf("Sent message %d\n", i);
+        } else {
+            printf("Failed to publish message: %s\n", pulsar_result_str(err));
+            return 1;
+        }
+
+        pulsar_message_free(message);
+    }
+
+    // Cleanup
+    pulsar_producer_close(producer);
+    pulsar_producer_free(producer);
+    pulsar_producer_configuration_free(producer_conf);
+
+    pulsar_client_close(client);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+}
diff --git a/pulsar-client-cpp/examples/SampleReaderCApi.c b/pulsar-client-cpp/examples/SampleReaderCApi.c
new file mode 100644
index 0000000000..c0eec06333
--- /dev/null
+++ b/pulsar-client-cpp/examples/SampleReaderCApi.c
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <stdio.h>
+#include <pulsar/c/client.h>
+
+int main() {
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create("pulsar://localhost:6650", conf);
+
+    pulsar_reader_configuration_t *reader_conf = pulsar_reader_configuration_create();
+
+    pulsar_reader_t *reader;
+    pulsar_result res = pulsar_client_create_reader(client, "my-topic", pulsar_message_id_earliest(), reader_conf,
+                                                    &reader);
+    if (res != pulsar_result_Ok) {
+        printf("Failed to create reader: %s\n", pulsar_result_str(res));
+        return 1;
+    }
+
+    for (;;) {
+        pulsar_message_t *message;
+        res = pulsar_reader_read_next(reader, &message);
+        if (res != pulsar_result_Ok) {
+            printf("Failed to read message: %s\n", pulsar_result_str(res));
+            return 1;
+        }
+
+        printf("Received message with payload: '%.*s'\n", pulsar_message_get_length(message),
+               pulsar_message_get_data(message));
+
+        pulsar_message_free(message);
+    }
+
+    // Cleanup
+    pulsar_reader_close(reader);
+    pulsar_reader_free(reader);
+    pulsar_reader_configuration_free(reader_conf);
+
+    pulsar_client_close(client);
+    pulsar_client_free(client);
+    pulsar_client_configuration_free(conf);
+}
diff --git a/pulsar-client-cpp/include/pulsar/c/authentication.h b/pulsar-client-cpp/include/pulsar/c/authentication.h
new file mode 100644
index 0000000000..6f405c2469
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/authentication.h
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_authentication pulsar_authentication_t;
+
+pulsar_authentication_t *pulsar_authentication_create(const char *dynamicLibPath,
+                                                      const char *authParamsString);
+
+void pulsar_authentication_free(pulsar_authentication_t *authentication);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/client.h b/pulsar-client-cpp/include/pulsar/c/client.h
new file mode 100644
index 0000000000..11320e7882
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/client.h
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/c/client_configuration.h>
+#include <pulsar/c/message.h>
+#include <pulsar/c/message_id.h>
+#include <pulsar/c/producer.h>
+#include <pulsar/c/consumer.h>
+#include <pulsar/c/reader.h>
+#include <pulsar/c/consumer_configuration.h>
+#include <pulsar/c/producer_configuration.h>
+#include <pulsar/c/reader_configuration.h>
+#include <pulsar/c/result.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_client pulsar_client_t;
+typedef struct _pulsar_producer pulsar_producer_t;
+
+typedef struct _pulsar_client_configuration pulsar_client_configuration_t;
+typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;
+
+typedef void (*pulsar_create_producer_callback)(pulsar_result result, pulsar_producer_t *producer);
+
+typedef void (*pulsar_subscribe_callback)(pulsar_result result, pulsar_consumer_t *consumer);
+typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t *reader);
+
+typedef void (*pulsar_close_callback)(pulsar_result result);
+
+/**
+ * Create a Pulsar client object connecting to the specified cluster address and using the specified
+ * configuration.
+ *
+ * @param serviceUrl the Pulsar endpoint to use (eg: pulsar://broker-example.com:6650)
+ * @param clientConfiguration the client configuration to use
+ */
+pulsar_client_t *pulsar_client_create(const char *serviceUrl,
+                                      const pulsar_client_configuration_t *clientConfiguration);
+
+/**
+ * Create a producer with default configuration
+ *
+ * @see createProducer(const std::string&, const ProducerConfiguration&, Producer&)
+ *
+ * @param topic the topic where the new producer will publish
+ * @param producer a non-const reference where the new producer will be copied
+ * @return ResultOk if the producer has been successfully created
+ * @return ResultError if there was an error
+ */
+pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const char *topic,
+                                            const pulsar_producer_configuration_t *conf,
+                                            pulsar_producer_t **producer);
+
+void pulsar_client_create_producer_async(pulsar_client_t *client, const char *topic,
+                                         const pulsar_producer_configuration_t *conf,
+                                         pulsar_create_producer_callback callback);
+
+pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic,
+                                      const char *subscriptionName,
+                                      const pulsar_consumer_configuration_t *conf,
+                                      pulsar_consumer_t **consumer);
+
+void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, const char *subscriptionName,
+                                   const pulsar_consumer_configuration_t *conf, pulsar_consumer_t **consumer,
+                                   pulsar_subscribe_callback callback);
+
+/**
+ * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified
+ * topic.
+ * <p>
+ * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without
+ * using a
+ * subscription. Reader can only work on non-partitioned topics.
+ * <p>
+ * The initial reader positioning is done by specifying a message id. The options are:
+ * <ul>
+ * <li><code>MessageId.earliest</code> : Start reading from the earliest message available in the topic
+ * <li><code>MessageId.latest</code> : Start reading from the end topic, only getting messages published
+ * after the
+ * reader was created
+ * <li><code>MessageId</code> : When passing a particular message id, the reader will position itself on
+ * that
+ * specific position. The first message to be read will be the message next to the specified messageId.
+ * </ul>
+ *
+ * @param topic
+ *            The name of the topic where to read
+ * @param startMessageId
+ *            The message id where the reader will position itself. The first message returned will be the
+ * one after
+ *            the specified startMessageId
+ * @param conf
+ *            The {@code ReaderConfiguration} object
+ * @return The {@code Reader} object
+ */
+pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic,
+                                          const pulsar_message_id_t *startMessageId,
+                                          pulsar_reader_configuration_t *conf, pulsar_reader_t **reader);
+
+void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topic,
+                                       const pulsar_message_id_t *startMessageId,
+                                       pulsar_reader_configuration_t *conf, pulsar_reader_t **reader,
+                                       pulsar_reader_callback callback);
+
+pulsar_result pulsar_client_close(pulsar_client_t *client);
+
+void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback);
+
+void pulsar_client_free(pulsar_client_t *client);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/client_configuration.h b/pulsar-client-cpp/include/pulsar/c/client_configuration.h
new file mode 100644
index 0000000000..47f78dbbf8
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/client_configuration.h
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_client_configuration pulsar_client_configuration_t;
+typedef struct _pulsar_authentication pulsar_authentication_t;
+
+pulsar_client_configuration_t *pulsar_client_configuration_create();
+
+void pulsar_client_configuration_free(pulsar_client_configuration_t *conf);
+
+/**
+ * Set the authentication method to be used with the broker
+ *
+ * @param authentication the authentication data to use
+ */
+void pulsar_client_configuration_set_auth(pulsar_client_configuration_t *conf,
+                                          pulsar_authentication_t *authentication);
+
+/**
+ * Set timeout on client operations (subscribe, create producer, close, unsubscribe)
+ * Default is 30 seconds.
+ *
+ * @param timeout the timeout after which the operation will be considered as failed
+ */
+void pulsar_client_configuration_set_operation_timeout_seconds(pulsar_client_configuration_t *conf,
+                                                               int timeout);
+
+/**
+ * @return the client operations timeout in seconds
+ */
+int pulsar_client_configuration_get_operation_timeout_seconds(pulsar_client_configuration_t *conf);
+
+/**
+ * Set the number of IO threads to be used by the Pulsar client. Default is 1
+ * thread.
+ *
+ * @param threads number of threads
+ */
+void pulsar_client_configuration_set_io_threads(pulsar_client_configuration_t *conf, int threads);
+
+/**
+ * @return the number of IO threads to use
+ */
+int pulsar_client_configuration_get_io_threads(pulsar_client_configuration_t *conf);
+
+/**
+ * Set the number of threads to be used by the Pulsar client when delivering messages
+ * through message listener. Default is 1 thread per Pulsar client.
+ *
+ * If using more than 1 thread, messages for distinct MessageListener will be
+ * delivered in different threads, however a single MessageListener will always
+ * be assigned to the same thread.
+ *
+ * @param threads number of threads
+ */
+void pulsar_client_configuration_set_message_listener_threads(pulsar_client_configuration_t *conf,
+                                                              int threads);
+
+/**
+ * @return the number of IO threads to use
+ */
+int pulsar_client_configuration_get_message_listener_threads(pulsar_client_configuration_t *conf);
+
+/**
+ * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
+ * <i>(default: 50000)</i> It should be configured with higher value only in case of it requires to
+ * produce/subscribe on
+ * thousands of topic using created {@link PulsarClient}
+ *
+ * @param concurrentLookupRequest
+ */
+void pulsar_client_configuration_set_concurrent_lookup_request(pulsar_client_configuration_t *conf,
+                                                               int concurrentLookupRequest);
+
+/**
+ * @return Get configured total allowed concurrent lookup-request.
+ */
+int pulsar_client_configuration_get_concurrent_lookup_request(pulsar_client_configuration_t *conf);
+
+/**
+ * Initialize the log configuration
+ *
+ * @param logConfFilePath  path of the configuration file
+ */
+void pulsar_client_configuration_set_log_conf_file_path(pulsar_client_configuration_t *conf,
+                                                        const char *logConfFilePath);
+
+/**
+ * Get the path of log configuration file (log4cpp)
+ */
+const char *pulsar_client_configuration_get_log_conf_file_path(pulsar_client_configuration_t *conf);
+
+void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t *conf, int useTls);
+
+int pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t *conf);
+
+void pulsar_client_configuration_set_tls_trust_certs_file_path(pulsar_client_configuration_t *conf,
+                                                               const char *tlsTrustCertsFilePath);
+
+const char *pulsar_client_configuration_get_tls_trust_certs_file_path(pulsar_client_configuration_t *conf);
+
+void pulsar_client_configuration_set_tls_allow_insecure_connection(pulsar_client_configuration_t *conf,
+                                                                   int allowInsecure);
+
+int pulsar_client_configuration_is_tls_allow_insecure_connection(pulsar_client_configuration_t *conf);
+
+/*
+ * Initialize stats interval in seconds. Stats are printed and reset after every 'statsIntervalInSeconds'.
+ * Set to 0 in order to disable stats collection.
+ */
+void pulsar_client_configuration_set_stats_interval_in_seconds(pulsar_client_configuration_t *conf,
+                                                               const unsigned int interval);
+
+/*
+ * Get the stats interval set in the client.
+ */
+const unsigned int pulsar_client_configuration_get_stats_interval_in_seconds(
+    pulsar_client_configuration_t *conf);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer.h b/pulsar-client-cpp/include/pulsar/c/consumer.h
new file mode 100644
index 0000000000..59c99e3080
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/consumer.h
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <pulsar/c/result.h>
+#include <pulsar/c/message.h>
+
+#include <stdint.h>
+
+typedef struct _pulsar_consumer pulsar_consumer_t;
+
+typedef void (*pulsar_result_callback)(pulsar_result);
+
+/**
+ * @return the topic this consumer is subscribed to
+ */
+const char *pulsar_consumer_get_topic(pulsar_consumer_t *consumer);
+
+/**
+ * @return the consumer name
+ */
+const char *pulsar_consumer_get_subscription_name(pulsar_consumer_t *consumer);
+
+/**
+ * Unsubscribe the current consumer from the topic.
+ *
+ * This method will block until the operation is completed. Once the consumer is
+ * unsubscribed, no more messages will be received and subsequent new messages
+ * will not be retained for this consumer.
+ *
+ * This consumer object cannot be reused.
+ *
+ * @see asyncUnsubscribe
+ * @return Result::ResultOk if the unsubscribe operation completed successfully
+ * @return Result::ResultError if the unsubscribe operation failed
+ */
+pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer);
+
+/**
+ * Asynchronously unsubscribe the current consumer from the topic.
+ *
+ * This method will block until the operation is completed. Once the consumer is
+ * unsubscribed, no more messages will be received and subsequent new messages
+ * will not be retained for this consumer.
+ *
+ * This consumer object cannot be reused.
+ *
+ * @param callback the callback to get notified when the operation is complete
+ */
+void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback);
+
+/**
+ * Receive a single message.
+ *
+ * If a message is not immediately available, this method will block until a new
+ * message is available.
+ *
+ * @param msg a non-const reference where the received message will be copied
+ * @return ResultOk when a message is received
+ * @return ResultInvalidConfiguration if a message listener had been set in the configuration
+ */
+pulsar_result pulsar_consumer_receive(pulsar_consumer_t *consumer, pulsar_message_t **msg);
+
+/**
+ *
+ * @param msg a non-const reference where the received message will be copied
+ * @param timeoutMs the receive timeout in milliseconds
+ * @return ResultOk if a message was received
+ * @return ResultTimeout if the receive timeout was triggered
+ * @return ResultInvalidConfiguration if a message listener had been set in the configuration
+ */
+pulsar_result pulsar_consumer_receive_with_timeout(pulsar_consumer_t *consumer, pulsar_message_t **msg,
+                                                   int timeoutMs);
+
+/**
+ * Acknowledge the reception of a single message.
+ *
+ * This method will block until an acknowledgement is sent to the broker. After
+ * that, the message will not be re-delivered to this consumer.
+ *
+ * @see asyncAcknowledge
+ * @param message the message to acknowledge
+ * @return ResultOk if the message was successfully acknowledged
+ * @return ResultError if there was a failure
+ */
+pulsar_result pulsar_consumer_acknowledge(pulsar_consumer_t *consumer, pulsar_message_t *message);
+
+pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId);
+
+/**
+ * Asynchronously acknowledge the reception of a single message.
+ *
+ * This method will initiate the operation and return immediately. The provided callback
+ * will be triggered when the operation is complete.
+ *
+ * @param message the message to acknowledge
+ * @param callback callback that will be triggered when the message has been acknowledged
+ */
+void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
+                                       pulsar_result_callback callback);
+
+void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,
+                                          pulsar_result_callback callback);
+
+/**
+ * Acknowledge the reception of all the messages in the stream up to (and including)
+ * the provided message.
+ *
+ * This method will block until an acknowledgement is sent to the broker. After
+ * that, the messages will not be re-delivered to this consumer.
+ *
+ * Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
+ *
+ * It's equivalent to calling asyncAcknowledgeCumulative(const Message&, ResultCallback) and
+ * waiting for the callback to be triggered.
+ *
+ * @param message the last message in the stream to acknowledge
+ * @return ResultOk if the message was successfully acknowledged. All previously delivered messages for
+ * this topic are also acknowledged.
+ * @return ResultError if there was a failure
+ */
+pulsar_result pulsar_consumer_acknowledge_cumulative(pulsar_consumer_t *consumer, pulsar_message_t *message);
+
+pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t *consumer,
+                                                        pulsar_message_id_t *messageId);
+
+/**
+ * Asynchronously acknowledge the reception of all the messages in the stream up to (and
+ * including) the provided message.
+ *
+ * This method will initiate the operation and return immediately. The provided callback
+ * will be triggered when the operation is complete.
+ *
+ * @param message the message to acknowledge
+ * @param callback callback that will be triggered when the message has been acknowledged
+ */
+void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
+                                                  pulsar_result_callback callback);
+
+void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer,
+                                                     pulsar_message_id_t *messageId,
+                                                     pulsar_result_callback callback);
+
+pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer);
+
+void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback);
+
+void pulsar_consumer_free(pulsar_consumer_t *consumer);
+
+/*
+ * Pause receiving messages via the messageListener, till resumeMessageListener() is called.
+ */
+pulsar_result pulsar_consumer_pause_message_listener(pulsar_consumer_t *consumer);
+
+/*
+ * Resume receiving the messages via the messageListener.
+ * Asynchronously receive all the messages enqueued from time pauseMessageListener() was called.
+ */
+pulsar_result resume_message_listener(pulsar_consumer_t *consumer);
+
+/**
+ * Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is
+ * not
+ * active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed
+ * across all
+ * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the
+ * connection
+ * breaks, the messages are redelivered after reconnect.
+ */
+void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
new file mode 100644
index 0000000000..3bd95715b5
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_consumer_configuration pulsar_consumer_configuration_t;
+
+typedef enum {
+    /**
+     * There can be only 1 consumer on the same topic with the same consumerName
+     */
+    pulsar_ConsumerExclusive,
+
+    /**
+     * Multiple consumers will be able to use the same consumerName and the messages
+     *  will be dispatched according to a round-robin rotation between the connected consumers
+     */
+    pulsar_ConsumerShared,
+
+    /** Only one consumer is active on the subscription; Subscription can have N consumers
+     *  connected one of which will get promoted to master if the current master becomes inactive
+     */
+    pulsar_ConsumerFailover
+} pulsar_consumer_type;
+
+/// Callback definition for MessageListener
+typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg);
+
+pulsar_consumer_configuration_t *pulsar_consumer_configuration_create();
+
+void pulsar_consumer_configuration_free(pulsar_consumer_configuration_t *consumer_configuration);
+
+/**
+ * Specify the consumer type. The consumer type enables
+ * specifying the type of subscription. In Exclusive subscription,
+ * only a single consumer is allowed to attach to the subscription. Other consumers
+ * will get an error message. In Shared subscription, multiple consumers will be
+ * able to use the same subscription name and the messages will be dispatched in a
+ * round robin fashion. In Failover subscription, a primary-failover subscription model
+ * allows for multiple consumers to attach to a single subscription, though only one
+ * of them will be “master” at a given time. Only the primary consumer will receive
+ * messages. When the primary consumer gets disconnected, one among the failover
+ * consumers will be promoted to primary and will start getting messages.
+ */
+void pulsar_consumer_configuration_set_consumer_type(pulsar_consumer_configuration_t *consumer_configuration,
+                                                     pulsar_consumer_type consumerType);
+
+pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
+    pulsar_consumer_configuration_t *consumer_configuration);
+
+/**
+ * A message listener enables your application to configure how to process
+ * and acknowledge messages delivered. A listener will be called in order
+ * for every message received.
+ */
+void pulsar_consumer_configuration_set_message_listener(
+    pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener);
+
+int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t *consumer_configuration,
+                                         pulsar_consumer_t *consumer);
+
+/**
+ * Sets the size of the consumer receive queue.
+ *
+ * The consumer receive queue controls how many messages can be accumulated by the Consumer before the
+ * application calls receive(). Using a higher value could potentially increase the consumer throughput
+ * at the expense of bigger memory utilization.
+ *
+ * Setting the consumer queue size as zero decreases the throughput of the consumer, by disabling
+ * pre-fetching of
+ * messages. This approach improves the message distribution on shared subscription, by pushing messages
+ * only to
+ * the consumers that are ready to process them. Neither receive with timeout nor Partitioned Topics can
+ * be
+ * used if the consumer queue size is zero. The receive() function call should not be interrupted when
+ * the consumer queue size is zero.
+ *
+ * Default value is 1000 messages and should be good for most use cases.
+ *
+ * @param size
+ *            the new receiver queue size value
+ */
+void pulsar_consumer_configuration_set_receiver_queue_size(
+    pulsar_consumer_configuration_t *consumer_configuration, int size);
+
+int pulsar_consumer_configuration_get_receiver_queue_size(
+    pulsar_consumer_configuration_t *consumer_configuration);
+
+/**
+ * Set the max total receiver queue size across partitons.
+ * <p>
+ * This setting will be used to reduce the receiver queue size for individual partitions
+ * {@link #setReceiverQueueSize(int)} if the total exceeds this value (default: 50000).
+ *
+ * @param maxTotalReceiverQueueSizeAcrossPartitions
+ */
+void pulsar_consumer_set_max_total_receiver_queue_size_across_partitions(
+    pulsar_consumer_configuration_t *consumer_configuration, int maxTotalReceiverQueueSizeAcrossPartitions);
+
+/**
+ * @return the configured max total receiver queue size across partitions
+ */
+int pulsar_consumer_get_max_total_receiver_queue_size_across_partitions(
+    pulsar_consumer_configuration_t *consumer_configuration);
+
+void pulsar_consumer_set_consumer_name(pulsar_consumer_configuration_t *consumer_configuration,
+                                       const char *consumerName);
+
+const char *pulsar_consumer_get_consumer_name(pulsar_consumer_configuration_t *consumer_configuration);
+
+/**
+ * Set the timeout in milliseconds for unacknowledged messages, the timeout needs to be greater than
+ * 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds).
+ * If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are
+ * redelivered.
+ * @param timeout in milliseconds
+ */
+void pulsar_consumer_set_unacked_messages_timeout_ms(pulsar_consumer_configuration_t *consumer_configuration,
+                                                     const uint64_t milliSeconds);
+
+/**
+ * @return the configured timeout in milliseconds for unacked messages.
+ */
+long pulsar_consumer_get_unacked_messages_timeout_ms(pulsar_consumer_configuration_t *consumer_configuration);
+
+int pulsar_consumer_is_encryption_enabled(pulsar_consumer_configuration_t *consumer_configuration);
+
+// const CryptoKeyReaderPtr getCryptoKeyReader()
+//
+// const;
+// ConsumerConfiguration&
+// setCryptoKeyReader(CryptoKeyReaderPtr
+// cryptoKeyReader);
+//
+// ConsumerCryptoFailureAction getCryptoFailureAction()
+//
+// const;
+// ConsumerConfiguration&
+// setCryptoFailureAction(ConsumerCryptoFailureAction
+// action);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/message.h b/pulsar-client-cpp/include/pulsar/c/message.h
new file mode 100644
index 0000000000..9645b02ed7
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/message.h
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stddef.h>
+#include <stdint.h>
+
+typedef struct _pulsar_message pulsar_message_t;
+typedef struct _pulsar_message_id pulsar_message_id_t;
+
+pulsar_message_t *pulsar_message_create();
+void pulsar_message_free(pulsar_message_t *message);
+
+/// Builder
+
+void pulsar_message_set_content(pulsar_message_t *message, const void *data, size_t size);
+
+/**
+ * Set content of the message to a buffer already allocated by the caller. No copies of
+ * this buffer will be made. The caller is responsible to ensure the memory buffer is
+ * valid until the message has been persisted (or an error is returned).
+ */
+void pulsar_message_set_allocated_content(pulsar_message_t *message, void *data, size_t size);
+
+void pulsar_message_set_property(pulsar_message_t *message, const char *name, const char *value);
+
+/*
+ * set partition key for the message routing
+ * @param hash of this key is used to determine message's topic partition
+ */
+void pulsar_message_set_partition_key(pulsar_message_t *message, const char *partitionKey);
+
+/**
+ * Set the event timestamp for the message.
+ */
+void pulsar_message_set_event_timestamp(pulsar_message_t *message, uint64_t eventTimestamp);
+
+/**
+ * Specify a custom sequence id for the message being published.
+ * <p>
+ * The sequence id can be used for deduplication purposes and it needs to follow these rules:
+ * <ol>
+ * <li><code>sequenceId >= 0</code>
+ * <li>Sequence id for a message needs to be greater than sequence id for earlier messages:
+ * <code>sequenceId(N+1) > sequenceId(N)</code>
+ * <li>It's not necessary for sequence ids to be consecutive. There can be holes between messages. Eg. the
+ * <code>sequenceId</code> could represent an offset or a cumulative size.
+ * </ol>
+ *
+ * @param sequenceId
+ *            the sequence id to assign to the current message
+ */
+void pulsar_message_set_sequence_id(pulsar_message_t *message, int64_t sequenceId);
+
+/**
+ * override namespace replication clusters.  note that it is the
+ * caller's responsibility to provide valid cluster names, and that
+ * all clusters have been previously configured as topics.
+ *
+ * given an empty list, the message will replicate per the namespace
+ * configuration.
+ *
+ * @param clusters where to send this message.
+ */
+void pulsar_message_set_replication_clusters(pulsar_message_t *message, const char **clusters);
+
+/**
+ * Do not replicate this message
+ * @param flag if true, disable replication, otherwise use default
+ * replication
+ */
+void pulsar_message_disable_replication(pulsar_message_t *message, int flag);
+
+/// Accessor for built messages
+
+/**
+ * Return the properties attached to the message.
+ * Properties are application defined key/value pairs that will be attached to the message
+ *
+ * @return an unmodifiable view of the properties map
+ */
+// const StringMap& getProperties() const;
+
+/**
+ * Check whether the message has a specific property attached.
+ *
+ * @param name the name of the property to check
+ * @return true if the message has the specified property
+ * @return false if the property is not defined
+ */
+int pulsar_message_has_property(pulsar_message_t *message, const char *name);
+
+/**
+ * Get the value of a specific property
+ *
+ * @param name the name of the property
+ * @return the value of the property or null if the property was not defined
+ */
+const char *pulsar_message_get_property(pulsar_message_t *message, const char *name);
+
+/**
+ * Get the content of the message
+ *
+ *
+ * @return the pointer to the message payload
+ */
+const void *pulsar_message_get_data(pulsar_message_t *message);
+
+/**
+ * Get the length of the message
+ *
+ * @return the length of the message payload
+ */
+uint32_t pulsar_message_get_length(pulsar_message_t *message);
+
+/**
+ * Get the unique message ID associated with this message.
+ *
+ * The message id can be used to univocally refer to a message without having to keep the entire payload
+ * in memory.
+ *
+ * Only messages received from the consumer will have a message id assigned.
+ *
+ */
+pulsar_message_id_t *pulsar_message_get_message_id(pulsar_message_t *message);
+
+/**
+ * Get the partition key for this message
+ * @return key string that is hashed to determine message's topic partition
+ */
+const char *pulsar_message_get_partitionKey(pulsar_message_t *message);
+int pulsar_message_has_partition_key(pulsar_message_t *message);
+
+/**
+ * Get the UTC based timestamp in milliseconds referring to when the message was published by the client
+ * producer
+ */
+uint64_t pulsar_message_get_publish_timestamp(pulsar_message_t *message);
+
+/**
+ * Get the event timestamp associated with this message. It is set by the client producer.
+ */
+uint64_t pulsar_message_get_event_timestamp(pulsar_message_t *message);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/message_id.h b/pulsar-client-cpp/include/pulsar/c/message_id.h
new file mode 100644
index 0000000000..a0eb684625
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/message_id.h
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stddef.h>
+#include <stdint.h>
+
+typedef struct _pulsar_message_id pulsar_message_id_t;
+
+/**
+ * MessageId representing the "earliest" or "oldest available" message stored in the topic
+ */
+const pulsar_message_id_t *pulsar_message_id_earliest();
+
+/**
+ * MessageId representing the "latest" or "last published" message in the topic
+ */
+const pulsar_message_id_t *pulsar_message_id_latest();
+
+/**
+ * Serialize the message id into a binary string for storing
+ */
+const void *pulsar_message_id_serialize(int *len);
+
+/**
+ * Deserialize a message id from a binary string
+ */
+pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer, uint32_t len);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/message_router.h b/pulsar-client-cpp/include/pulsar/c/message_router.h
new file mode 100644
index 0000000000..aea4188e57
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/message_router.h
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/c/message.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_topic_metadata pulsar_topic_metadata_t;
+
+typedef int (*pulsar_message_router)(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata);
+
+int pulsar_topic_metadata_get_num_partitions(pulsar_topic_metadata_t *topicMetadata);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/producer.h b/pulsar-client-cpp/include/pulsar/c/producer.h
new file mode 100644
index 0000000000..6121e06653
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/producer.h
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <pulsar/c/result.h>
+#include <pulsar/c/message.h>
+
+#include <stdint.h>
+
+typedef struct _pulsar_producer pulsar_producer_t;
+
+typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_t *msg);
+typedef void (*pulsar_close_callback)(pulsar_result);
+
+/**
+ * @return the topic to which producer is publishing to
+ */
+const char *pulsar_producer_get_topic(pulsar_producer_t *producer);
+
+/**
+ * @return the producer name which could have been assigned by the system or specified by the client
+ */
+const char *pulsar_producer_get_producer_name(pulsar_producer_t *producer);
+
+/**
+ * Publish a message on the topic associated with this Producer.
+ *
+ * This method will block until the message will be accepted and persisted
+ * by the broker. In case of errors, the client library will try to
+ * automatically recover and use a different broker.
+ *
+ * If it wasn't possible to successfully publish the message within the sendTimeout,
+ * an error will be returned.
+ *
+ * This method is equivalent to asyncSend() and wait until the callback is triggered.
+ *
+ * @param msg message to publish
+ * @return ResultOk if the message was published successfully
+ * @return ResultWriteError if it wasn't possible to publish the message
+ */
+pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t *msg);
+
+/**
+ * Asynchronously publish a message on the topic associated with this Producer.
+ *
+ * This method will initiate the publish operation and return immediately. The
+ * provided callback will be triggered when the message has been be accepted and persisted
+ * by the broker. In case of errors, the client library will try to
+ * automatically recover and use a different broker.
+ *
+ * If it wasn't possible to successfully publish the message within the sendTimeout, the
+ * callback will be triggered with a Result::WriteError code.
+ *
+ * @param msg message to publish
+ * @param callback the callback to get notification of the completion
+ */
+void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg,
+                                pulsar_send_callback callback);
+
+/**
+ * Get the last sequence id that was published by this producer.
+ *
+ * This represent either the automatically assigned or custom sequence id (set on the MessageBuilder) that
+ * was published and acknowledged by the broker.
+ *
+ * After recreating a producer with the same producer name, this will return the last message that was
+ * published in
+ * the previous producer session, or -1 if there no message was ever published.
+ *
+ * @return the last sequence id published by this producer
+ */
+int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer);
+
+/**
+ * Close the producer and release resources allocated.
+ *
+ * No more writes will be accepted from this producer. Waits until
+ * all pending write requests are persisted. In case of errors,
+ * pending writes will not be retried.
+ *
+ * @return an error code to indicate the success or failure
+ */
+pulsar_result pulsar_producer_close(pulsar_producer_t *producer);
+
+/**
+ * Close the producer and release resources allocated.
+ *
+ * No more writes will be accepted from this producer. The provided callback will be
+ * triggered when all pending write requests are persisted. In case of errors,
+ * pending writes will not be retried.
+ */
+void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback);
+
+void pulsar_producer_free(pulsar_producer_t *producer);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
new file mode 100644
index 0000000000..534c411e05
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <stdint.h>
+
+#include <pulsar/c/message_router.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef enum {
+    pulsar_UseSinglePartition,
+    pulsar_RoundRobinDistribution,
+    pulsar_CustomPartition
+} pulsar_partitions_routing_mode;
+
+typedef enum { pulsar_Murmur3_32Hash, pulsar_BoostHash, pulsar_JavaStringHash } pulsar_hashing_scheme;
+
+typedef enum {
+    pulsar_CompressionNone = 0,
+    pulsar_CompressionLZ4 = 1,
+    pulsar_CompressionZLib = 2
+} pulsar_compression_type;
+
+typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;
+
+pulsar_producer_configuration_t *pulsar_producer_configuration_create();
+
+void pulsar_producer_configuration_free(pulsar_producer_configuration_t *conf);
+
+void pulsar_producer_configuration_set_producer_name(pulsar_producer_configuration_t *conf,
+                                                     const char *producerName);
+
+const char *pulsar_producer_configuration_get_producer_name(pulsar_producer_configuration_t *conf);
+
+void pulsar_producer_configuration_set_send_timeout(pulsar_producer_configuration_t *conf, int sendTimeoutMs);
+
+int pulsar_producer_configuration_get_send_timeout(pulsar_producer_configuration_t *conf);
+
+void pulsar_producer_configuration_set_initial_sequence_id(pulsar_producer_configuration_t *conf,
+                                                           int64_t initialSequenceId);
+
+int64_t pulsar_producer_configuration_get_initial_sequence_id(pulsar_producer_configuration_t *conf);
+
+void pulsar_producer_configuration_set_compression_type(pulsar_producer_configuration_t *conf,
+                                                        pulsar_compression_type compressionType);
+
+pulsar_compression_type pulsar_producer_configuration_get_compression_type(
+    pulsar_producer_configuration_t *conf);
+
+void pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t *conf,
+                                                            int maxPendingMessages);
+
+int pulsar_producer_configuration_get_max_pending_messages(pulsar_producer_configuration_t *conf);
+
+/**
+ * Set the number of max pending messages across all the partitions
+ * <p>
+ * This setting will be used to lower the max pending messages for each partition
+ * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
+ *
+ * @param maxPendingMessagesAcrossPartitions
+ */
+void pulsar_producer_configuration_set_max_pending_messages_across_partitions(
+    pulsar_producer_configuration_t *conf, int maxPendingMessagesAcrossPartitions);
+
+/**
+ *
+ * @return the maximum number of pending messages allowed across all the partitions
+ */
+int pulsar_producer_configuration_get_max_pending_messages_across_partitions(
+    pulsar_producer_configuration_t *conf);
+
+void pulsar_producer_configuration_set_partitions_routing_mode(pulsar_producer_configuration_t *conf,
+                                                               pulsar_partitions_routing_mode mode);
+
+pulsar_partitions_routing_mode pulsar_producer_configuration_get_partitions_routing_mode(
+    pulsar_producer_configuration_t *conf);
+
+void pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t *conf,
+                                                      pulsar_message_router router);
+
+void pulsar_producer_configuration_set_hashing_scheme(pulsar_producer_configuration_t *conf,
+                                                      pulsar_hashing_scheme scheme);
+
+pulsar_hashing_scheme pulsar_producer_configuration_get_hashing_scheme(pulsar_producer_configuration_t *conf);
+
+void pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t *conf,
+                                                           int blockIfQueueFull);
+
+int pulsar_producer_configuration_get_block_if_queue_full(pulsar_producer_configuration_t *conf);
+
+// Zero queue size feature will not be supported on consumer end if batching is enabled
+void pulsar_producer_configuration_set_batching_enabled(pulsar_producer_configuration_t *conf,
+                                                        int batchingEnabled);
+
+int pulsar_producer_configuration_get_batching_enabled(pulsar_producer_configuration_t *conf);
+
+void pulsar_producer_configuration_set_batching_max_messages(pulsar_producer_configuration_t *conf,
+                                                             unsigned int batchingMaxMessages);
+
+unsigned int pulsar_producer_configuration_get_batching_max_messages(pulsar_producer_configuration_t *conf);
+
+void pulsar_producer_configuration_set_batching_max_allowed_size_in_bytes(
+    pulsar_producer_configuration_t *conf, unsigned long batchingMaxAllowedSizeInBytes);
+
+unsigned long pulsar_producer_configuration_get_batching_max_allowed_size_in_bytes(
+    pulsar_producer_configuration_t *conf);
+
+void pulsar_producer_configuration_set_batching_max_publish_delay_ms(pulsar_producer_configuration_t *conf,
+                                                                     unsigned long batchingMaxPublishDelayMs);
+
+unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
+    pulsar_producer_configuration_t *conf);
+
+// const CryptoKeyReaderPtr getCryptoKeyReader() const;
+// ProducerConfiguration &setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
+//
+// ProducerCryptoFailureAction getCryptoFailureAction() const;
+// ProducerConfiguration &setCryptoFailureAction(ProducerCryptoFailureAction action);
+//
+// std::set <std::string> &getEncryptionKeys();
+// int isEncryptionEnabled() const;
+// ProducerConfiguration &addEncryptionKey(std::string key);
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/reader.h b/pulsar-client-cpp/include/pulsar/c/reader.h
new file mode 100644
index 0000000000..648c172f94
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/reader.h
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/c/result.h>
+#include <pulsar/c/message.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_reader pulsar_reader_t;
+
+typedef void (*pulsar_result_callback)(pulsar_result);
+
+/**
+ * @return the topic this reader is reading from
+ */
+const char *pulsar_reader_get_topic(pulsar_reader_t *reader);
+
+/**
+ * Read a single message.
+ *
+ * If a message is not immediately available, this method will block until a new
+ * message is available.
+ *
+ * @param msg a non-const reference where the received message will be copied
+ * @return ResultOk when a message is received
+ * @return ResultInvalidConfiguration if a message listener had been set in the configuration
+ */
+pulsar_result pulsar_reader_read_next(pulsar_reader_t *reader, pulsar_message_t **msg);
+
+/**
+ * Read a single message
+ *
+ * @param msg a non-const reference where the received message will be copied
+ * @param timeoutMs the receive timeout in milliseconds
+ * @return ResultOk if a message was received
+ * @return ResultTimeout if the receive timeout was triggered
+ * @return ResultInvalidConfiguration if a message listener had been set in the configuration
+ */
+pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, pulsar_message_t **msg,
+                                                   int timeoutMs);
+
+pulsar_result pulsar_reader_close(pulsar_reader_t *reader);
+
+void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback);
+
+void pulsar_reader_free(pulsar_reader_t *reader);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/reader_configuration.h b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
new file mode 100644
index 0000000000..914bbd9ede
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct _pulsar_reader_configuration pulsar_reader_configuration_t;
+
+typedef void (*pulsar_reader_listener)(pulsar_reader_t *reader, pulsar_message_t *msg);
+
+pulsar_reader_configuration_t *pulsar_reader_configuration_create();
+
+void pulsar_reader_configuration_free(pulsar_reader_configuration_t *configuration);
+
+/**
+ * A message listener enables your application to configure how to process
+ * messages. A listener will be called in order for every message received.
+ */
+void pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t *configuration,
+                                                     pulsar_reader_listener listener);
+
+int pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t *configuration);
+
+/**
+ * Sets the size of the reader receive queue.
+ *
+ * The consumer receive queue controls how many messages can be accumulated by the Consumer before the
+ * application calls receive(). Using a higher value could potentially increase the consumer throughput
+ * at the expense of bigger memory utilization.
+ *
+ * Setting the consumer queue size as zero decreases the throughput of the consumer, by disabling
+ * pre-fetching of
+ * messages. This approach improves the message distribution on shared subscription, by pushing messages
+ * only to
+ * the consumers that are ready to process them. Neither receive with timeout nor Partitioned Topics can
+ * be
+ * used if the consumer queue size is zero. The receive() function call should not be interrupted when
+ * the consumer queue size is zero.
+ *
+ * Default value is 1000 messages and should be good for most use cases.
+ *
+ * @param size
+ *            the new receiver queue size value
+ */
+void pulsar_reader_configuration_set_receiver_queue_size(pulsar_reader_configuration_t *configuration,
+                                                         int size);
+
+int pulsar_reader_configuration_get_receiver_queue_size(pulsar_reader_configuration_t *configuration);
+
+void pulsar_reader_configuration_set_reader_name(pulsar_reader_configuration_t *configuration,
+                                                 const char *readerName);
+
+const char *pulsar_reader_configuration_get_reader_name(pulsar_reader_configuration_t *configuration);
+
+void pulsar_reader_configuration_set_subscription_role_prefix(pulsar_reader_configuration_t *configuration,
+                                                              const char *subscriptionRolePrefix);
+
+const char *pulsar_reader_configuration_get_subscription_role_prefix(
+    pulsar_reader_configuration_t *configuration);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/include/pulsar/c/result.h b/pulsar-client-cpp/include/pulsar/c/result.h
new file mode 100644
index 0000000000..0ca769db91
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/c/result.h
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef enum {
+    pulsar_result_Ok,  /// Operation successful
+
+    pulsar_result_UnknownError,  /// Unknown error happened on broker
+
+    pulsar_result_InvalidConfiguration,  /// Invalid configuration
+
+    pulsar_result_Timeout,       /// Operation timed out
+    pulsar_result_LookupError,   /// Broker lookup failed
+    pulsar_result_ConnectError,  /// Failed to connect to broker
+    pulsar_result_ReadError,     /// Failed to read from socket
+
+    pulsar_result_AuthenticationError,             /// Authentication failed on broker
+    pulsar_result_AuthorizationError,              /// Client is not authorized to create producer/consumer
+    pulsar_result_ErrorGettingAuthenticationData,  /// Client cannot find authorization data
+
+    pulsar_result_BrokerMetadataError,     /// Broker failed in updating metadata
+    pulsar_result_BrokerPersistenceError,  /// Broker failed to persist entry
+    pulsar_result_ChecksumError,           /// Corrupt message checksum failure
+
+    pulsar_result_ConsumerBusy,   /// Exclusive consumer is already connected
+    pulsar_result_NotConnected,   /// Producer/Consumer is not currently connected to broker
+    pulsar_result_AlreadyClosed,  /// Producer/Consumer is already closed and not accepting any operation
+
+    pulsar_result_InvalidMessage,  /// Error in publishing an already used message
+
+    pulsar_result_ConsumerNotInitialized,         /// Consumer is not initialized
+    pulsar_result_ProducerNotInitialized,         /// Producer is not initialized
+    pulsar_result_TooManyLookupRequestException,  /// Too Many concurrent LookupRequest
+
+    pulsar_result_InvalidTopicName,  /// Invalid topic name
+    pulsar_result_InvalidUrl,        /// Client Initialized with Invalid Broker Url (VIP Url passed to Client
+                                     /// Constructor)
+    pulsar_result_ServiceUnitNotReady,  /// Service Unit unloaded between client did lookup and
+                                        /// producer/consumer got created
+
+    pulsar_result_OperationNotSupported,
+    pulsar_result_ProducerBlockedQuotaExceededError,      /// Producer is blocked
+    pulsar_result_ProducerBlockedQuotaExceededException,  /// Producer is getting exception
+    pulsar_result_ProducerQueueIsFull,                    /// Producer queue is full
+    pulsar_result_MessageTooBig,                          /// Trying to send a messages exceeding the max size
+    pulsar_result_TopicNotFound,                          /// Topic not found
+    pulsar_result_SubscriptionNotFound,                   /// Subscription not found
+    pulsar_result_ConsumerNotFound,                       /// Consumer not found
+    pulsar_result_UnsupportedVersionError,  /// Error when an older client/version doesn't support a required
+                                            /// feature
+    pulsar_result_TopicTerminated,          /// Topic was already terminated
+    pulsar_result_CryptoError               /// Error when crypto operation fails
+} pulsar_result;
+
+// Return string representation of result code
+const char *pulsar_result_str(pulsar_result result);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/pulsar-client-cpp/lib/CMakeLists.txt b/pulsar-client-cpp/lib/CMakeLists.txt
index 052de8b48c..74dbc08dcd 100644
--- a/pulsar-client-cpp/lib/CMakeLists.txt
+++ b/pulsar-client-cpp/lib/CMakeLists.txt
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-file(GLOB PULSAR_SOURCES *.cc lz4/*.c checksum/*.cc stats/*.cc)
+file(GLOB PULSAR_SOURCES *.cc lz4/*.c checksum/*.cc stats/*.cc c/*.cc)
 
 execute_process(COMMAND cat ../pom.xml COMMAND xmllint --format - COMMAND sed "s/xmlns=\".*\"//g" COMMAND xmllint --stream --pattern /project/version --debug - COMMAND grep -A 2 "matches pattern" COMMAND grep text COMMAND sed "s/.* [0-9] //g" OUTPUT_STRIP_TRAILING_WHITESPACE OUTPUT_VARIABLE PV)
 set (CMAKE_CXX_FLAGS " ${CMAKE_CXX_FLAGS} -D_PULSAR_VERSION_=\\\"${PV}\\\"")
diff --git a/pulsar-client-cpp/lib/c/c_Authentication.cc b/pulsar-client-cpp/lib/c/c_Authentication.cc
new file mode 100644
index 0000000000..5cd2a64e00
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Authentication.cc
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/authentication.h>
+
+#include <pulsar/Authentication.h>
+
+#include "c_structs.h"
+
+pulsar_authentication_t *pulsar_authentication_create(const char *dynamicLibPath,
+                                                      const char *authParamsString) {
+    pulsar_authentication_t *authentication = new pulsar_authentication_t;
+    authentication->auth = pulsar::AuthFactory::create(dynamicLibPath, authParamsString);
+    return authentication;
+}
+
+void pulsar_authentication_free(pulsar_authentication_t *authentication) { delete authentication; }
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc
new file mode 100644
index 0000000000..cec7a13e46
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/client.h>
+
+#include <boost/bind.hpp>
+
+#include "c_structs.h"
+
+pulsar_client_t *pulsar_client_create(const char *serviceUrl,
+                                      const pulsar_client_configuration_t *clientConfiguration) {
+    pulsar_client_t *c_client = new pulsar_client_t;
+    c_client->client.reset(new pulsar::Client(std::string(serviceUrl)));
+    return c_client;
+}
+
+void pulsar_client_free(pulsar_client_t *client) { delete client; }
+
+pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const char *topic,
+                                            const pulsar_producer_configuration_t *conf,
+                                            pulsar_producer_t **c_producer) {
+    pulsar::Producer producer;
+    pulsar::Result res = client->client->createProducer(topic, conf->conf, producer);
+    if (res == pulsar::ResultOk) {
+        (*c_producer) = new pulsar_producer_t;
+        (*c_producer)->producer = producer;
+        return pulsar_result_Ok;
+    } else {
+        return (pulsar_result)res;
+    }
+}
+
+static void handle_create_producer_callback(pulsar::Result result, pulsar::Producer producer,
+                                            pulsar_create_producer_callback callback) {
+    if (result == pulsar::ResultOk) {
+        pulsar_producer_t *c_producer = new pulsar_producer_t;
+        c_producer->producer = producer;
+        callback(pulsar_result_Ok, c_producer);
+    } else {
+        callback((pulsar_result)result, NULL);
+    }
+}
+
+void pulsar_client_create_producer_async(pulsar_client_t *client, const char *topic,
+                                         const pulsar_producer_configuration_t *conf,
+                                         pulsar_create_producer_callback callback) {
+    client->client->createProducerAsync(topic, conf->conf,
+                                        boost::bind(&handle_create_producer_callback, _1, _2, callback));
+}
+
+pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic,
+                                      const char *subscriptionName,
+                                      const pulsar_consumer_configuration_t *conf,
+                                      pulsar_consumer_t **c_consumer) {
+    pulsar::Consumer consumer;
+    pulsar::Result res =
+        client->client->subscribe(topic, subscriptionName, conf->consumerConfiguration, consumer);
+    if (res == pulsar::ResultOk) {
+        (*c_consumer) = new pulsar_consumer_t;
+        (*c_consumer)->consumer = consumer;
+        return pulsar_result_Ok;
+    } else {
+        return (pulsar_result)res;
+    }
+}
+
+static void handle_subscribe_callback(pulsar::Result result, pulsar::Consumer consumer,
+                                      pulsar_subscribe_callback callback) {
+    if (result == pulsar::ResultOk) {
+        pulsar_consumer_t *c_consumer = new pulsar_consumer_t;
+        c_consumer->consumer = consumer;
+        callback(pulsar_result_Ok, c_consumer);
+    } else {
+        callback((pulsar_result)result, NULL);
+    }
+}
+
+void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, const char *subscriptionName,
+                                   const pulsar_consumer_configuration_t *conf, pulsar_consumer_t **consumer,
+                                   pulsar_subscribe_callback callback) {
+    client->client->subscribeAsync(topic, subscriptionName, conf->consumerConfiguration,
+                                   boost::bind(&handle_subscribe_callback, _1, _2, callback));
+}
+
+pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic,
+                                          const pulsar_message_id_t *startMessageId,
+                                          pulsar_reader_configuration_t *conf, pulsar_reader_t **c_reader) {
+    pulsar::Reader reader;
+    pulsar::Result res = client->client->createReader(topic, startMessageId->messageId, conf->conf, reader);
+    if (res == pulsar::ResultOk) {
+        (*c_reader) = new pulsar_reader_t;
+        (*c_reader)->reader = reader;
+        return pulsar_result_Ok;
+    } else {
+        return (pulsar_result)res;
+    }
+}
+
+static void handle_reader_callback(pulsar::Result result, pulsar::Reader reader,
+                                   pulsar_reader_callback callback) {
+    if (result == pulsar::ResultOk) {
+        pulsar_reader_t *c_reader = new pulsar_reader_t;
+        c_reader->reader = reader;
+        callback(pulsar_result_Ok, c_reader);
+    } else {
+        callback((pulsar_result)result, NULL);
+    }
+}
+
+void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topic,
+                                       const pulsar_message_id_t *startMessageId,
+                                       pulsar_reader_configuration_t *conf, pulsar_reader_t **reader,
+                                       pulsar_reader_callback callback) {
+    client->client->createReaderAsync(topic, startMessageId->messageId, conf->conf,
+                                      boost::bind(&handle_reader_callback, _1, _2, callback));
+}
+
+pulsar_result pulsar_client_close(pulsar_client_t *client) { return (pulsar_result)client->client->close(); }
+
+static void handle_client_close(pulsar::Result result, pulsar_close_callback callback) {
+    callback((pulsar_result)result);
+}
+
+void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback) {
+    client->client->closeAsync(boost::bind(handle_client_close, _1, callback));
+}
diff --git a/pulsar-client-cpp/lib/c/c_ClientConfiguration.cc b/pulsar-client-cpp/lib/c/c_ClientConfiguration.cc
new file mode 100644
index 0000000000..8caf8e87f1
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_ClientConfiguration.cc
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/client_configuration.h>
+
+#include "c_structs.h"
+
+pulsar_client_configuration_t *pulsar_client_configuration_create() {
+    pulsar_client_configuration_t *c_conf = new pulsar_client_configuration_t;
+    c_conf->conf = pulsar::ClientConfiguration();
+    return c_conf;
+}
+
+void pulsar_client_configuration_free(pulsar_client_configuration_t *conf) { delete conf; }
+
+void pulsar_client_configuration_set_auth(pulsar_client_configuration_t *conf,
+                                          pulsar_authentication_t *authentication) {
+    conf->conf.setAuth(authentication->auth);
+}
+
+void pulsar_client_configuration_set_operation_timeout_seconds(pulsar_client_configuration_t *conf,
+                                                               int timeout) {
+    conf->conf.setOperationTimeoutSeconds(timeout);
+}
+
+int pulsar_client_configuration_get_operation_timeout_seconds(pulsar_client_configuration_t *conf) {
+    return conf->conf.getOperationTimeoutSeconds();
+}
+
+void pulsar_client_configuration_set_io_threads(pulsar_client_configuration_t *conf, int threads) {
+    conf->conf.setIOThreads(threads);
+}
+
+int pulsar_client_configuration_get_io_threads(pulsar_client_configuration_t *conf) {
+    return conf->conf.getIOThreads();
+}
+
+void pulsar_client_configuration_set_message_listener_threads(pulsar_client_configuration_t *conf,
+                                                              int threads) {
+    conf->conf.setMessageListenerThreads(threads);
+}
+
+int pulsar_client_configuration_get_message_listener_threads(pulsar_client_configuration_t *conf) {
+    return conf->conf.getMessageListenerThreads();
+}
+
+void pulsar_client_configuration_set_concurrent_lookup_request(pulsar_client_configuration_t *conf,
+                                                               int concurrentLookupRequest) {
+    conf->conf.setConcurrentLookupRequest(concurrentLookupRequest);
+}
+
+int pulsar_client_configuration_get_concurrent_lookup_request(pulsar_client_configuration_t *conf) {
+    return conf->conf.getConcurrentLookupRequest();
+}
+
+void pulsar_client_configuration_set_log_conf_file_path(pulsar_client_configuration_t *conf,
+                                                        const char *logConfFilePath) {
+    conf->conf.setLogConfFilePath(logConfFilePath);
+}
+
+const char *pulsar_client_configuration_get_log_conf_file_path(pulsar_client_configuration_t *conf) {
+    return conf->conf.getLogConfFilePath().c_str();
+}
+
+void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t *conf, int useTls) {
+    conf->conf.setUseTls(useTls);
+}
+
+int pulsar_client_configuration_is_use_tls(pulsar_client_configuration_t *conf) {
+    return conf->conf.isUseTls();
+}
+
+void pulsar_client_configuration_set_tls_trust_certs_file_path(pulsar_client_configuration_t *conf,
+                                                               const char *tlsTrustCertsFilePath) {
+    conf->conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
+}
+
+const char *pulsar_client_configuration_get_tls_trust_certs_file_path(pulsar_client_configuration_t *conf) {
+    return conf->conf.getTlsTrustCertsFilePath().c_str();
+}
+
+void pulsar_client_configuration_set_tls_allow_insecure_connection(pulsar_client_configuration_t *conf,
+                                                                   int allowInsecure) {
+    conf->conf.setTlsAllowInsecureConnection(allowInsecure);
+}
+
+int pulsar_client_configuration_is_tls_allow_insecure_connection(pulsar_client_configuration_t *conf) {
+    return conf->conf.isTlsAllowInsecureConnection();
+}
+
+void pulsar_client_configuration_set_stats_interval_in_seconds(pulsar_client_configuration_t *conf,
+                                                               const unsigned int interval) {
+    conf->conf.setStatsIntervalInSeconds(interval);
+}
+
+const unsigned int pulsar_client_configuration_get_stats_interval_in_seconds(
+    pulsar_client_configuration_t *conf) {
+    return conf->conf.getStatsIntervalInSeconds();
+}
diff --git a/pulsar-client-cpp/lib/c/c_Consumer.cc b/pulsar-client-cpp/lib/c/c_Consumer.cc
new file mode 100644
index 0000000000..22021a5146
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Consumer.cc
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/consumer.h>
+
+#include "c_structs.h"
+
+const char *pulsar_consumer_get_topic(pulsar_consumer_t *consumer) {
+    return consumer->consumer.getTopic().c_str();
+}
+
+const char *pulsar_consumer_get_subscription_name(pulsar_consumer_t *consumer) {
+    return consumer->consumer.getSubscriptionName().c_str();
+}
+
+pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer) {
+    return (pulsar_result)consumer->consumer.unsubscribe();
+}
+
+void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback) {
+    consumer->consumer.unsubscribeAsync(boost::bind(handle_result_callback, _1, callback));
+}
+
+pulsar_result pulsar_consumer_receive(pulsar_consumer_t *consumer, pulsar_message_t **msg) {
+    pulsar::Message message;
+    pulsar::Result res = consumer->consumer.receive(message);
+    if (res == pulsar::ResultOk) {
+        (*msg) = new pulsar_message_t;
+        (*msg)->message = message;
+    }
+    return (pulsar_result)res;
+}
+
+pulsar_result pulsar_consumer_receive_with_timeout(pulsar_consumer_t *consumer, pulsar_message_t **msg,
+                                                   int timeoutMs) {
+    pulsar::Message message;
+    pulsar::Result res = consumer->consumer.receive(message, timeoutMs);
+    if (res == pulsar::ResultOk) {
+        (*msg) = new pulsar_message_t;
+        (*msg)->message = message;
+    }
+    return (pulsar_result)res;
+}
+
+pulsar_result pulsar_consumer_acknowledge(pulsar_consumer_t *consumer, pulsar_message_t *message) {
+    return (pulsar_result)consumer->consumer.acknowledge(message->message);
+}
+
+pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId) {
+    return (pulsar_result)consumer->consumer.acknowledge(messageId->messageId);
+}
+
+void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
+                                       pulsar_result_callback callback) {
+    consumer->consumer.acknowledgeAsync(message->message, boost::bind(handle_result_callback, _1, callback));
+}
+
+void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,
+                                          pulsar_result_callback callback) {
+    consumer->consumer.acknowledgeAsync(messageId->messageId,
+                                        boost::bind(handle_result_callback, _1, callback));
+}
+
+pulsar_result pulsar_consumer_acknowledge_cumulative(pulsar_consumer_t *consumer, pulsar_message_t *message) {
+    return (pulsar_result)consumer->consumer.acknowledgeCumulative(message->message);
+}
+
+pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t *consumer,
+                                                        pulsar_message_id_t *messageId) {
+    return (pulsar_result)consumer->consumer.acknowledge(messageId->messageId);
+}
+
+void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
+                                                  pulsar_result_callback callback) {
+    consumer->consumer.acknowledgeCumulativeAsync(message->message,
+                                                  boost::bind(handle_result_callback, _1, callback));
+}
+
+void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer,
+                                                     pulsar_message_id_t *messageId,
+                                                     pulsar_result_callback callback) {
+    consumer->consumer.acknowledgeCumulativeAsync(messageId->messageId,
+                                                  boost::bind(handle_result_callback, _1, callback));
+}
+
+pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer) {
+    return (pulsar_result)consumer->consumer.close();
+}
+
+void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback) {
+    consumer->consumer.closeAsync(boost::bind(handle_result_callback, _1, callback));
+}
+
+void pulsar_consumer_free(pulsar_consumer_t *consumer) { delete consumer; }
+
+pulsar_result pulsar_consumer_pause_message_listener(pulsar_consumer_t *consumer) {
+    return (pulsar_result)consumer->consumer.pauseMessageListener();
+}
+
+pulsar_result resume_message_listener(pulsar_consumer_t *consumer) {
+    return (pulsar_result)consumer->consumer.resumeMessageListener();
+}
+
+void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer) {
+    return consumer->consumer.redeliverUnacknowledgedMessages();
+}
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
new file mode 100644
index 0000000000..dcd0aaccb8
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/consumer.h>
+#include <pulsar/c/consumer_configuration.h>
+
+#include "c_structs.h"
+
+pulsar_consumer_configuration_t *pulsar_consumer_configuration_create() {
+    return new pulsar_consumer_configuration_t;
+}
+
+void pulsar_consumer_configuration_free(pulsar_consumer_configuration_t *consumer_configuration) {
+    delete consumer_configuration;
+}
+
+void pulsar_consumer_configuration_set_consumer_type(pulsar_consumer_configuration_t *consumer_configuration,
+                                                     pulsar_consumer_type consumerType) {
+    consumer_configuration->consumerConfiguration.setConsumerType((pulsar::ConsumerType)consumerType);
+}
+
+pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    return (pulsar_consumer_type)consumer_configuration->consumerConfiguration.getConsumerType();
+}
+
+static void message_listener_callback(pulsar::Consumer consumer, const pulsar::Message &msg,
+                                      pulsar_message_listener listener) {
+    pulsar_consumer_t c_consumer;
+    c_consumer.consumer = consumer;
+    pulsar_message_t *message = new pulsar_message_t;
+    message->message = msg;
+    listener(&c_consumer, message);
+}
+
+void pulsar_consumer_configuration_set_message_listener(
+    pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener) {
+    consumer_configuration->consumerConfiguration.setMessageListener(
+        boost::bind(message_listener_callback, _1, _2, messageListener));
+}
+
+int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t *consumer_configuration) {
+    return consumer_configuration->consumerConfiguration.hasMessageListener();
+}
+
+void pulsar_consumer_configuration_set_receiver_queue_size(
+    pulsar_consumer_configuration_t *consumer_configuration, int size) {
+    consumer_configuration->consumerConfiguration.setReceiverQueueSize(size);
+}
+
+int pulsar_consumer_configuration_get_receiver_queue_size(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    return consumer_configuration->consumerConfiguration.getReceiverQueueSize();
+}
+
+void pulsar_consumer_set_max_total_receiver_queue_size_across_partitions(
+    pulsar_consumer_configuration_t *consumer_configuration, int maxTotalReceiverQueueSizeAcrossPartitions) {
+    consumer_configuration->consumerConfiguration.setMaxTotalReceiverQueueSizeAcrossPartitions(
+        maxTotalReceiverQueueSizeAcrossPartitions);
+}
+
+int pulsar_consumer_get_max_total_receiver_queue_size_across_partitions(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    return consumer_configuration->consumerConfiguration.getMaxTotalReceiverQueueSizeAcrossPartitions();
+}
+
+void pulsar_consumer_set_consumer_name(pulsar_consumer_configuration_t *consumer_configuration,
+                                       const char *consumerName) {
+    consumer_configuration->consumerConfiguration.setConsumerName(consumerName);
+}
+
+const char *pulsar_consumer_get_consumer_name(pulsar_consumer_configuration_t *consumer_configuration) {
+    return consumer_configuration->consumerConfiguration.getConsumerName().c_str();
+}
+
+void pulsar_consumer_set_unacked_messages_timeout_ms(pulsar_consumer_configuration_t *consumer_configuration,
+                                                     const uint64_t milliSeconds) {
+    consumer_configuration->consumerConfiguration.setUnAckedMessagesTimeoutMs(milliSeconds);
+}
+
+long pulsar_consumer_get_unacked_messages_timeout_ms(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    return consumer_configuration->consumerConfiguration.getUnAckedMessagesTimeoutMs();
+}
+
+int pulsar_consumer_is_encryption_enabled(pulsar_consumer_configuration_t *consumer_configuration) {
+    return consumer_configuration->consumerConfiguration.isEncryptionEnabled();
+}
diff --git a/pulsar-client-cpp/lib/c/c_Message.cc b/pulsar-client-cpp/lib/c/c_Message.cc
new file mode 100644
index 0000000000..d87560e374
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Message.cc
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/message.h>
+#include "c_structs.h"
+
+pulsar_message_t *pulsar_message_create() { return new pulsar_message_t; }
+
+void pulsar_message_free(pulsar_message_t *message) { delete message; }
+
+void pulsar_message_set_content(pulsar_message_t *message, const void *data, size_t size) {
+    message->builder.setContent(data, size);
+}
+
+void pulsar_message_set_allocated_content(pulsar_message_t *message, void *data, size_t size) {
+    message->builder.setAllocatedContent(data, size);
+}
+
+void pulsar_message_set_property(pulsar_message_t *message, const char *name, const char *value) {
+    message->builder.setProperty(name, value);
+}
+
+void pulsar_message_set_partition_key(pulsar_message_t *message, const char *partitionKey) {
+    message->builder.setPartitionKey(partitionKey);
+}
+
+void pulsar_message_set_event_timestamp(pulsar_message_t *message, uint64_t eventTimestamp) {
+    message->builder.setEventTimestamp(eventTimestamp);
+}
+
+void pulsar_message_set_sequence_id(pulsar_message_t *message, int64_t sequenceId) {
+    message->builder.setSequenceId(sequenceId);
+}
+
+void pulsar_message_set_replication_clusters(pulsar_message_t *message, const char **clusters) {
+    const char *c = clusters[0];
+    std::vector<std::string> clustersList;
+    while (c) {
+        clustersList.push_back(c);
+        ++c;
+    }
+
+    message->builder.setReplicationClusters(clustersList);
+}
+
+void pulsar_message_disable_replication(pulsar_message_t *message, int flag) {
+    message->builder.disableReplication(flag);
+}
+
+int pulsar_message_has_property(pulsar_message_t *message, const char *name) {
+    return message->message.hasProperty(name);
+}
+
+const char *pulsar_message_get_property(pulsar_message_t *message, const char *name) {
+    return message->message.getProperty(name).c_str();
+}
+
+const void *pulsar_message_get_data(pulsar_message_t *message) { return message->message.getData(); }
+
+uint32_t pulsar_message_get_length(pulsar_message_t *message) { return message->message.getLength(); }
+
+pulsar_message_id_t *pulsar_message_get_message_id(pulsar_message_t *message) {
+    pulsar_message_id_t *messageId = new pulsar_message_id_t;
+    messageId->messageId = message->message.getMessageId();
+    return messageId;
+}
+
+const char *pulsar_message_get_partitionKey(pulsar_message_t *message) {
+    return message->message.getPartitionKey().c_str();
+}
+
+int pulsar_message_has_partition_key(pulsar_message_t *message) { return message->message.hasPartitionKey(); }
+
+uint64_t pulsar_message_get_publish_timestamp(pulsar_message_t *message) {
+    return message->message.getPublishTimestamp();
+}
+
+uint64_t pulsar_message_get_event_timestamp(pulsar_message_t *message) {
+    return message->message.getEventTimestamp();
+}
diff --git a/pulsar-client-cpp/lib/c/c_MessageId.cc b/pulsar-client-cpp/lib/c/c_MessageId.cc
new file mode 100644
index 0000000000..572835987b
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_MessageId.cc
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/message_id.h>
+#include "c_structs.h"
+
+#include <boost/thread/once.hpp>
+
+boost::once_flag initialized = BOOST_ONCE_INIT;
+
+static pulsar_message_id_t earliest;
+static pulsar_message_id_t latest;
+
+static void initialize() {
+    earliest.messageId = pulsar::MessageId::earliest();
+    latest.messageId = pulsar::MessageId::latest();
+}
+
+const pulsar_message_id_t *pulsar_message_id_earliest() {
+    boost::call_once(&initialize, initialized);
+    return &earliest;
+}
+
+const pulsar_message_id_t *pulsar_message_id_latest() {
+    boost::call_once(&initialize, initialized);
+    return &latest;
+}
+
+const void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len) {
+    std::string str;
+    messageId->messageId.serialize(str);
+    void *p = malloc(str.length());
+    memcpy(p, str.c_str(), str.length());
+    return p;
+}
+
+pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer, uint32_t len) {
+    std::string strId((const char *)buffer, len);
+    pulsar_message_id_t *messageId = new pulsar_message_id_t;
+    messageId->messageId = pulsar::MessageId::deserialize(strId);
+    return messageId;
+}
diff --git a/pulsar-client-cpp/lib/c/c_MessageRouter.cc b/pulsar-client-cpp/lib/c/c_MessageRouter.cc
new file mode 100644
index 0000000000..3184357e9a
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_MessageRouter.cc
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/message_router.h>
+
+#include "c_structs.h"
+
+int pulsar_topic_metadata_get_num_partitions(pulsar_topic_metadata_t *topicMetadata) {
+    return topicMetadata->metadata->getNumPartitions();
+}
diff --git a/pulsar-client-cpp/lib/c/c_Producer.cc b/pulsar-client-cpp/lib/c/c_Producer.cc
new file mode 100644
index 0000000000..1de670c7fc
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Producer.cc
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <boost/bind.hpp>
+
+#include <pulsar/c/producer.h>
+
+#include "c_structs.h"
+
+const char *pulsar_producer_get_topic(pulsar_producer_t *producer) {
+    return producer->producer.getTopic().c_str();
+}
+
+const char *pulsar_producer_get_producer_name(pulsar_producer_t *producer) {
+    return producer->producer.getProducerName().c_str();
+}
+
+void pulsar_producer_free(pulsar_producer_t *producer) { delete producer; }
+
+pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t *msg) {
+    msg->message = msg->builder.build();
+    return (pulsar_result)producer->producer.send(msg->message);
+}
+
+static void handle_producer_send(pulsar::Result result, pulsar_message_t *msg,
+                                 pulsar_send_callback callback) {
+    callback((pulsar_result)result, msg);
+}
+
+void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg,
+                                pulsar_send_callback callback) {
+    msg->message = msg->builder.build();
+    producer->producer.sendAsync(msg->message, boost::bind(&handle_producer_send, _1, msg, callback));
+}
+
+int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer) {
+    return producer->producer.getLastSequenceId();
+}
+
+pulsar_result pulsar_producer_close(pulsar_producer_t *producer) {
+    return (pulsar_result)producer->producer.close();
+}
+
+void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback) {
+    producer->producer.closeAsync(boost::bind(handle_result_callback, _1, callback));
+}
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
new file mode 100644
index 0000000000..8cc9345b6b
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/c/producer_configuration.h>
+#include <include/pulsar/c/message.h>
+
+#include "c_structs.h"
+
+pulsar_producer_configuration_t *pulsar_producer_configuration_create() {
+    pulsar_producer_configuration_t *c_conf = new pulsar_producer_configuration_t;
+    c_conf->conf = pulsar::ProducerConfiguration();
+    return c_conf;
+}
+
+void pulsar_producer_configuration_free(pulsar_producer_configuration_t *conf) { delete conf; }
+
+void pulsar_producer_configuration_set_producer_name(pulsar_producer_configuration_t *conf,
+                                                     const char *producerName) {
+    conf->conf.setProducerName(producerName);
+}
+
+const char *pulsar_producer_configuration_get_producer_name(pulsar_producer_configuration_t *conf) {
+    return conf->conf.getProducerName().c_str();
+}
+
+void pulsar_producer_configuration_set_send_timeout(pulsar_producer_configuration_t *conf,
+                                                    int sendTimeoutMs) {
+    conf->conf.setSendTimeout(sendTimeoutMs);
+}
+
+int pulsar_producer_configuration_get_send_timeout(pulsar_producer_configuration_t *conf) {
+    return conf->conf.getSendTimeout();
+}
+
+void pulsar_producer_configuration_set_initial_sequence_id(pulsar_producer_configuration_t *conf,
+                                                           int64_t initialSequenceId) {
+    conf->conf.setInitialSequenceId(initialSequenceId);
+}
+
+int64_t pulsar_producer_configuration_get_initial_sequence_id(pulsar_producer_configuration_t *conf) {
+    return conf->conf.getInitialSequenceId();
+}
+
+void pulsar_producer_configuration_set_compression_type(pulsar_producer_configuration_t *conf,
+                                                        pulsar_compression_type compressionType) {
+    conf->conf.setCompressionType((pulsar::CompressionType)compressionType);
+}
+
+pulsar_compression_type pulsar_producer_configuration_get_compression_type(
+    pulsar_producer_configuration_t *conf) {
+    return (pulsar_compression_type)conf->conf.getCompressionType();
+}
+
+void pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t *conf,
+                                                            int maxPendingMessages) {
+    conf->conf.setMaxPendingMessages(maxPendingMessages);
+}
+
+int pulsar_producer_configuration_get_max_pending_messages(pulsar_producer_configuration_t *conf) {
+    return conf->conf.getMaxPendingMessages();
+}
+
+void pulsar_producer_configuration_set_max_pending_messages_across_partitions(
+    pulsar_producer_configuration_t *conf, int maxPendingMessagesAcrossPartitions) {
+    conf->conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
+}
+
+int pulsar_producer_configuration_get_max_pending_messages_across_partitions(
+    pulsar_producer_configuration_t *conf) {
+    return conf->conf.getMaxPendingMessagesAcrossPartitions();
+}
+
+void pulsar_producer_configuration_set_partitions_routing_mode(pulsar_producer_configuration_t *conf,
+                                                               pulsar_partitions_routing_mode mode) {
+    conf->conf.setPartitionsRoutingMode((pulsar::ProducerConfiguration::PartitionsRoutingMode)mode);
+}
+
+pulsar_partitions_routing_mode pulsar_producer_configuration_get_partitions_routing_mode(
+    pulsar_producer_configuration_t *conf) {
+    return (pulsar_partitions_routing_mode)conf->conf.getPartitionsRoutingMode();
+}
+
+void pulsar_producer_configuration_set_hashing_scheme(pulsar_producer_configuration_t *conf,
+                                                      pulsar_hashing_scheme scheme) {
+    conf->conf.setHashingScheme((pulsar::ProducerConfiguration::HashingScheme)scheme);
+}
+
+pulsar_hashing_scheme pulsar_producer_configuration_get_hashing_scheme(
+    pulsar_producer_configuration_t *conf) {
+    return (pulsar_hashing_scheme)conf->conf.getHashingScheme();
+}
+
+class MessageRoutingPolicy : public pulsar::MessageRoutingPolicy {
+    pulsar_message_router _router;
+
+   public:
+    MessageRoutingPolicy(pulsar_message_router router) : _router(router) {}
+
+    int getPartition(const pulsar::Message &msg, const pulsar::TopicMetadata &topicMetadata) {
+        pulsar_message_t message;
+        message.message = msg;
+
+        pulsar_topic_metadata_t metadata;
+        metadata.metadata = &topicMetadata;
+
+        return _router(&message, &metadata);
+    }
+};
+
+void pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t *conf,
+                                                      pulsar_message_router router) {
+    conf->conf.setMessageRouter(boost::make_shared<MessageRoutingPolicy>(router));
+}
+
+void pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t *conf,
+                                                           int blockIfQueueFull) {
+    conf->conf.setBlockIfQueueFull(blockIfQueueFull);
+}
+
+int pulsar_producer_configuration_get_block_if_queue_full(pulsar_producer_configuration_t *conf) {
+    return conf->conf.getBlockIfQueueFull();
+}
+
+void pulsar_producer_configuration_set_batching_enabled(pulsar_producer_configuration_t *conf,
+                                                        int batchingEnabled) {
+    conf->conf.setBatchingEnabled(batchingEnabled);
+}
+
+int pulsar_producer_configuration_get_batching_enabled(pulsar_producer_configuration_t *conf) {
+    return conf->conf.getBatchingEnabled();
+}
+
+void pulsar_producer_configuration_set_batching_max_messages(pulsar_producer_configuration_t *conf,
+                                                             unsigned int batchingMaxMessages) {
+    conf->conf.setBatchingMaxMessages(batchingMaxMessages);
+}
+
+unsigned int pulsar_producer_configuration_get_batching_max_messages(pulsar_producer_configuration_t *conf) {
+    return conf->conf.getBatchingMaxMessages();
+}
+
+void pulsar_producer_configuration_set_batching_max_allowed_size_in_bytes(
+    pulsar_producer_configuration_t *conf, unsigned long batchingMaxAllowedSizeInBytes) {
+    conf->conf.setBatchingMaxAllowedSizeInBytes(batchingMaxAllowedSizeInBytes);
+}
+
+unsigned long pulsar_producer_configuration_get_batching_max_allowed_size_in_bytes(
+    pulsar_producer_configuration_t *conf) {
+    return conf->conf.getBatchingMaxAllowedSizeInBytes();
+}
+
+void pulsar_producer_configuration_set_batching_max_publish_delay_ms(
+    pulsar_producer_configuration_t *conf, unsigned long batchingMaxPublishDelayMs) {
+    conf->conf.setBatchingMaxPublishDelayMs(batchingMaxPublishDelayMs);
+}
+
+unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
+    pulsar_producer_configuration_t *conf) {
+    return conf->conf.getBatchingMaxPublishDelayMs();
+}
diff --git a/pulsar-client-cpp/lib/c/c_Reader.cc b/pulsar-client-cpp/lib/c/c_Reader.cc
new file mode 100644
index 0000000000..bb5d69bbdc
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Reader.cc
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/reader.h>
+#include <pulsar/Reader.h>
+
+#include "c_structs.h"
+
+const char *pulsar_reader_get_topic(pulsar_reader_t *reader) { return reader->reader.getTopic().c_str(); }
+
+pulsar_result pulsar_reader_read_next(pulsar_reader_t *reader, pulsar_message_t **msg) {
+    pulsar::Message message;
+    pulsar::Result res = reader->reader.readNext(message);
+    if (res == pulsar::ResultOk) {
+        (*msg) = new pulsar_message_t;
+        (*msg)->message = message;
+    }
+    return (pulsar_result)res;
+}
+
+pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, pulsar_message_t **msg,
+                                                   int timeoutMs) {
+    pulsar::Message message;
+    pulsar::Result res = reader->reader.readNext(message, timeoutMs);
+    if (res == pulsar::ResultOk) {
+        (*msg) = new pulsar_message_t;
+        (*msg)->message = message;
+    }
+    return (pulsar_result)res;
+}
+
+pulsar_result pulsar_reader_close(pulsar_reader_t *reader) { return (pulsar_result)reader->reader.close(); }
+
+void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback) {
+    reader->reader.closeAsync(boost::bind(handle_result_callback, _1, callback));
+}
+
+void pulsar_reader_free(pulsar_reader_t *reader) { delete reader; }
diff --git a/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
new file mode 100644
index 0000000000..b6419a6002
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/reader.h>
+#include <pulsar/c/message.h>
+#include <pulsar/c/reader_configuration.h>
+#include <pulsar/ReaderConfiguration.h>
+#include <pulsar/Reader.h>
+
+#include "c_structs.h"
+
+pulsar_reader_configuration_t *pulsar_reader_configuration_create() {
+    return new pulsar_reader_configuration_t;
+}
+
+void pulsar_reader_configuration_free(pulsar_reader_configuration_t *configuration) { delete configuration; }
+
+static void message_listener_callback(pulsar::Reader reader, const pulsar::Message &msg,
+                                      pulsar_reader_listener listener) {
+    pulsar_reader_t c_reader;
+    c_reader.reader = reader;
+    pulsar_message_t *message = new pulsar_message_t;
+    message->message = msg;
+    listener(&c_reader, message);
+}
+
+void pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t *configuration,
+                                                     pulsar_reader_listener listener) {
+    configuration->conf.setReaderListener(boost::bind(message_listener_callback, _1, _2, listener));
+}
+
+int pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t *configuration) {
+    return configuration->conf.hasReaderListener();
+}
+
+void pulsar_reader_configuration_set_receiver_queue_size(pulsar_reader_configuration_t *configuration,
+                                                         int size) {
+    configuration->conf.setReceiverQueueSize(size);
+}
+
+int pulsar_reader_configuration_get_receiver_queue_size(pulsar_reader_configuration_t *configuration) {
+    return configuration->conf.getReceiverQueueSize();
+}
+
+void pulsar_reader_configuration_set_reader_name(pulsar_reader_configuration_t *configuration,
+                                                 const char *readerName) {
+    configuration->conf.setReaderName(readerName);
+}
+
+const char *pulsar_reader_configuration_get_reader_name(pulsar_reader_configuration_t *configuration) {
+    return configuration->conf.getReaderName().c_str();
+}
+
+void pulsar_reader_configuration_set_subscription_role_prefix(pulsar_reader_configuration_t *configuration,
+                                                              const char *subscriptionRolePrefix) {
+    configuration->conf.setSubscriptionRolePrefix(subscriptionRolePrefix);
+}
+
+const char *pulsar_reader_configuration_get_subscription_role_prefix(
+    pulsar_reader_configuration_t *configuration) {
+    return configuration->conf.getSubscriptionRolePrefix().c_str();
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/c/c_Result.cc b/pulsar-client-cpp/lib/c/c_Result.cc
new file mode 100644
index 0000000000..157a91f33a
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_Result.cc
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/c/result.h>
+#include <pulsar/Result.h>
+
+const char *pulsar_result_str(pulsar_result result) { return pulsar::strResult((pulsar::Result)result); }
diff --git a/pulsar-client-cpp/lib/c/c_structs.h b/pulsar-client-cpp/lib/c/c_structs.h
new file mode 100644
index 0000000000..b207ab4fbf
--- /dev/null
+++ b/pulsar-client-cpp/lib/c/c_structs.h
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <pulsar/c/result.h>
+#include <pulsar/Client.h>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/bind.hpp>
+
+struct _pulsar_client {
+    boost::scoped_ptr<pulsar::Client> client;
+};
+
+struct _pulsar_client_configuration {
+    pulsar::ClientConfiguration conf;
+};
+
+struct _pulsar_producer {
+    pulsar::Producer producer;
+};
+
+struct _pulsar_producer_configuration {
+    pulsar::ProducerConfiguration conf;
+};
+
+struct _pulsar_consumer {
+    pulsar::Consumer consumer;
+};
+
+struct _pulsar_consumer_configuration {
+    pulsar::ConsumerConfiguration consumerConfiguration;
+};
+
+struct _pulsar_reader {
+    pulsar::Reader reader;
+};
+
+struct _pulsar_reader_configuration {
+    pulsar::ReaderConfiguration conf;
+};
+
+struct _pulsar_message {
+    pulsar::MessageBuilder builder;
+    pulsar::Message message;
+};
+
+struct _pulsar_message_id {
+    pulsar::MessageId messageId;
+};
+
+struct _pulsar_authentication {
+    pulsar::AuthenticationPtr auth;
+};
+
+struct _pulsar_topic_metadata {
+    const pulsar::TopicMetadata* metadata;
+};
+
+typedef void (*pulsar_result_callback)(pulsar_result);
+
+static void handle_result_callback(pulsar::Result result, pulsar_result_callback callback) {
+    callback((pulsar_result)result);
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services