You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/10 23:01:15 UTC

[incubator-pulsar] branch master updated: In C API add context pointer to callbacks (#1761)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 40a5aa5  In C API add context pointer to callbacks (#1761)
40a5aa5 is described below

commit 40a5aa5366c6e8c1ed172bfc5fcf7e1749717703
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu May 10 16:01:12 2018 -0700

    In C API add context pointer to callbacks (#1761)
---
 .../examples/SampleConsumerListenerCApi.c          |  4 +--
 pulsar-client-cpp/include/pulsar/c/client.h        | 20 +++++------
 pulsar-client-cpp/include/pulsar/c/consumer.h      | 17 ++++-----
 .../include/pulsar/c/consumer_configuration.h      |  9 ++---
 pulsar-client-cpp/include/pulsar/c/message_id.h    |  6 +++-
 .../include/pulsar/c/message_router.h              |  3 +-
 pulsar-client-cpp/include/pulsar/c/producer.h      |  8 ++---
 .../include/pulsar/c/producer_configuration.h      |  2 +-
 pulsar-client-cpp/include/pulsar/c/reader.h        |  4 +--
 .../include/pulsar/c/reader_configuration.h        |  4 +--
 pulsar-client-cpp/lib/c/c_Client.cc                | 42 +++++++++++-----------
 pulsar-client-cpp/lib/c/c_Consumer.cc              | 28 ++++++++-------
 pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc | 12 ++++---
 pulsar-client-cpp/lib/c/c_MessageId.cc             | 13 ++++++-
 pulsar-client-cpp/lib/c/c_Producer.cc              | 14 ++++----
 pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc |  9 ++---
 pulsar-client-cpp/lib/c/c_Reader.cc                |  4 +--
 pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc   |  8 ++---
 pulsar-client-cpp/lib/c/c_structs.h                |  8 +++--
 19 files changed, 120 insertions(+), 95 deletions(-)

diff --git a/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
index 8f3ed0d..e75c5d5 100644
--- a/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
+++ b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
@@ -20,7 +20,7 @@
 #include <stdio.h>
 #include <pulsar/c/client.h>
 
-static void listener_callback(pulsar_consumer_t* consumer, pulsar_message_t* message) {
+static void listener_callback(pulsar_consumer_t* consumer, pulsar_message_t* message, void* ctx) {
     printf("Received message with payload: '%.*s'\n", pulsar_message_get_length(message),
            pulsar_message_get_data(message));
 
@@ -34,7 +34,7 @@ int main() {
 
     pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
     pulsar_consumer_configuration_set_consumer_type(consumer_conf, pulsar_ConsumerShared);
-    pulsar_consumer_configuration_set_message_listener(consumer_conf, listener_callback);
+    pulsar_consumer_configuration_set_message_listener(consumer_conf, listener_callback, NULL);
 
     pulsar_consumer_t *consumer;
     pulsar_result res = pulsar_client_subscribe(client, "my-topic", "my-subscrition", consumer_conf, &consumer);
diff --git a/pulsar-client-cpp/include/pulsar/c/client.h b/pulsar-client-cpp/include/pulsar/c/client.h
index 11320e7..b8ef1ed 100644
--- a/pulsar-client-cpp/include/pulsar/c/client.h
+++ b/pulsar-client-cpp/include/pulsar/c/client.h
@@ -40,12 +40,12 @@ typedef struct _pulsar_producer pulsar_producer_t;
 typedef struct _pulsar_client_configuration pulsar_client_configuration_t;
 typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;
 
-typedef void (*pulsar_create_producer_callback)(pulsar_result result, pulsar_producer_t *producer);
+typedef void (*pulsar_create_producer_callback)(pulsar_result result, pulsar_producer_t *producer, void *ctx);
 
-typedef void (*pulsar_subscribe_callback)(pulsar_result result, pulsar_consumer_t *consumer);
-typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t *reader);
+typedef void (*pulsar_subscribe_callback)(pulsar_result result, pulsar_consumer_t *consumer, void *ctx);
+typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t *reader, void *ctx);
 
-typedef void (*pulsar_close_callback)(pulsar_result result);
+typedef void (*pulsar_close_callback)(pulsar_result result, void *ctx);
 
 /**
  * Create a Pulsar client object connecting to the specified cluster address and using the specified
@@ -73,7 +73,7 @@ pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const char
 
 void pulsar_client_create_producer_async(pulsar_client_t *client, const char *topic,
                                          const pulsar_producer_configuration_t *conf,
-                                         pulsar_create_producer_callback callback);
+                                         pulsar_create_producer_callback callback, void *ctx);
 
 pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic,
                                       const char *subscriptionName,
@@ -81,8 +81,8 @@ pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic
                                       pulsar_consumer_t **consumer);
 
 void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, const char *subscriptionName,
-                                   const pulsar_consumer_configuration_t *conf, pulsar_consumer_t **consumer,
-                                   pulsar_subscribe_callback callback);
+                                   const pulsar_consumer_configuration_t *conf,
+                                   pulsar_subscribe_callback callback, void *ctx);
 
 /**
  * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified
@@ -119,12 +119,12 @@ pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *t
 
 void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topic,
                                        const pulsar_message_id_t *startMessageId,
-                                       pulsar_reader_configuration_t *conf, pulsar_reader_t **reader,
-                                       pulsar_reader_callback callback);
+                                       pulsar_reader_configuration_t *conf, pulsar_reader_callback callback,
+                                       void *ctx);
 
 pulsar_result pulsar_client_close(pulsar_client_t *client);
 
-void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback);
+void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback, void *ctx);
 
 void pulsar_client_free(pulsar_client_t *client);
 
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer.h b/pulsar-client-cpp/include/pulsar/c/consumer.h
index 59c99e3..2917eea 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer.h
@@ -29,7 +29,7 @@ extern "C" {
 
 typedef struct _pulsar_consumer pulsar_consumer_t;
 
-typedef void (*pulsar_result_callback)(pulsar_result);
+typedef void (*pulsar_result_callback)(pulsar_result, void *);
 
 /**
  * @return the topic this consumer is subscribed to
@@ -67,7 +67,8 @@ pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer);
  *
  * @param callback the callback to get notified when the operation is complete
  */
-void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback);
+void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback,
+                                       void *ctx);
 
 /**
  * Receive a single message.
@@ -117,10 +118,10 @@ pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer, pulsar
  * @param callback callback that will be triggered when the message has been acknowledged
  */
 void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
-                                       pulsar_result_callback callback);
+                                       pulsar_result_callback callback, void *ctx);
 
 void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,
