You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/10 23:01:14 UTC

[GitHub] merlimat closed pull request #1761: In C API add context pointer to callbacks

merlimat closed pull request #1761: In C API add context pointer to callbacks
URL: https://github.com/apache/incubator-pulsar/pull/1761
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
index 8f3ed0df63..e75c5d5b2e 100644
--- a/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
+++ b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
@@ -20,7 +20,7 @@
 #include <stdio.h>
 #include <pulsar/c/client.h>
 
-static void listener_callback(pulsar_consumer_t* consumer, pulsar_message_t* message) {
+static void listener_callback(pulsar_consumer_t* consumer, pulsar_message_t* message, void* ctx) {
     printf("Received message with payload: '%.*s'\n", pulsar_message_get_length(message),
            pulsar_message_get_data(message));
 
@@ -34,7 +34,7 @@ int main() {
 
     pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
     pulsar_consumer_configuration_set_consumer_type(consumer_conf, pulsar_ConsumerShared);
-    pulsar_consumer_configuration_set_message_listener(consumer_conf, listener_callback);
+    pulsar_consumer_configuration_set_message_listener(consumer_conf, listener_callback, NULL);
 
     pulsar_consumer_t *consumer;
     pulsar_result res = pulsar_client_subscribe(client, "my-topic", "my-subscrition", consumer_conf, &consumer);
diff --git a/pulsar-client-cpp/include/pulsar/c/client.h b/pulsar-client-cpp/include/pulsar/c/client.h
index 11320e7882..b8ef1ed1d8 100644
--- a/pulsar-client-cpp/include/pulsar/c/client.h
+++ b/pulsar-client-cpp/include/pulsar/c/client.h
@@ -40,12 +40,12 @@ typedef struct _pulsar_producer pulsar_producer_t;
 typedef struct _pulsar_client_configuration pulsar_client_configuration_t;
 typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;
 
-typedef void (*pulsar_create_producer_callback)(pulsar_result result, pulsar_producer_t *producer);
+typedef void (*pulsar_create_producer_callback)(pulsar_result result, pulsar_producer_t *producer, void *ctx);
 
-typedef void (*pulsar_subscribe_callback)(pulsar_result result, pulsar_consumer_t *consumer);
-typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t *reader);
+typedef void (*pulsar_subscribe_callback)(pulsar_result result, pulsar_consumer_t *consumer, void *ctx);
+typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t *reader, void *ctx);
 
-typedef void (*pulsar_close_callback)(pulsar_result result);
+typedef void (*pulsar_close_callback)(pulsar_result result, void *ctx);
 
 /**
  * Create a Pulsar client object connecting to the specified cluster address and using the specified
@@ -73,7 +73,7 @@ pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const char
 
 void pulsar_client_create_producer_async(pulsar_client_t *client, const char *topic,
                                          const pulsar_producer_configuration_t *conf,
-                                         pulsar_create_producer_callback callback);
+                                         pulsar_create_producer_callback callback, void *ctx);
 
 pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic,
                                       const char *subscriptionName,
@@ -81,8 +81,8 @@ pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic
                                       pulsar_consumer_t **consumer);
 
 void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, const char *subscriptionName,
-                                   const pulsar_consumer_configuration_t *conf, pulsar_consumer_t **consumer,
-                                   pulsar_subscribe_callback callback);
+                                   const pulsar_consumer_configuration_t *conf,
+                                   pulsar_subscribe_callback callback, void *ctx);
 
 /**
  * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified
@@ -119,12 +119,12 @@ pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *t
 
 void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topic,
                                        const pulsar_message_id_t *startMessageId,
-                                       pulsar_reader_configuration_t *conf, pulsar_reader_t **reader,
-                                       pulsar_reader_callback callback);
+                                       pulsar_reader_configuration_t *conf, pulsar_reader_callback callback,
+                                       void *ctx);
 
 pulsar_result pulsar_client_close(pulsar_client_t *client);
 
-void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback);
+void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback, void *ctx);
 
 void pulsar_client_free(pulsar_client_t *client);
 
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer.h b/pulsar-client-cpp/include/pulsar/c/consumer.h
index 59c99e3080..2917eea396 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer.h
@@ -29,7 +29,7 @@ extern "C" {
 
 typedef struct _pulsar_consumer pulsar_consumer_t;
 
-typedef void (*pulsar_result_callback)(pulsar_result);
+typedef void (*pulsar_result_callback)(pulsar_result, void *);
 
 /**
  * @return the topic this consumer is subscribed to
@@ -67,7 +67,8 @@ pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer);
  *
  * @param callback the callback to get notified when the operation is complete
  */
-void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback);
+void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback,
+                                       void *ctx);
 
 /**
  * Receive a single message.
@@ -117,10 +118,10 @@ pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer, pulsar
  * @param callback callback that will be triggered when the message has been acknowledged
  */
 void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
