You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/06 18:11:21 UTC
[incubator-pulsar] 03/04: Added producer/consumer properties in Go
client (#2447)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
commit af1ff3a789bec223b102bc7e27e33cf7bdc17104
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Aug 31 15:39:11 2018 -0700
Added producer/consumer properties in Go client (#2447)
---
pulsar-client-cpp/include/pulsar/ProducerConfiguration.h | 5 +++--
pulsar-client-cpp/include/pulsar/c/consumer_configuration.h | 3 +++
pulsar-client-cpp/include/pulsar/c/producer_configuration.h | 3 +++
pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc | 5 +++++
pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc | 5 +++++
pulsar-client-go/pulsar/c_consumer.go | 12 ++++++++++++
pulsar-client-go/pulsar/c_producer.go | 12 ++++++++++++
pulsar-client-go/pulsar/consumer.go | 4 ++++
pulsar-client-go/pulsar/producer.go | 4 ++++
pulsar-client-go/pulsar/producer_test.go | 8 ++++++--
10 files changed, 57 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 45154c5..565a6ab 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -128,7 +128,7 @@ class ProducerConfiguration {
ProducerConfiguration& addEncryptionKey(std::string key);
/**
- * Check whether the message has a specific property attached.
+ * Check whether the producer has a specific property attached.
*
* @param name the name of the property to check
* @return true if the message has the specified property
@@ -150,7 +150,8 @@ class ProducerConfiguration {
std::map<std::string, std::string>& getProperties() const;
/**
- * Sets a new property on a message.
+ * Sets a new property on the producer.
+ *
* @param name the name of the property
* @param value the associated value
*/
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index 7299867..fca47eb 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -152,6 +152,9 @@ int pulsar_consumer_is_read_compacted(pulsar_consumer_configuration_t *consumer_
void pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t *consumer_configuration,
int compacted);
+void pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t *conf, const char *name,
+ const char *value);
+
// const CryptoKeyReaderPtr getCryptoKeyReader()
//
// const;
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index ae88198..670bf50 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -134,6 +134,9 @@ void pulsar_producer_configuration_set_batching_max_publish_delay_ms(pulsar_prod
unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
pulsar_producer_configuration_t *conf);
+void pulsar_producer_configuration_set_property(pulsar_producer_configuration_t *conf, const char *name,
+ const char *value);
+
// const CryptoKeyReaderPtr getCryptoKeyReader() const;
// ProducerConfiguration &setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
//
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index c8d5453..75cdc47 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -113,3 +113,8 @@ void pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t *consume
int compacted) {
consumer_configuration->consumerConfiguration.setReadCompacted(compacted);
}
+
+void pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t *conf, const char *name,
+ const char *value) {
+ conf->consumerConfiguration.setProperty(name, value);
+}
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index 914fc0a..a8eb5be 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -174,3 +174,8 @@ unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
pulsar_producer_configuration_t *conf) {
return conf->conf.getBatchingMaxPublishDelayMs();
}
+
+void pulsar_producer_configuration_set_property(pulsar_producer_configuration_t *conf, const char *name,
+ const char *value) {
+ conf->conf.setProperty(name, value);
+}
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 7f613b2..1b41a71 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -120,6 +120,18 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
C.pulsar_consumer_set_consumer_name(conf, name)
}
+ if options.Properties != nil {
+ for key, value := range options.Properties {
+ cKey := C.CString(key)
+ cValue := C.CString(value)
+
+ C.pulsar_consumer_configuration_set_property(conf, cKey, cValue)
+
+ C.free(unsafe.Pointer(cKey))
+ C.free(unsafe.Pointer(cValue))
+ }
+ }
+
C.pulsar_consumer_set_read_compacted(conf, cBool(options.ReadCompacted))
subName := C.CString(options.SubscriptionName)
diff --git a/pulsar-client-go/pulsar/c_producer.go b/pulsar-client-go/pulsar/c_producer.go
index b4cd2c5..284315d 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -124,6 +124,18 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
C.pulsar_producer_configuration_set_batching_max_messages(conf, C.uint(options.BatchingMaxMessages))
}
+ if options.Properties != nil {
+ for key, value := range options.Properties {
+ cKey := C.CString(key)
+ cValue := C.CString(value)
+
+ C.pulsar_producer_configuration_set_property(conf, cKey, cValue)
+
+ C.free(unsafe.Pointer(cKey))
+ C.free(unsafe.Pointer(cValue))
+ }
+ }
+
topicName := C.CString(options.Topic)
defer C.free(unsafe.Pointer(topicName))
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index b9f2616..030ba1b 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -64,6 +64,10 @@ type ConsumerOptions struct {
// This argument is required when subscribing
SubscriptionName string
+ // Attach a set of application defined properties to the consumer
+ // These properties will be visible in the topic stats
+ Properties map[string]string
+
// Set the timeout for unacked messages
// Message not acknowledged within the give time, will be replayed by the broker to the same or a different consumer
// Default is 0, which means message are not being replayed based on ack time
diff --git a/pulsar-client-go/pulsar/producer.go b/pulsar-client-go/pulsar/producer.go
index 2cfd141..46d6dd6 100644
--- a/pulsar-client-go/pulsar/producer.go
+++ b/pulsar-client-go/pulsar/producer.go
@@ -71,6 +71,10 @@ type ProducerOptions struct {
// a topic.
Name string
+ // Attach a set of application defined properties to the producer
+ // These properties will be visible in the topic stats
+ Properties map[string]string
+
// Set the send timeout (default: 30 seconds)
// If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
// Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go
index 940be85..cfa0bcb 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -20,9 +20,9 @@
package pulsar
import (
- "testing"
- "fmt"
"context"
+ "fmt"
+ "testing"
"time"
)
@@ -77,6 +77,10 @@ func TestProducer(t *testing.T) {
MaxPendingMessages: 100,
BlockIfQueueFull: true,
CompressionType: LZ4,
+ Properties: map[string]string{
+ "my-name": "test",
+ "key": "value",
+ },
})
assertNil(t, err)