-                                          pulsar_result_callback callback);
+                                          pulsar_result_callback callback, void *ctx);
 
 /**
  * Acknowledge the reception of all the messages in the stream up to (and including)
@@ -155,15 +156,15 @@ pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t *consu
  * @param callback callback that will be triggered when the message has been acknowledged
  */
 void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
-                                                  pulsar_result_callback callback);
+                                                  pulsar_result_callback callback, void *ctx);
 
 void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer,
                                                      pulsar_message_id_t *messageId,
-                                                     pulsar_result_callback callback);
+                                                     pulsar_result_callback callback, void *ctx);
 
 pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer);
 
-void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback);
+void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback, void *ctx);
 
 void pulsar_consumer_free(pulsar_consumer_t *consumer);
 
@@ -187,7 +188,7 @@ pulsar_result resume_message_listener(pulsar_consumer_t *consumer);
  * connection
  * breaks, the messages are redelivered after reconnect.
  */
-void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer);
+void pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer);
 
 #ifdef __cplusplus
 }
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index 3bd9571..445e34e 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -43,7 +43,7 @@ typedef enum {
 } pulsar_consumer_type;
 
 /// Callback definition for MessageListener
-typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg);
+typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg, void *ctx);
 
 pulsar_consumer_configuration_t *pulsar_consumer_configuration_create();
 
@@ -73,10 +73,11 @@ pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
  * for every message received.
  */
 void pulsar_consumer_configuration_set_message_listener(
-    pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener);
+    pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener,
+    void *ctx);
 