-                                       pulsar_result_callback callback);
+                                       pulsar_result_callback callback, void *ctx);
 
 void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,
-                                          pulsar_result_callback callback);
+                                          pulsar_result_callback callback, void *ctx);
 
 /**
  * Acknowledge the reception of all the messages in the stream up to (and including)
@@ -155,15 +156,15 @@ pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t *consu
  * @param callback callback that will be triggered when the message has been acknowledged
  */
 void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
-                                                  pulsar_result_callback callback);
+                                                  pulsar_result_callback callback, void *ctx);
 
 void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer,
                                                      pulsar_message_id_t *messageId,
-                                                     pulsar_result_callback callback);
+                                                     pulsar_result_callback callback, void *ctx);
 
 pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer);
 
-void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback);
+void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback, void *ctx);
 
 void pulsar_consumer_free(pulsar_consumer_t *consumer);
 
@@ -187,7 +188,7 @@ pulsar_result resume_message_listener(pulsar_consumer_t *consumer);
  * connection
  * breaks, the messages are redelivered after reconnect.
  */
-void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer);
+void pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer);
 
 #ifdef __cplusplus
 }
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index 3bd95715b5..445e34e57b 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -43,7 +43,7 @@ typedef enum {
 } pulsar_consumer_type;
 
 /// Callback definition for MessageListener
-typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg);
+typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg, void *ctx);
 
 pulsar_consumer_configuration_t *pulsar_consumer_configuration_create();
 
@@ -73,10 +73,11 @@ pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
  * for every message received.
  */
 void pulsar_consumer_configuration_set_message_listener(
-    pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener);
+    pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener,
+    void *ctx);
 
-int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t *consumer_configuration,
-                                         pulsar_consumer_t *consumer);
+int pulsar_consumer_configuration_has_message_listener(
+    pulsar_consumer_configuration_t *consumer_configuration);
 
 /**
  * Sets the size of the consumer receive queue.
diff --git a/pulsar-client-cpp/include/pulsar/c/message_id.h b/pulsar-client-cpp/include/pulsar/c/message_id.h
index a0eb684625..44d0c8fb0a 100644
--- a/pulsar-client-cpp/include/pulsar/c/message_id.h
+++ b/pulsar-client-cpp/include/pulsar/c/message_id.h
@@ -41,13 +41,17 @@ const pulsar_message_id_t *pulsar_message_id_latest();
 /**
  * Serialize the message id into a binary string for storing
  */
-const void *pulsar_message_id_serialize(int *len);
+void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len);
 
 /**
  * Deserialize a message id from a binary string
  */
 pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer, uint32_t len);
 
+char *pulsar_message_id_str(pulsar_message_id_t *messageId);
+
+void pulsar_message_id_free(pulsar_message_id_t *messageId);
+
 #ifdef __cplusplus
 }
 #endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/message_router.h b/pulsar-client-cpp/include/pulsar/c/message_router.h
