You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/10 23:01:15 UTC
[incubator-pulsar] branch master updated: In C API add context pointer to callbacks (#1761)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 40a5aa5 In C API add context pointer to callbacks (#1761)
40a5aa5 is described below
commit 40a5aa5366c6e8c1ed172bfc5fcf7e1749717703
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu May 10 16:01:12 2018 -0700
In C API add context pointer to callbacks (#1761)
---
.../examples/SampleConsumerListenerCApi.c | 4 +--
pulsar-client-cpp/include/pulsar/c/client.h | 20 +++++------
pulsar-client-cpp/include/pulsar/c/consumer.h | 17 ++++-----
.../include/pulsar/c/consumer_configuration.h | 9 ++---
pulsar-client-cpp/include/pulsar/c/message_id.h | 6 +++-
.../include/pulsar/c/message_router.h | 3 +-
pulsar-client-cpp/include/pulsar/c/producer.h | 8 ++---
.../include/pulsar/c/producer_configuration.h | 2 +-
pulsar-client-cpp/include/pulsar/c/reader.h | 4 +--
.../include/pulsar/c/reader_configuration.h | 4 +--
pulsar-client-cpp/lib/c/c_Client.cc | 42 +++++++++++-----------
pulsar-client-cpp/lib/c/c_Consumer.cc | 28 ++++++++-------
pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc | 12 ++++---
pulsar-client-cpp/lib/c/c_MessageId.cc | 13 ++++++-
pulsar-client-cpp/lib/c/c_Producer.cc | 14 ++++----
pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc | 9 ++---
pulsar-client-cpp/lib/c/c_Reader.cc | 4 +--
pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc | 8 ++---
pulsar-client-cpp/lib/c/c_structs.h | 8 +++--
19 files changed, 120 insertions(+), 95 deletions(-)
diff --git a/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
index 8f3ed0d..e75c5d5 100644
--- a/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
+++ b/pulsar-client-cpp/examples/SampleConsumerListenerCApi.c
@@ -20,7 +20,7 @@
#include <stdio.h>
#include <pulsar/c/client.h>
-static void listener_callback(pulsar_consumer_t* consumer, pulsar_message_t* message) {
+static void listener_callback(pulsar_consumer_t* consumer, pulsar_message_t* message, void* ctx) {
printf("Received message with payload: '%.*s'\n", pulsar_message_get_length(message),
pulsar_message_get_data(message));
@@ -34,7 +34,7 @@ int main() {
pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
pulsar_consumer_configuration_set_consumer_type(consumer_conf, pulsar_ConsumerShared);
- pulsar_consumer_configuration_set_message_listener(consumer_conf, listener_callback);
+ pulsar_consumer_configuration_set_message_listener(consumer_conf, listener_callback, NULL);
pulsar_consumer_t *consumer;
pulsar_result res = pulsar_client_subscribe(client, "my-topic", "my-subscrition", consumer_conf, &consumer);
diff --git a/pulsar-client-cpp/include/pulsar/c/client.h b/pulsar-client-cpp/include/pulsar/c/client.h
index 11320e7..b8ef1ed 100644
--- a/pulsar-client-cpp/include/pulsar/c/client.h
+++ b/pulsar-client-cpp/include/pulsar/c/client.h
@@ -40,12 +40,12 @@ typedef struct _pulsar_producer pulsar_producer_t;
typedef struct _pulsar_client_configuration pulsar_client_configuration_t;
typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;
-typedef void (*pulsar_create_producer_callback)(pulsar_result result, pulsar_producer_t *producer);
+typedef void (*pulsar_create_producer_callback)(pulsar_result result, pulsar_producer_t *producer, void *ctx);
-typedef void (*pulsar_subscribe_callback)(pulsar_result result, pulsar_consumer_t *consumer);
-typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t *reader);
+typedef void (*pulsar_subscribe_callback)(pulsar_result result, pulsar_consumer_t *consumer, void *ctx);
+typedef void (*pulsar_reader_callback)(pulsar_result result, pulsar_reader_t *reader, void *ctx);
-typedef void (*pulsar_close_callback)(pulsar_result result);
+typedef void (*pulsar_close_callback)(pulsar_result result, void *ctx);
/**
* Create a Pulsar client object connecting to the specified cluster address and using the specified
@@ -73,7 +73,7 @@ pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const char
void pulsar_client_create_producer_async(pulsar_client_t *client, const char *topic,
const pulsar_producer_configuration_t *conf,
- pulsar_create_producer_callback callback);
+ pulsar_create_producer_callback callback, void *ctx);
pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic,
const char *subscriptionName,
@@ -81,8 +81,8 @@ pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic
pulsar_consumer_t **consumer);
void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, const char *subscriptionName,
- const pulsar_consumer_configuration_t *conf, pulsar_consumer_t **consumer,
- pulsar_subscribe_callback callback);
+ const pulsar_consumer_configuration_t *conf,
+ pulsar_subscribe_callback callback, void *ctx);
/**
* Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified
@@ -119,12 +119,12 @@ pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *t
void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topic,
const pulsar_message_id_t *startMessageId,
- pulsar_reader_configuration_t *conf, pulsar_reader_t **reader,
- pulsar_reader_callback callback);
+ pulsar_reader_configuration_t *conf, pulsar_reader_callback callback,
+ void *ctx);
pulsar_result pulsar_client_close(pulsar_client_t *client);
-void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback);
+void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback, void *ctx);
void pulsar_client_free(pulsar_client_t *client);
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer.h b/pulsar-client-cpp/include/pulsar/c/consumer.h
index 59c99e3..2917eea 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer.h
@@ -29,7 +29,7 @@ extern "C" {
typedef struct _pulsar_consumer pulsar_consumer_t;
-typedef void (*pulsar_result_callback)(pulsar_result);
+typedef void (*pulsar_result_callback)(pulsar_result, void *);
/**
* @return the topic this consumer is subscribed to
@@ -67,7 +67,8 @@ pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer);
*
* @param callback the callback to get notified when the operation is complete
*/
-void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback);
+void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback,
+ void *ctx);
/**
* Receive a single message.
@@ -117,10 +118,10 @@ pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer, pulsar
* @param callback callback that will be triggered when the message has been acknowledged
*/
void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
- pulsar_result_callback callback);
+ pulsar_result_callback callback, void *ctx);
void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,
- pulsar_result_callback callback);
+ pulsar_result_callback callback, void *ctx);
/**
* Acknowledge the reception of all the messages in the stream up to (and including)
@@ -155,15 +156,15 @@ pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t *consu
* @param callback callback that will be triggered when the message has been acknowledged
*/
void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
- pulsar_result_callback callback);
+ pulsar_result_callback callback, void *ctx);
void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer,
pulsar_message_id_t *messageId,
- pulsar_result_callback callback);
+ pulsar_result_callback callback, void *ctx);
pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer);
-void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback);
+void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback, void *ctx);
void pulsar_consumer_free(pulsar_consumer_t *consumer);
@@ -187,7 +188,7 @@ pulsar_result resume_message_listener(pulsar_consumer_t *consumer);
* connection
* breaks, the messages are redelivered after reconnect.
*/
-void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer);
+void pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer);
#ifdef __cplusplus
}
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index 3bd9571..445e34e 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -43,7 +43,7 @@ typedef enum {
} pulsar_consumer_type;
/// Callback definition for MessageListener
-typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg);
+typedef void (*pulsar_message_listener)(pulsar_consumer_t *consumer, pulsar_message_t *msg, void *ctx);
pulsar_consumer_configuration_t *pulsar_consumer_configuration_create();
@@ -73,10 +73,11 @@ pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
* for every message received.
*/
void pulsar_consumer_configuration_set_message_listener(
- pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener);
+ pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener,
+ void *ctx);
-int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t *consumer_configuration,
- pulsar_consumer_t *consumer);
+int pulsar_consumer_configuration_has_message_listener(
+ pulsar_consumer_configuration_t *consumer_configuration);
/**
* Sets the size of the consumer receive queue.
diff --git a/pulsar-client-cpp/include/pulsar/c/message_id.h b/pulsar-client-cpp/include/pulsar/c/message_id.h
index a0eb684..44d0c8f 100644
--- a/pulsar-client-cpp/include/pulsar/c/message_id.h
+++ b/pulsar-client-cpp/include/pulsar/c/message_id.h
@@ -41,13 +41,17 @@ const pulsar_message_id_t *pulsar_message_id_latest();
/**
* Serialize the message id into a binary string for storing
*/
-const void *pulsar_message_id_serialize(int *len);
+void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len);
/**
* Deserialize a message id from a binary string
*/
pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer, uint32_t len);
+char *pulsar_message_id_str(pulsar_message_id_t *messageId);
+
+void pulsar_message_id_free(pulsar_message_id_t *messageId);
+
#ifdef __cplusplus
}
#endif
\ No newline at end of file
diff --git a/pulsar-client-cpp/include/pulsar/c/message_router.h b/pulsar-client-cpp/include/pulsar/c/message_router.h
index aea4188..07ff7a3 100644
--- a/pulsar-client-cpp/include/pulsar/c/message_router.h
+++ b/pulsar-client-cpp/include/pulsar/c/message_router.h
@@ -27,7 +27,8 @@ extern "C" {
typedef struct _pulsar_topic_metadata pulsar_topic_metadata_t;
-typedef int (*pulsar_message_router)(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata);
+typedef int (*pulsar_message_router)(pulsar_message_t *msg, pulsar_topic_metadata_t *topicMetadata,
+ void *ctx);
int pulsar_topic_metadata_get_num_partitions(pulsar_topic_metadata_t *topicMetadata);
diff --git a/pulsar-client-cpp/include/pulsar/c/producer.h b/pulsar-client-cpp/include/pulsar/c/producer.h
index 6121e06..6b506b8 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer.h
@@ -30,8 +30,8 @@ extern "C" {
typedef struct _pulsar_producer pulsar_producer_t;
-typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_t *msg);
-typedef void (*pulsar_close_callback)(pulsar_result);
+typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_t *msg, void *ctx);
+typedef void (*pulsar_close_callback)(pulsar_result, void *ctx);
/**
* @return the topic to which producer is publishing to
@@ -76,7 +76,7 @@ pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t
* @param callback the callback to get notification of the completion
*/
void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg,
- pulsar_send_callback callback);
+ pulsar_send_callback callback, void *ctx);
/**
* Get the last sequence id that was published by this producer.
@@ -110,7 +110,7 @@ pulsar_result pulsar_producer_close(pulsar_producer_t *producer);
* triggered when all pending write requests are persisted. In case of errors,
* pending writes will not be retried.
*/
-void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback);
+void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx);
void pulsar_producer_free(pulsar_producer_t *producer);
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index 534c411..636fe68 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -97,7 +97,7 @@ pulsar_partitions_routing_mode pulsar_producer_configuration_get_partitions_rout
pulsar_producer_configuration_t *conf);
void pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t *conf,
- pulsar_message_router router);
+ pulsar_message_router router, void *ctx);
void pulsar_producer_configuration_set_hashing_scheme(pulsar_producer_configuration_t *conf,
pulsar_hashing_scheme scheme);
diff --git a/pulsar-client-cpp/include/pulsar/c/reader.h b/pulsar-client-cpp/include/pulsar/c/reader.h
index 648c172..547bbf2 100644
--- a/pulsar-client-cpp/include/pulsar/c/reader.h
+++ b/pulsar-client-cpp/include/pulsar/c/reader.h
@@ -27,7 +27,7 @@ extern "C" {
typedef struct _pulsar_reader pulsar_reader_t;
-typedef void (*pulsar_result_callback)(pulsar_result);
+typedef void (*pulsar_result_callback)(pulsar_result, void *);
/**
* @return the topic this reader is reading from
@@ -60,7 +60,7 @@ pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls
pulsar_result pulsar_reader_close(pulsar_reader_t *reader);
-void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback);
+void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, void *ctx);
void pulsar_reader_free(pulsar_reader_t *reader);
diff --git a/pulsar-client-cpp/include/pulsar/c/reader_configuration.h b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
index 914bbd9..c7aaf14 100644
--- a/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/reader_configuration.h
@@ -25,7 +25,7 @@ extern "C" {
typedef struct _pulsar_reader_configuration pulsar_reader_configuration_t;
-typedef void (*pulsar_reader_listener)(pulsar_reader_t *reader, pulsar_message_t *msg);
+typedef void (*pulsar_reader_listener)(pulsar_reader_t *reader, pulsar_message_t *msg, void *ctx);
pulsar_reader_configuration_t *pulsar_reader_configuration_create();
@@ -36,7 +36,7 @@ void pulsar_reader_configuration_free(pulsar_reader_configuration_t *configurati
* messages. A listener will be called in order for every message received.
*/
void pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t *configuration,
- pulsar_reader_listener listener);
+ pulsar_reader_listener listener, void *ctx);
int pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t *configuration);
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc
index cec7a13..1063bb8 100644
--- a/pulsar-client-cpp/lib/c/c_Client.cc
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -47,21 +47,21 @@ pulsar_result pulsar_client_create_producer(pulsar_client_t *client, const char
}
static void handle_create_producer_callback(pulsar::Result result, pulsar::Producer producer,
- pulsar_create_producer_callback callback) {
+ pulsar_create_producer_callback callback, void *ctx) {
if (result == pulsar::ResultOk) {
pulsar_producer_t *c_producer = new pulsar_producer_t;
c_producer->producer = producer;
- callback(pulsar_result_Ok, c_producer);
+ callback(pulsar_result_Ok, c_producer, ctx);
} else {
- callback((pulsar_result)result, NULL);
+ callback((pulsar_result)result, NULL, ctx);
}
}
void pulsar_client_create_producer_async(pulsar_client_t *client, const char *topic,
const pulsar_producer_configuration_t *conf,
- pulsar_create_producer_callback callback) {
+ pulsar_create_producer_callback callback, void *ctx) {
client->client->createProducerAsync(topic, conf->conf,
- boost::bind(&handle_create_producer_callback, _1, _2, callback));
+ boost::bind(&handle_create_producer_callback, _1, _2, callback, ctx));
}
pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic,
@@ -81,21 +81,21 @@ pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic
}
static void handle_subscribe_callback(pulsar::Result result, pulsar::Consumer consumer,
- pulsar_subscribe_callback callback) {
+ pulsar_subscribe_callback callback, void *ctx) {
if (result == pulsar::ResultOk) {
pulsar_consumer_t *c_consumer = new pulsar_consumer_t;
c_consumer->consumer = consumer;
- callback(pulsar_result_Ok, c_consumer);
+ callback(pulsar_result_Ok, c_consumer, ctx);
} else {
- callback((pulsar_result)result, NULL);
+ callback((pulsar_result)result, NULL, ctx);
}
}
void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, const char *subscriptionName,
- const pulsar_consumer_configuration_t *conf, pulsar_consumer_t **consumer,
- pulsar_subscribe_callback callback) {
+ const pulsar_consumer_configuration_t *conf,
+ pulsar_subscribe_callback callback, void *ctx) {
client->client->subscribeAsync(topic, subscriptionName, conf->consumerConfiguration,
- boost::bind(&handle_subscribe_callback, _1, _2, callback));
+ boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
}
pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic,
@@ -113,30 +113,30 @@ pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *t
}
static void handle_reader_callback(pulsar::Result result, pulsar::Reader reader,
- pulsar_reader_callback callback) {
+ pulsar_reader_callback callback, void *ctx) {
if (result == pulsar::ResultOk) {
pulsar_reader_t *c_reader = new pulsar_reader_t;
c_reader->reader = reader;
- callback(pulsar_result_Ok, c_reader);
+ callback(pulsar_result_Ok, c_reader, ctx);
} else {
- callback((pulsar_result)result, NULL);
+ callback((pulsar_result)result, NULL, ctx);
}
}
void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topic,
const pulsar_message_id_t *startMessageId,
- pulsar_reader_configuration_t *conf, pulsar_reader_t **reader,
- pulsar_reader_callback callback) {
+ pulsar_reader_configuration_t *conf, pulsar_reader_callback callback,
+ void *ctx) {
client->client->createReaderAsync(topic, startMessageId->messageId, conf->conf,
- boost::bind(&handle_reader_callback, _1, _2, callback));
+ boost::bind(&handle_reader_callback, _1, _2, callback, ctx));
}
pulsar_result pulsar_client_close(pulsar_client_t *client) { return (pulsar_result)client->client->close(); }
-static void handle_client_close(pulsar::Result result, pulsar_close_callback callback) {
- callback((pulsar_result)result);
+static void handle_client_close(pulsar::Result result, pulsar_close_callback callback, void *ctx) {
+ callback((pulsar_result)result, ctx);
}
-void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback) {
- client->client->closeAsync(boost::bind(handle_client_close, _1, callback));
+void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback, void *ctx) {
+ client->client->closeAsync(boost::bind(handle_client_close, _1, callback, ctx));
}
diff --git a/pulsar-client-cpp/lib/c/c_Consumer.cc b/pulsar-client-cpp/lib/c/c_Consumer.cc
index 22021a5..dae824a 100644
--- a/pulsar-client-cpp/lib/c/c_Consumer.cc
+++ b/pulsar-client-cpp/lib/c/c_Consumer.cc
@@ -33,8 +33,9 @@ pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer) {
return (pulsar_result)consumer->consumer.unsubscribe();
}
-void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback) {
- consumer->consumer.unsubscribeAsync(boost::bind(handle_result_callback, _1, callback));
+void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback,
+ void *ctx) {
+ consumer->consumer.unsubscribeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
}
pulsar_result pulsar_consumer_receive(pulsar_consumer_t *consumer, pulsar_message_t **msg) {
@@ -67,14 +68,15 @@ pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer, pulsar
}
void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
- pulsar_result_callback callback) {
- consumer->consumer.acknowledgeAsync(message->message, boost::bind(handle_result_callback, _1, callback));
+ pulsar_result_callback callback, void *ctx) {
+ consumer->consumer.acknowledgeAsync(message->message,
+ boost::bind(handle_result_callback, _1, callback, ctx));
}
void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,
- pulsar_result_callback callback) {
+ pulsar_result_callback callback, void *ctx) {
consumer->consumer.acknowledgeAsync(messageId->messageId,
- boost::bind(handle_result_callback, _1, callback));
+ boost::bind(handle_result_callback, _1, callback, ctx));
}
pulsar_result pulsar_consumer_acknowledge_cumulative(pulsar_consumer_t *consumer, pulsar_message_t *message) {
@@ -87,24 +89,24 @@ pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t *consu
}
void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
- pulsar_result_callback callback) {
+ pulsar_result_callback callback, void *ctx) {
consumer->consumer.acknowledgeCumulativeAsync(message->message,
- boost::bind(handle_result_callback, _1, callback));
+ boost::bind(handle_result_callback, _1, callback, ctx));
}
void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer,
pulsar_message_id_t *messageId,
- pulsar_result_callback callback) {
+ pulsar_result_callback callback, void *ctx) {
consumer->consumer.acknowledgeCumulativeAsync(messageId->messageId,
- boost::bind(handle_result_callback, _1, callback));
+ boost::bind(handle_result_callback, _1, callback, ctx));
}
pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer) {
return (pulsar_result)consumer->consumer.close();
}
-void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback) {
- consumer->consumer.closeAsync(boost::bind(handle_result_callback, _1, callback));
+void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback, void *ctx) {
+ consumer->consumer.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
}
void pulsar_consumer_free(pulsar_consumer_t *consumer) { delete consumer; }
@@ -117,6 +119,6 @@ pulsar_result resume_message_listener(pulsar_consumer_t *consumer) {
return (pulsar_result)consumer->consumer.resumeMessageListener();
}
-void redeliverUnacknowledgedMessages(pulsar_consumer_t *consumer) {
+void pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consumer) {
return consumer->consumer.redeliverUnacknowledgedMessages();
}
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index dcd0aac..42c7c73 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -41,21 +41,23 @@ pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
}
static void message_listener_callback(pulsar::Consumer consumer, const pulsar::Message &msg,
- pulsar_message_listener listener) {
+ pulsar_message_listener listener, void *ctx) {
pulsar_consumer_t c_consumer;
c_consumer.consumer = consumer;
pulsar_message_t *message = new pulsar_message_t;
message->message = msg;
- listener(&c_consumer, message);
+ listener(&c_consumer, message, ctx);
}
void pulsar_consumer_configuration_set_message_listener(
- pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener) {
+ pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener,
+ void *ctx) {
consumer_configuration->consumerConfiguration.setMessageListener(
- boost::bind(message_listener_callback, _1, _2, messageListener));
+ boost::bind(message_listener_callback, _1, _2, messageListener, ctx));
}
-int pulsar_consumer_has_message_listener(pulsar_consumer_configuration_t *consumer_configuration) {
+int pulsar_consumer_configuration_has_message_listener(
+ pulsar_consumer_configuration_t *consumer_configuration) {
return consumer_configuration->consumerConfiguration.hasMessageListener();
}
diff --git a/pulsar-client-cpp/lib/c/c_MessageId.cc b/pulsar-client-cpp/lib/c/c_MessageId.cc
index 5728359..8d1e96c 100644
--- a/pulsar-client-cpp/lib/c/c_MessageId.cc
+++ b/pulsar-client-cpp/lib/c/c_MessageId.cc
@@ -21,6 +21,7 @@
#include "c_structs.h"
#include <boost/thread/once.hpp>
+#include <sstream>
boost::once_flag initialized = BOOST_ONCE_INIT;
@@ -42,7 +43,7 @@ const pulsar_message_id_t *pulsar_message_id_latest() {
return &latest;
}
-const void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len) {
+void *pulsar_message_id_serialize(pulsar_message_id_t *messageId, int *len) {
std::string str;
messageId->messageId.serialize(str);
void *p = malloc(str.length());
@@ -56,3 +57,13 @@ pulsar_message_id_t *pulsar_message_id_deserialize(const void *buffer, uint32_t
messageId->messageId = pulsar::MessageId::deserialize(strId);
return messageId;
}
+
+char *pulsar_message_id_str(pulsar_message_id_t *messageId) {
+ std::stringstream ss;
+ ss << messageId->messageId;
+ std::string s = ss.str();
+
+ return strndup(s.c_str(), s.length());
+}
+
+void pulsar_message_id_free(pulsar_message_id_t *messageId) { delete messageId; }
diff --git a/pulsar-client-cpp/lib/c/c_Producer.cc b/pulsar-client-cpp/lib/c/c_Producer.cc
index 1de670c..784167e 100644
--- a/pulsar-client-cpp/lib/c/c_Producer.cc
+++ b/pulsar-client-cpp/lib/c/c_Producer.cc
@@ -38,15 +38,15 @@ pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t
return (pulsar_result)producer->producer.send(msg->message);
}
-static void handle_producer_send(pulsar::Result result, pulsar_message_t *msg,
- pulsar_send_callback callback) {
- callback((pulsar_result)result, msg);
+static void handle_producer_send(pulsar::Result result, pulsar_message_t *msg, pulsar_send_callback callback,
+ void *ctx) {
+ callback((pulsar_result)result, msg, ctx);
}
void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg,
- pulsar_send_callback callback) {
+ pulsar_send_callback callback, void *ctx) {
msg->message = msg->builder.build();
- producer->producer.sendAsync(msg->message, boost::bind(&handle_producer_send, _1, msg, callback));
+ producer->producer.sendAsync(msg->message, boost::bind(&handle_producer_send, _1, msg, callback, ctx));
}
int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer) {
@@ -57,6 +57,6 @@ pulsar_result pulsar_producer_close(pulsar_producer_t *producer) {
return (pulsar_result)producer->producer.close();
}
-void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback) {
- producer->producer.closeAsync(boost::bind(handle_result_callback, _1, callback));
+void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx) {
+ producer->producer.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
}
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index 8cc9345..914fc0a 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -107,9 +107,10 @@ pulsar_hashing_scheme pulsar_producer_configuration_get_hashing_scheme(
class MessageRoutingPolicy : public pulsar::MessageRoutingPolicy {
pulsar_message_router _router;
+ void *_ctx;
public:
- MessageRoutingPolicy(pulsar_message_router router) : _router(router) {}
+ MessageRoutingPolicy(pulsar_message_router router, void *ctx) : _router(router), _ctx(ctx) {}
int getPartition(const pulsar::Message &msg, const pulsar::TopicMetadata &topicMetadata) {
pulsar_message_t message;
@@ -118,13 +119,13 @@ class MessageRoutingPolicy : public pulsar::MessageRoutingPolicy {
pulsar_topic_metadata_t metadata;
metadata.metadata = &topicMetadata;
- return _router(&message, &metadata);
+ return _router(&message, &metadata, _ctx);
}
};
void pulsar_producer_configuration_set_message_router(pulsar_producer_configuration_t *conf,
- pulsar_message_router router) {
- conf->conf.setMessageRouter(boost::make_shared<MessageRoutingPolicy>(router));
+ pulsar_message_router router, void *ctx) {
+ conf->conf.setMessageRouter(boost::make_shared<MessageRoutingPolicy>(router, ctx));
}
void pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t *conf,
diff --git a/pulsar-client-cpp/lib/c/c_Reader.cc b/pulsar-client-cpp/lib/c/c_Reader.cc
index bb5d69b..3f7849d 100644
--- a/pulsar-client-cpp/lib/c/c_Reader.cc
+++ b/pulsar-client-cpp/lib/c/c_Reader.cc
@@ -47,8 +47,8 @@ pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls
pulsar_result pulsar_reader_close(pulsar_reader_t *reader) { return (pulsar_result)reader->reader.close(); }
-void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback) {
- reader->reader.closeAsync(boost::bind(handle_result_callback, _1, callback));
+void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, void *ctx) {
+ reader->reader.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
}
void pulsar_reader_free(pulsar_reader_t *reader) { delete reader; }
diff --git a/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
index b6419a6..55a6dc5 100644
--- a/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
@@ -32,17 +32,17 @@ pulsar_reader_configuration_t *pulsar_reader_configuration_create() {
void pulsar_reader_configuration_free(pulsar_reader_configuration_t *configuration) { delete configuration; }
static void message_listener_callback(pulsar::Reader reader, const pulsar::Message &msg,
- pulsar_reader_listener listener) {
+ pulsar_reader_listener listener, void *ctx) {
pulsar_reader_t c_reader;
c_reader.reader = reader;
pulsar_message_t *message = new pulsar_message_t;
message->message = msg;
- listener(&c_reader, message);
+ listener(&c_reader, message, ctx);
}
void pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t *configuration,
- pulsar_reader_listener listener) {
- configuration->conf.setReaderListener(boost::bind(message_listener_callback, _1, _2, listener));
+ pulsar_reader_listener listener, void *ctx) {
+ configuration->conf.setReaderListener(boost::bind(message_listener_callback, _1, _2, listener, ctx));
}
int pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t *configuration) {
diff --git a/pulsar-client-cpp/lib/c/c_structs.h b/pulsar-client-cpp/lib/c/c_structs.h
index b207ab4..a4ff193 100644
--- a/pulsar-client-cpp/lib/c/c_structs.h
+++ b/pulsar-client-cpp/lib/c/c_structs.h
@@ -73,8 +73,10 @@ struct _pulsar_topic_metadata {
const pulsar::TopicMetadata* metadata;
};
-typedef void (*pulsar_result_callback)(pulsar_result);
+typedef void (*pulsar_result_callback)(pulsar_result res, void* ctx);
-static void handle_result_callback(pulsar::Result result, pulsar_result_callback callback) {
- callback((pulsar_result)result);
+static void handle_result_callback(pulsar::Result result, pulsar_result_callback callback, void* ctx) {
+ if (callback) {
+ callback((pulsar_result)result, ctx);
+ }
}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.