-int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t *consumer_configuration,
-                                         pulsar_consumer_t *consumer);
+int pulsar_consumer_configuration_has_message_listener(
+    pulsar_consumer_configuration_t *consumer_configuration);
 
 /**
  * Sets the size of the consumer receive queue.
diff --git a/pulsar-client-cpp/include/pulsar/c/message_id.h b/pulsar-client-cpp/include/pulsar/c/message_id.h
index a0eb684..44d0c8f 100644
--- a/pulsar-client-cpp/include/pulsar/c/message_id.h
+++ b/pulsar-client-cpp/include/pulsar/c/message_id.h
@@ -41,13 +41,17 @@ const pulsar_message_id_t *pulsar_message_id_latest();
 /**
  * Serialize the message id into a binary string for storing
  */
-const void *pulsar_message_id_serialize(int *len);
+void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len);
 
 /**
  * Deserialize a message id from a binary string
  */
 pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer, uint32_t len);
 
+char *pulsar_message_id_str(pulsar_message_id_t *messageId);
+
+void pulsar_message_id_free(pulsar_message_id_t *messageId);
+
 #ifdef __cplusplus
 }
 #endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/message_router.h b/pulsar-client-cpp/include/pulsar/c/message_router.h
index aea4188..07ff7a3 100644
--- a/pulsar-client-cpp/include/pulsar/c/message_router.h
+++ b/pulsar-client-cpp/include/pulsar/c/message_router.h
@@ -27,7 +27,8 @@ extern "C" {
 
 typedef struct _pulsar_topic_metadata pulsar_topic_metadata_t;
 
-typedef int (*pulsar_message_router)(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata);
+typedef int (*pulsar_message_router)(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata,
+                                     void *ctx);
 
 int pulsar_topic_metadata_get_num_partitions(pulsar_topic_metadata_t *topicMetadata);
 
diff --git a/pulsar-client-cpp/include/pulsar/c/producer.h b/pulsar-client-cpp/include/pulsar/c/producer.h
index 6121e06..6b506b8 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer.h
@@ -30,8 +30,8 @@ extern "C" {
 
 typedef struct _pulsar_producer pulsar_producer_t;
 
-typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_t *msg);
-typedef void (*pulsar_close_callback)(pulsar_result);
+typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_t *msg, void *ctx);
+typedef void (*pulsar_close_callback)(pulsar_result, void *ctx);
 
 /**
  * @return the topic to which producer is publishing to
@@ -76,7 +76,7 @@ pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t
  * @param callback the callback to get notification of the completion
  */
 void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg,
-                                pulsar_send_callback callback);
+                                pulsar_send_callback callback, void *ctx);
 
 /**
  * Get the last sequence id that was published by this producer.
@@ -110,7 +110,7 @@ pulsar_result pulsar_producer_close(pulsar_producer_t *producer);
  * triggered when all pending write requests are persisted. In case of errors,
  * pending writes will not be retried.
  */
-void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback);
+void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx);
 
 void pulsar_producer_free(pulsar_producer_t *producer);
 
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index 534c411..636fe68 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -97,7 +97,7 @@ pulsar_partitions_routing_mode pulsar_producer_configuration_get_partitions_rout
     pulsar_producer_configuration_t *conf);
 
 void pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t *conf,
-                                                      pulsar_message_router router);
+                                                      pulsar_message_router router, void *ctx);
 
 void pulsar_producer_configuration_set_hashing_scheme(pulsar_producer_configuration_t *conf,
                                                       pulsar_hashing_scheme scheme);