index aea4188e57..07ff7a3dc0 100644
--- a/pulsar-client-cpp/include/pulsar/c/message_router.h
+++ b/pulsar-client-cpp/include/pulsar/c/message_router.h
@@ -27,7 +27,8 @@ extern "C" {
 
 typedef struct _pulsar_topic_metadata pulsar_topic_metadata_t;
 
-typedef int (*pulsar_message_router)(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata);
+typedef int (*pulsar_message_router)(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata,
+                                     void *ctx);
 
 int pulsar_topic_metadata_get_num_partitions(pulsar_topic_metadata_t *topicMetadata);
 
diff --git a/pulsar-client-cpp/include/pulsar/c/producer.h b/pulsar-client-cpp/include/pulsar/c/producer.h
index 6121e06653..6b506b859e 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer.h
@@ -30,8 +30,8 @@ extern "C" {
 
 typedef struct _pulsar_producer pulsar_producer_t;
 
-typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_t *msg);
-typedef void (*pulsar_close_callback)(pulsar_result);
+typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_t *msg, void *ctx);
+typedef void (*pulsar_close_callback)(pulsar_result, void *ctx);
 
 /**
  * @return the topic to which producer is publishing to
@@ -76,7 +76,7 @@ pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t
  * @param callback the callback to get notification of the completion
  */
 void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg,
-                                pulsar_send_callback callback);
+                                pulsar_send_callback callback, void *ctx);
 
 /**
  * Get the last sequence id that was published by this producer.
@@ -110,7 +110,7 @@ pulsar_result pulsar_producer_close(pulsar_producer_t *producer);
  * triggered when all pending write requests are persisted. In case of errors,
  * pending writes will not be retried.
  */
-void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback);
+void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx);
 
 void pulsar_producer_free(pulsar_producer_t *producer);
 
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index 534c411e05..636fe68c6d 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -97,7 +97,7 @@ pulsar_partitions_routing_mode pulsar_producer_configuration_get_partitions_rout
     pulsar_producer_configuration_t *conf);
 
 void pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t *conf,
-                                                      pulsar_message_router router);
+                                                      pulsar_message_router router, void *ctx);
 
 void pulsar_producer_configuration_set_hashing_scheme(pulsar_producer_configuration_t *conf,
                                                       pulsar_hashing_scheme scheme);
diff --git a/pulsar-client-cpp/include/pulsar/c/reader.h b/pulsar-client-cpp/include/pulsar/c/reader.h
index 648c172f94..547bbf2b8b 100644
--- a/pulsar-client-cpp/include/pulsar/c/reader.h
+++ b/pulsar-client-cpp/include/pulsar/c/reader.h
@@ -27,7 +27,7 @@ extern "C" {
 
 typedef struct _pulsar_reader pulsar_reader_t;
 
-typedef void (*pulsar_result_callback)(pulsar_result);
+typedef void (*pulsar_result_callback)(pulsar_result, void *);
 
 /**
  * @return the topic this reader is reading from
@@ -60,7 +60,7 @@ pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls
 
 pulsar_result pulsar_reader_close(pulsar_reader_t *reader);
 
-void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback);
+void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, void *ctx);
 
 void pulsar_reader_free(pulsar_reader_t *reader);
 
diff --git a/pulsar-client-cpp/include/pulsar/c/reader_configuration.h b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
index 914bbd9ede..c7aaf1482c 100644
--- a/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
@@ -25,7 +25,7 @@ extern "C" {
 
 typedef struct _pulsar_reader_configuration pulsar_reader_configuration_t;
 
-typedef void (*pulsar_reader_listener)(pulsar_reader_t *reader, pulsar_message_t *msg);
+typedef void (*pulsar_reader_listener)(pulsar_reader_t *reader, pulsar_message_t *msg, void *ctx);
 
 pulsar_reader_configuration_t *pulsar_reader_configuration_create();
 
@@ -36,7 +36,7 @@ void pulsar_reader_configuration_free(pulsar_reader_configuration_t *configurati
  * messages. A listener will be called in order for every message received.
  */
 void pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t *configuration,
-                                                     pulsar_reader_listener listener);
+                                                     pulsar_reader_listener listener, void *ctx);
 
 int pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t *configuration);
 
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc
index cec7a13e46..1063bb877d 100644
--- a/pulsar-client-cpp/lib/c/c_Client.cc
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -47,21 +47,21 @@ pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const char
 }
 
 static void handle_create_producer_callback(pulsar::Result result, pulsar::Producer producer,
-                                            pulsar_create_producer_callback callback) {
+                                            pulsar_create_producer_callback callback, void *ctx) {
     if (result == pulsar::ResultOk) {
         pulsar_producer_t *c_producer = new pulsar_producer_t;
         c_producer->producer = producer;
-        callback(pulsar_result_Ok, c_producer);
+        callback(pulsar_result_Ok, c_producer, ctx);
     } else {
-        callback((pulsar_result)result, NULL);
+        callback((pulsar_result)result, NULL, ctx);
     }
 }
 
 void pulsar_client_create_producer_async(pulsar_client_t *client, const char *topic,
                                          const pulsar_producer_configuration_t *conf,
-                                         pulsar_create_producer_callback callback) {
+                                         pulsar_create_producer_callback callback, void *ctx) {
     client->client->createProducerAsync(topic, conf->conf,
-                                        boost::bind(&handle_create_producer_callback, _1, _2, callback));
+                                        boost::bind(&handle_create_producer_callback, _1, _2, callback, ctx));
 }
 
 pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic,