diff --git a/pulsar-client-cpp/include/pulsar/c/reader.h b/pulsar-client-cpp/include/pulsar/c/reader.h
index 648c172..547bbf2 100644
--- a/pulsar-client-cpp/include/pulsar/c/reader.h
+++ b/pulsar-client-cpp/include/pulsar/c/reader.h
@@ -27,7 +27,7 @@ extern "C" {
 
 typedef struct _pulsar_reader pulsar_reader_t;
 
-typedef void (*pulsar_result_callback)(pulsar_result);
+typedef void (*pulsar_result_callback)(pulsar_result, void *);
 
 /**
  * @return the topic this reader is reading from
@@ -60,7 +60,7 @@ pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls
 
 pulsar_result pulsar_reader_close(pulsar_reader_t *reader);
 
-void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback);
+void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, void *ctx);
 
 void pulsar_reader_free(pulsar_reader_t *reader);
 
diff --git a/pulsar-client-cpp/include/pulsar/c/reader_configuration.h b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
index 914bbd9..c7aaf14 100644
--- a/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
@@ -25,7 +25,7 @@ extern "C" {
 
 typedef struct _pulsar_reader_configuration pulsar_reader_configuration_t;
 
-typedef void (*pulsar_reader_listener)(pulsar_reader_t *reader, pulsar_message_t *msg);
+typedef void (*pulsar_reader_listener)(pulsar_reader_t *reader, pulsar_message_t *msg, void *ctx);
 
 pulsar_reader_configuration_t *pulsar_reader_configuration_create();
 
@@ -36,7 +36,7 @@ void pulsar_reader_configuration_free(pulsar_reader_configuration_t *configurati
  * messages. A listener will be called in order for every message received.
  */
 void pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t *configuration,
-                                                     pulsar_reader_listener listener);
+                                                     pulsar_reader_listener listener, void *ctx);
 
 int pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t *configuration);
 
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc
index cec7a13..1063bb8 100644
--- a/pulsar-client-cpp/lib/c/c_Client.cc
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -47,21 +47,21 @@ pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const char
 }
 
 static void handle_create_producer_callback(pulsar::Result result, pulsar::Producer producer,
-                                            pulsar_create_producer_callback callback) {
+                                            pulsar_create_producer_callback callback, void *ctx) {
     if (result == pulsar::ResultOk) {
         pulsar_producer_t *c_producer = new pulsar_producer_t;
         c_producer->producer = producer;
-        callback(pulsar_result_Ok, c_producer);
+        callback(pulsar_result_Ok, c_producer, ctx);
     } else {
-        callback((pulsar_result)result, NULL);
+        callback((pulsar_result)result, NULL, ctx);
     }
 }
 
 void pulsar_client_create_producer_async(pulsar_client_t *client, const char *topic,
                                          const pulsar_producer_configuration_t *conf,
-                                         pulsar_create_producer_callback callback) {
+                                         pulsar_create_producer_callback callback, void *ctx) {
     client->client->createProducerAsync(topic, conf->conf,
-                                        boost::bind(&handle_create_producer_callback, _1, _2, callback));
+                                        boost::bind(&handle_create_producer_callback, _1, _2, callback, ctx));
 }
 
 pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic,
@@ -81,21 +81,21 @@ pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic
 }
 
 static void handle_subscribe_callback(pulsar::Result result, pulsar::Consumer consumer,
-                                      pulsar_subscribe_callback callback) {
+                                      pulsar_subscribe_callback callback, void *ctx) {
     if (result == pulsar::ResultOk) {
         pulsar_consumer_t *c_consumer = new pulsar_consumer_t;
         c_consumer->consumer = consumer;
-        callback(pulsar_result_Ok, c_consumer);
+        callback(pulsar_result_Ok, c_consumer, ctx);
     } else {
-        callback((pulsar_result)result, NULL);
+        callback((pulsar_result)result, NULL, ctx);
     }
 }
 
 void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, const char *subscriptionName,
-                                   const pulsar_consumer_configuration_t *conf, pulsar_consumer_t **consumer,
-                                   pulsar_subscribe_callback callback) {
+                                   const pulsar_consumer_configuration_t *conf,
+                                   pulsar_subscribe_callback callback, void *ctx) {
     client->client->subscribeAsync(topic, subscriptionName, conf->consumerConfiguration,
-                                   boost::bind(&handle_subscribe_callback, _1, _2, callback));
+                                   boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
 }
 
 pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic,
@@ -113,30 +113,30 @@ pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *t
 }
 
 static void handle_reader_callback(pulsar::Result result, pulsar::Reader reader,
-                                   pulsar_reader_callback callback) {
+                                   pulsar_reader_callback callback, void *ctx) {
     if (result == pulsar::ResultOk) {
         pulsar_reader_t *c_reader = new pulsar_reader_t;
         c_reader->reader = reader;
-        callback(pulsar_result_Ok, c_reader);
+        callback(pulsar_result_Ok, c_reader, ctx);
     } else {
-        callback((pulsar_result)result, NULL);
+        callback((pulsar_result)result, NULL, ctx);
     }
 }
 
 void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topic,
                                        const pulsar_message_id_t *startMessageId,