@@ -81,21 +81,21 @@ pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic
 }
 
 static void handle_subscribe_callback(pulsar::Result result, pulsar::Consumer consumer,
-                                      pulsar_subscribe_callback callback) {
+                                      pulsar_subscribe_callback callback, void *ctx) {
     if (result == pulsar::ResultOk) {
         pulsar_consumer_t *c_consumer = new pulsar_consumer_t;
         c_consumer->consumer = consumer;
-        callback(pulsar_result_Ok, c_consumer);
+        callback(pulsar_result_Ok, c_consumer, ctx);
     } else {
-        callback((pulsar_result)result, NULL);
+        callback((pulsar_result)result, NULL, ctx);
     }
 }
 
 void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, const char *subscriptionName,
-                                   const pulsar_consumer_configuration_t *conf, pulsar_consumer_t **consumer,
-                                   pulsar_subscribe_callback callback) {
+                                   const pulsar_consumer_configuration_t *conf,
+                                   pulsar_subscribe_callback callback, void *ctx) {
     client->client->subscribeAsync(topic, subscriptionName, conf->consumerConfiguration,
-                                   boost::bind(&handle_subscribe_callback, _1, _2, callback));
+                                   boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
 }
 
 pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic,
@@ -113,30 +113,30 @@ pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *t
 }
 
 static void handle_reader_callback(pulsar::Result result, pulsar::Reader reader,
-                                   pulsar_reader_callback callback) {
+                                   pulsar_reader_callback callback, void *ctx) {
     if (result == pulsar::ResultOk) {
         pulsar_reader_t *c_reader = new pulsar_reader_t;
         c_reader->reader = reader;
-        callback(pulsar_result_Ok, c_reader);
+        callback(pulsar_result_Ok, c_reader, ctx);
     } else {
-        callback((pulsar_result)result, NULL);
+        callback((pulsar_result)result, NULL, ctx);
     }
 }
 
 void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topic,
                                        const pulsar_message_id_t *startMessageId,
-                                       pulsar_reader_configuration_t *conf, pulsar_reader_t **reader,
-                                       pulsar_reader_callback callback) {
+                                       pulsar_reader_configuration_t *conf, pulsar_reader_callback callback,
+                                       void *ctx) {
     client->client->createReaderAsync(topic, startMessageId->messageId, conf->conf,
-                                      boost::bind(&handle_reader_callback, _1, _2, callback));
+                                      boost::bind(&handle_reader_callback, _1, _2, callback, ctx));
 }
 
 pulsar_result pulsar_client_close(pulsar_client_t *client) { return (pulsar_result)client->client->close(); }
 