-                                       pulsar_reader_configuration_t *conf, pulsar_reader_t **reader,
-                                       pulsar_reader_callback callback) {
+                                       pulsar_reader_configuration_t *conf, pulsar_reader_callback callback,
+                                       void *ctx) {
     client->client->createReaderAsync(topic, startMessageId->messageId, conf->conf,
-                                      boost::bind(&handle_reader_callback, _1, _2, callback));
+                                      boost::bind(&handle_reader_callback, _1, _2, callback, ctx));
 }
 
 pulsar_result pulsar_client_close(pulsar_client_t *client) { return (pulsar_result)client->client->close(); }
 
-static void handle_client_close(pulsar::Result result, pulsar_close_callback callback) {
-    callback((pulsar_result)result);
+static void handle_client_close(pulsar::Result result, pulsar_close_callback callback, void *ctx) {
+    callback((pulsar_result)result, ctx);
 }
 
-void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback) {
-    client->client->closeAsync(boost::bind(handle_client_close, _1, callback));
+void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback, void *ctx) {
+    client->client->closeAsync(boost::bind(handle_client_close, _1, callback, ctx));
 }
diff --git a/pulsar-client-cpp/lib/c/c_Consumer.cc b/pulsar-client-cpp/lib/c/c_Consumer.cc
index 22021a5..dae824a 100644
--- a/pulsar-client-cpp/lib/c/c_Consumer.cc
+++ b/pulsar-client-cpp/lib/c/c_Consumer.cc
@@ -33,8 +33,9 @@ pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer) {
     return (pulsar_result)consumer->consumer.unsubscribe();
 }
 
-void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback) {
-    consumer->consumer.unsubscribeAsync(boost::bind(handle_result_callback, _1, callback));
+void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback,
+                                       void *ctx) {
+    consumer->consumer.unsubscribeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 pulsar_result pulsar_consumer_receive(pulsar_consumer_t *consumer, pulsar_message_t **msg) {
@@ -67,14 +68,15 @@ pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer, pulsar
 }
 
 void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
-                                       pulsar_result_callback callback) {
-    consumer->consumer.acknowledgeAsync(message->message, boost::bind(handle_result_callback, _1, callback));
+                                       pulsar_result_callback callback, void *ctx) {
+    consumer->consumer.acknowledgeAsync(message->message,
+                                        boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,
-                                          pulsar_result_callback callback) {
+                                          pulsar_result_callback callback, void *ctx) {
     consumer->consumer.acknowledgeAsync(messageId->messageId,
-                                        boost::bind(handle_result_callback, _1, callback));
+                                        boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 pulsar_result pulsar_consumer_acknowledge_cumulative(pulsar_consumer_t *consumer, pulsar_message_t *message) {
@@ -87,24 +89,24 @@ pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t *consu
 }
 
 void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
-                                                  pulsar_result_callback callback) {
+                                                  pulsar_result_callback callback, void *ctx) {
     consumer->consumer.acknowledgeCumulativeAsync(message->message,
-                                                  boost::bind(handle_result_callback, _1, callback));
+                                                  boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer,
                                                      pulsar_message_id_t *messageId,
-                                                     pulsar_result_callback callback) {
+                                                     pulsar_result_callback callback, void *ctx) {
     consumer->consumer.acknowledgeCumulativeAsync(messageId->messageId,
-                                                  boost::bind(handle_result_callback, _1, callback));
+                                                  boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer) {
     return (pulsar_result)consumer->consumer.close();
 }
 
-void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback) {
-    consumer->consumer.closeAsync(boost::bind(handle_result_callback, _1, callback));
+void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback, void *ctx) {
+    consumer->consumer.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 void pulsar_consumer_free(pulsar_consumer_t *consumer) { delete consumer; }
@@ -117,6 +119,6 @@ pulsar_result resume_message_listener(pulsar_consumer_t *consumer) {
     return (pulsar_result)consumer->consumer.resumeMessageListener();
 }
 
-void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer) {
+void pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer) {
     return consumer->consumer.redeliverUnacknowledgedMessages();
 }
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index dcd0aac..42c7c73 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -41,21 +41,23 @@ pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
 }
 
 static void message_listener_callback(pulsar::Consumer consumer, const pulsar::Message &msg,
-                                      pulsar_message_listener listener) {
+                                      pulsar_message_listener listener, void *ctx) {
     pulsar_consumer_t c_consumer;
     c_consumer.consumer = consumer;
     pulsar_message_t *message = new pulsar_message_t;
     message->message = msg;
-    listener(&c_consumer, message);
+    listener(&c_consumer, message, ctx);
 }
 
 void pulsar_consumer_configuration_set_message_listener(
-    pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener) {
+    pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener,
+    void *ctx) {
     consumer_configuration->consumerConfiguration.setMessageListener(
-        boost::bind(message_listener_callback, _1, _2, messageListener));
+        boost::bind(message_listener_callback, _1, _2, messageListener, ctx));
 }
 
-int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t *consumer_configuration) {
+int pulsar_consumer_configuration_has_message_listener(
+    pulsar_consumer_configuration_t *consumer_configuration) {
     return consumer_configuration->consumerConfiguration.hasMessageListener();
 }
 
diff --git a/pulsar-client-cpp/lib/c/c_MessageId.cc b/pulsar-client-cpp/lib/c/c_MessageId.cc
index 5728359..8d1e96c 100644
--- a/pulsar-client-cpp/lib/c/c_MessageId.cc
+++ b/pulsar-client-cpp/lib/c/c_MessageId.cc
@@ -21,6 +21,7 @@
 #include "c_structs.h"
 
 #include <boost/thread/once.hpp>
+#include <sstream>
 
 boost::once_flag initialized = BOOST_ONCE_INIT;
 
@@ -42,7 +43,7 @@ const pulsar_message_id_t *pulsar_message_id_latest() {
     return &latest;
 }
 
-const void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len) {
+void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len) {
     std::string str;
     messageId->messageId.serialize(str);
     void *p = malloc(str.length());
@@ -56,3 +57,13 @@ pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer, uint32_t
     messageId->messageId = pulsar::MessageId::deserialize(strId);
     return messageId;
 }
+
+char *pulsar_message_id_str(pulsar_message_id_t *messageId) {
+    std::stringstream ss;
+    ss << messageId->messageId;
+    std::string s = ss.str();
+
+    return strndup(s.c_str(), s.length());
+}
+
+void pulsar_message_id_free(pulsar_message_id_t *messageId) { delete messageId; }
diff --git a/pulsar-client-cpp/lib/c/c_Producer.cc b/pulsar-client-cpp/lib/c/c_Producer.cc
index 1de670c..784167e 100644
--- a/pulsar-client-cpp/lib/c/c_Producer.cc
+++ b/pulsar-client-cpp/lib/c/c_Producer.cc
@@ -38,15 +38,15 @@ pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t
     return (pulsar_result)producer->producer.send(msg->message);
 }
 