-static void handle_client_close(pulsar::Result result, pulsar_close_callback callback) {
-    callback((pulsar_result)result);
+static void handle_client_close(pulsar::Result result, pulsar_close_callback callback, void *ctx) {
+    callback((pulsar_result)result, ctx);
 }
 
-void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback) {
-    client->client->closeAsync(boost::bind(handle_client_close, _1, callback));
+void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback, void *ctx) {
+    client->client->closeAsync(boost::bind(handle_client_close, _1, callback, ctx));
 }
diff --git a/pulsar-client-cpp/lib/c/c_Consumer.cc b/pulsar-client-cpp/lib/c/c_Consumer.cc
index 22021a5146..dae824a447 100644
--- a/pulsar-client-cpp/lib/c/c_Consumer.cc
+++ b/pulsar-client-cpp/lib/c/c_Consumer.cc
@@ -33,8 +33,9 @@ pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer) {
     return (pulsar_result)consumer->consumer.unsubscribe();
 }
 
-void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback) {
-    consumer->consumer.unsubscribeAsync(boost::bind(handle_result_callback, _1, callback));
+void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback,
+                                       void *ctx) {
+    consumer->consumer.unsubscribeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 pulsar_result pulsar_consumer_receive(pulsar_consumer_t *consumer, pulsar_message_t **msg) {
@@ -67,14 +68,15 @@ pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer, pulsar
 }
 
 void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
-                                       pulsar_result_callback callback) {
-    consumer->consumer.acknowledgeAsync(message->message, boost::bind(handle_result_callback, _1, callback));
+                                       pulsar_result_callback callback, void *ctx) {
+    consumer->consumer.acknowledgeAsync(message->message,
+                                        boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,
-                                          pulsar_result_callback callback) {
+                                          pulsar_result_callback callback, void *ctx) {
     consumer->consumer.acknowledgeAsync(messageId->messageId,
-                                        boost::bind(handle_result_callback, _1, callback));
+                                        boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 pulsar_result pulsar_consumer_acknowledge_cumulative(pulsar_consumer_t *consumer, pulsar_message_t *message) {
@@ -87,24 +89,24 @@ pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t *consu
 }
 
 void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
-                                                  pulsar_result_callback callback) {
+                                                  pulsar_result_callback callback, void *ctx) {
     consumer->consumer.acknowledgeCumulativeAsync(message->message,
-                                                  boost::bind(handle_result_callback, _1, callback));
+                                                  boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer,
                                                      pulsar_message_id_t *messageId,
-                                                     pulsar_result_callback callback) {
+                                                     pulsar_result_callback callback, void *ctx) {
     consumer->consumer.acknowledgeCumulativeAsync(messageId->messageId,
-                                                  boost::bind(handle_result_callback, _1, callback));
+                                                  boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer) {
     return (pulsar_result)consumer->consumer.close();
 }
 
-void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback) {
-    consumer->consumer.closeAsync(boost::bind(handle_result_callback, _1, callback));
+void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback, void *ctx) {
+    consumer->consumer.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 void pulsar_consumer_free(pulsar_consumer_t *consumer) { delete consumer; }
@@ -117,6 +119,6 @@ pulsar_result resume_message_listener(pulsar_consumer_t *consumer) {
     return (pulsar_result)consumer->consumer.resumeMessageListener();
 }
 
-void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer) {
+void pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer) {
     return consumer->consumer.redeliverUnacknowledgedMessages();
 }
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index dcd0aaccb8..42c7c73096 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -41,21 +41,23 @@ pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
 }
 
 static void message_listener_callback(pulsar::Consumer consumer, const pulsar::Message &msg,
-                                      pulsar_message_listener listener) {
+                                      pulsar_message_listener listener, void *ctx) {
     pulsar_consumer_t c_consumer;
     c_consumer.consumer = consumer;
     pulsar_message_t *message = new pulsar_message_t;
     message->message = msg;
-    listener(&c_consumer, message);
+    listener(&c_consumer, message, ctx);
 }
 
 void pulsar_consumer_configuration_set_message_listener(
-    pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener) {
+    pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener,
+    void *ctx) {
     consumer_configuration->consumerConfiguration.setMessageListener(
-        boost::bind(message_listener_callback, _1, _2, messageListener));
+        boost::bind(message_listener_callback, _1, _2, messageListener, ctx));
 }
 
-int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t *consumer_configuration) {
+int pulsar_consumer_configuration_has_message_listener(
+    pulsar_consumer_configuration_t *consumer_configuration) {
     return consumer_configuration->consumerConfiguration.hasMessageListener();
 }
 
diff --git a/pulsar-client-cpp/lib/c/c_MessageId.cc b/pulsar-client-cpp/lib/c/c_MessageId.cc
index 572835987b..8d1e96cf47 100644
--- a/pulsar-client-cpp/lib/c/c_MessageId.cc
+++ b/pulsar-client-cpp/lib/c/c_MessageId.cc
@@ -21,6 +21,7 @@
 #include "c_structs.h"
 
 #include <boost/thread/once.hpp>
+#include <sstream>
 
 boost::once_flag initialized = BOOST_ONCE_INIT;
 
@@ -42,7 +43,7 @@ const pulsar_message_id_t *pulsar_message_id_latest() {
     return &latest;
 }
 
-const void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len) {
+void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len) {
     std::string str;
     messageId->messageId.serialize(str);
     void *p = malloc(str.length());
@@ -56,3 +57,13 @@ pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer, uint32_t
     messageId->messageId = pulsar::MessageId::deserialize(strId);
     return messageId;
 }
+
+char *pulsar_message_id_str(pulsar_message_id_t *messageId) {
+    std::stringstream ss;
+    ss << messageId->messageId;
+    std::string s = ss.str();
+
+    return strndup(s.c_str(), s.length());
+}
+
+void pulsar_message_id_free(pulsar_message_id_t *messageId) { delete messageId; }
diff --git a/pulsar-client-cpp/lib/c/c_Producer.cc b/pulsar-client-cpp/lib/c/c_Producer.cc
index 1de670c7fc..784167e626 100644
--- a/pulsar-client-cpp/lib/c/c_Producer.cc
+++ b/pulsar-client-cpp/lib/c/c_Producer.cc
@@ -38,15 +38,15 @@ pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t
     return (pulsar_result)producer->producer.send(msg->message);
 }
 