-static void handle_producer_send(pulsar::Result result, pulsar_message_t *msg,
-                                 pulsar_send_callback callback) {
-    callback((pulsar_result)result, msg);
+static void handle_producer_send(pulsar::Result result, pulsar_message_t *msg, pulsar_send_callback callback,
+                                 void *ctx) {
+    callback((pulsar_result)result, msg, ctx);
 }
 
 void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg,
-                                pulsar_send_callback callback) {
+                                pulsar_send_callback callback, void *ctx) {
     msg->message = msg->builder.build();
-    producer->producer.sendAsync(msg->message, boost::bind(&handle_producer_send, _1, msg, callback));
+    producer->producer.sendAsync(msg->message, boost::bind(&handle_producer_send, _1, msg, callback, ctx));
 }
 
 int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer) {
@@ -57,6 +57,6 @@ pulsar_result pulsar_producer_close(pulsar_producer_t *producer) {
     return (pulsar_result)producer->producer.close();
 }
 
-void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback) {
-    producer->producer.closeAsync(boost::bind(handle_result_callback, _1, callback));
+void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx) {
+    producer->producer.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
 }
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index 8cc9345..914fc0a 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -107,9 +107,10 @@ pulsar_hashing_scheme pulsar_producer_configuration_get_hashing_scheme(
 
 class MessageRoutingPolicy : public pulsar::MessageRoutingPolicy {
     pulsar_message_router _router;
+    void *_ctx;
 
    public:
-    MessageRoutingPolicy(pulsar_message_router router) : _router(router) {}
+    MessageRoutingPolicy(pulsar_message_router router, void *ctx) : _router(router), _ctx(ctx) {}
 
     int getPartition(const pulsar::Message &msg, const pulsar::TopicMetadata &topicMetadata) {
         pulsar_message_t message;
@@ -118,13 +119,13 @@ class MessageRoutingPolicy : public pulsar::MessageRoutingPolicy {
         pulsar_topic_metadata_t metadata;
         metadata.metadata = &topicMetadata;
 
-        return _router(&message, &metadata);
+        return _router(&message, &metadata, _ctx);
     }
 };
 
 void pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t *conf,
-                                                      pulsar_message_router router) {
-    conf->conf.setMessageRouter(boost::make_shared<MessageRoutingPolicy>(router));
+                                                      pulsar_message_router router, void *ctx) {
+    conf->conf.setMessageRouter(boost::make_shared<MessageRoutingPolicy>(router, ctx));
 }
 
 void pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t *conf,
diff --git a/pulsar-client-cpp/lib/c/c_Reader.cc b/pulsar-client-cpp/lib/c/c_Reader.cc
index bb5d69b..3f7849d 100644
--- a/pulsar-client-cpp/lib/c/c_Reader.cc
+++ b/pulsar-client-cpp/lib/c/c_Reader.cc
@@ -47,8 +47,8 @@ pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls
 
 pulsar_result pulsar_reader_close(pulsar_reader_t *reader) { return (pulsar_result)reader->reader.close(); }
 
-void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback) {
-    reader->reader.closeAsync(boost::bind(handle_result_callback, _1, callback));
+void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, void *ctx) {
+    reader->reader.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 void pulsar_reader_free(pulsar_reader_t *reader) { delete reader; }
diff --git a/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
index b6419a6..55a6dc5 100644
--- a/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
@@ -32,17 +32,17 @@ pulsar_reader_configuration_t *pulsar_reader_configuration_create() {
 void pulsar_reader_configuration_free(pulsar_reader_configuration_t *configuration) { delete configuration; }
 
 static void message_listener_callback(pulsar::Reader reader, const pulsar::Message &msg,
-                                      pulsar_reader_listener listener) {
+                                      pulsar_reader_listener listener, void *ctx) {
     pulsar_reader_t c_reader;
     c_reader.reader = reader;
     pulsar_message_t *message = new pulsar_message_t;
     message->message = msg;
-    listener(&c_reader, message);
+    listener(&c_reader, message, ctx);
 }
 
 void pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t *configuration,
-                                                     pulsar_reader_listener listener) {
-    configuration->conf.setReaderListener(boost::bind(message_listener_callback, _1, _2, listener));
+                                                     pulsar_reader_listener listener, void *ctx) {
+    configuration->conf.setReaderListener(boost::bind(message_listener_callback, _1, _2, listener, ctx));
 }
 
 int pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t *configuration) {
diff --git a/pulsar-client-cpp/lib/c/c_structs.h b/pulsar-client-cpp/lib/c/c_structs.h
index b207ab4..a4ff193 100644
--- a/pulsar-client-cpp/lib/c/c_structs.h
+++ b/pulsar-client-cpp/lib/c/c_structs.h
@@ -73,8 +73,10 @@ struct _pulsar_topic_metadata {
     const pulsar::TopicMetadata* metadata;
 };
 
-typedef void (*pulsar_result_callback)(pulsar_result);
+typedef void (*pulsar_result_callback)(pulsar_result res, void* ctx);
 
-static void handle_result_callback(pulsar::Result result, pulsar_result_callback callback) {
-    callback((pulsar_result)result);
+static void handle_result_callback(pulsar::Result result, pulsar_result_callback callback, void* ctx) {
+    if (callback) {
+        callback((pulsar_result)result, ctx);
+    }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.