-static void handle_producer_send(pulsar::Result result, pulsar_message_t *msg,
-                                 pulsar_send_callback callback) {
-    callback((pulsar_result)result, msg);
+static void handle_producer_send(pulsar::Result result, pulsar_message_t *msg, pulsar_send_callback callback,
+                                 void *ctx) {
+    callback((pulsar_result)result, msg, ctx);
 }
 
 void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg,
-                                pulsar_send_callback callback) {
+                                pulsar_send_callback callback, void *ctx) {
     msg->message = msg->builder.build();
-    producer->producer.sendAsync(msg->message, boost::bind(&handle_producer_send, _1, msg, callback));
+    producer->producer.sendAsync(msg->message, boost::bind(&handle_producer_send, _1, msg, callback, ctx));
 }
 
 int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer) {
@@ -57,6 +57,6 @@ pulsar_result pulsar_producer_close(pulsar_producer_t *producer) {
     return (pulsar_result)producer->producer.close();
 }
 
-void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback) {
-    producer->producer.closeAsync(boost::bind(handle_result_callback, _1, callback));
+void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx) {
+    producer->producer.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
 }
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index 8cc9345b6b..914fc0ac13 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -107,9 +107,10 @@ pulsar_hashing_scheme pulsar_producer_configuration_get_hashing_scheme(
 
 class MessageRoutingPolicy : public pulsar::MessageRoutingPolicy {
     pulsar_message_router _router;
+    void *_ctx;
 
    public:
-    MessageRoutingPolicy(pulsar_message_router router) : _router(router) {}
+    MessageRoutingPolicy(pulsar_message_router router, void *ctx) : _router(router), _ctx(ctx) {}
 
     int getPartition(const pulsar::Message &msg, const pulsar::TopicMetadata &topicMetadata) {
         pulsar_message_t message;
@@ -118,13 +119,13 @@ class MessageRoutingPolicy : public pulsar::MessageRoutingPolicy {
         pulsar_topic_metadata_t metadata;
         metadata.metadata = &topicMetadata;
 
-        return _router(&message, &metadata);
+        return _router(&message, &metadata, _ctx);
     }
 };
 
 void pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t *conf,
-                                                      pulsar_message_router router) {
-    conf->conf.setMessageRouter(boost::make_shared<MessageRoutingPolicy>(router));
+                                                      pulsar_message_router router, void *ctx) {
+    conf->conf.setMessageRouter(boost::make_shared<MessageRoutingPolicy>(router, ctx));
 }
 
 void pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t *conf,
diff --git a/pulsar-client-cpp/lib/c/c_Reader.cc b/pulsar-client-cpp/lib/c/c_Reader.cc
index bb5d69bbdc..3f7849def2 100644
--- a/pulsar-client-cpp/lib/c/c_Reader.cc
+++ b/pulsar-client-cpp/lib/c/c_Reader.cc
@@ -47,8 +47,8 @@ pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls
 
 pulsar_result pulsar_reader_close(pulsar_reader_t *reader) { return (pulsar_result)reader->reader.close(); }
 
-void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback) {
-    reader->reader.closeAsync(boost::bind(handle_result_callback, _1, callback));
+void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, void *ctx) {
+    reader->reader.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
 }
 
 void pulsar_reader_free(pulsar_reader_t *reader) { delete reader; }
diff --git a/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
index b6419a6002..55a6dc5db3 100644
--- a/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
@@ -32,17 +32,17 @@ pulsar_reader_configuration_t *pulsar_reader_configuration_create() {
 void pulsar_reader_configuration_free(pulsar_reader_configuration_t *configuration) { delete configuration; }
 
 static void message_listener_callback(pulsar::Reader reader, const pulsar::Message &msg,
-                                      pulsar_reader_listener listener) {
+                                      pulsar_reader_listener listener, void *ctx) {
     pulsar_reader_t c_reader;
     c_reader.reader = reader;
     pulsar_message_t *message = new pulsar_message_t;
     message->message = msg;
-    listener(&c_reader, message);
+    listener(&c_reader, message, ctx);
 }
 
 void pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t *configuration,
-                                                     pulsar_reader_listener listener) {
-    configuration->conf.setReaderListener(boost::bind(message_listener_callback, _1, _2, listener));
+                                                     pulsar_reader_listener listener, void *ctx) {
+    configuration->conf.setReaderListener(boost::bind(message_listener_callback, _1, _2, listener, ctx));
 }
 
 int pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t *configuration) {
diff --git a/pulsar-client-cpp/lib/c/c_structs.h b/pulsar-client-cpp/lib/c/c_structs.h
index b207ab4fbf..a4ff193ca2 100644
--- a/pulsar-client-cpp/lib/c/c_structs.h
+++ b/pulsar-client-cpp/lib/c/c_structs.h
@@ -73,8 +73,10 @@ struct _pulsar_topic_metadata {
     const pulsar::TopicMetadata* metadata;
 };
 
-typedef void (*pulsar_result_callback)(pulsar_result);
+typedef void (*pulsar_result_callback)(pulsar_result res, void* ctx);
 
-static void handle_result_callback(pulsar::Result result, pulsar_result_callback callback) {
-    callback((pulsar_result)result);
+static void handle_result_callback(pulsar::Result result, pulsar_result_callback callback, void* ctx) {
+    if (callback) {
+        callback((pulsar_result)result, ctx);
+    }
 }
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services