You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/06 18:11:21 UTC

[incubator-pulsar] 03/04: Added producer/consumer properties in Go client (#2447)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit af1ff3a789bec223b102bc7e27e33cf7bdc17104
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Aug 31 15:39:11 2018 -0700

    Added producer/consumer properties in Go client (#2447)
---
 pulsar-client-cpp/include/pulsar/ProducerConfiguration.h    |  5 +++--
 pulsar-client-cpp/include/pulsar/c/consumer_configuration.h |  3 +++
 pulsar-client-cpp/include/pulsar/c/producer_configuration.h |  3 +++
 pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc          |  5 +++++
 pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc          |  5 +++++
 pulsar-client-go/pulsar/c_consumer.go                       | 12 ++++++++++++
 pulsar-client-go/pulsar/c_producer.go                       | 12 ++++++++++++
 pulsar-client-go/pulsar/consumer.go                         |  4 ++++
 pulsar-client-go/pulsar/producer.go                         |  4 ++++
 pulsar-client-go/pulsar/producer_test.go                    |  8 ++++++--
 10 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 45154c5..565a6ab 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -128,7 +128,7 @@ class ProducerConfiguration {
     ProducerConfiguration& addEncryptionKey(std::string key);
 
     /**
-     * Check whether the message has a specific property attached.
+     * Check whether the producer has a specific property attached.
      *
      * @param name the name of the property to check
      * @return true if the message has the specified property
@@ -150,7 +150,8 @@ class ProducerConfiguration {
     std::map<std::string, std::string>& getProperties() const;
 
     /**
-     * Sets a new property on a message.
+     * Sets a new property on the producer.
+     *
      * @param name   the name of the property
      * @param value  the associated value
      */
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index 7299867..fca47eb 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -152,6 +152,9 @@ int pulsar_consumer_is_read_compacted(pulsar_consumer_configuration_t *consumer_
 void pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t *consumer_configuration,
                                         int compacted);
 
+void pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t *conf, const char *name,
+                                                const char *value);
+
 // const CryptoKeyReaderPtr getCryptoKeyReader()
 //
 // const;
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index ae88198..670bf50 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -134,6 +134,9 @@ void pulsar_producer_configuration_set_batching_max_publish_delay_ms(pulsar_prod
 unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
     pulsar_producer_configuration_t *conf);
 
+void pulsar_producer_configuration_set_property(pulsar_producer_configuration_t *conf, const char *name,
+                                                const char *value);
+
 // const CryptoKeyReaderPtr getCryptoKeyReader() const;
 // ProducerConfiguration &setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
 //
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index c8d5453..75cdc47 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -113,3 +113,8 @@ void pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t *consume
                                         int compacted) {
     consumer_configuration->consumerConfiguration.setReadCompacted(compacted);
 }
+
+void pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t *conf, const char *name,
+                                                const char *value) {
+    conf->consumerConfiguration.setProperty(name, value);
+}
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index 914fc0a..a8eb5be 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -174,3 +174,8 @@ unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
     pulsar_producer_configuration_t *conf) {
     return conf->conf.getBatchingMaxPublishDelayMs();
 }
+
+void pulsar_producer_configuration_set_property(pulsar_producer_configuration_t *conf, const char *name,
+                                                const char *value) {
+    conf->conf.setProperty(name, value);
+}
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 7f613b2..1b41a71 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -120,6 +120,18 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 		C.pulsar_consumer_set_consumer_name(conf, name)
 	}
 
+	if options.Properties != nil {
+		for key, value := range options.Properties {
+			cKey := C.CString(key)
+			cValue := C.CString(value)
+
+			C.pulsar_consumer_configuration_set_property(conf, cKey, cValue)
+
+			C.free(unsafe.Pointer(cKey))
+			C.free(unsafe.Pointer(cValue))
+		}
+	}
+
 	C.pulsar_consumer_set_read_compacted(conf, cBool(options.ReadCompacted))
 
 	subName := C.CString(options.SubscriptionName)
diff --git a/pulsar-client-go/pulsar/c_producer.go b/pulsar-client-go/pulsar/c_producer.go
index b4cd2c5..284315d 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -124,6 +124,18 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
 		C.pulsar_producer_configuration_set_batching_max_messages(conf, C.uint(options.BatchingMaxMessages))
 	}
 
+	if options.Properties != nil {
+		for key, value := range options.Properties {
+			cKey := C.CString(key)
+			cValue := C.CString(value)
+
+			C.pulsar_producer_configuration_set_property(conf, cKey, cValue)
+
+			C.free(unsafe.Pointer(cKey))
+			C.free(unsafe.Pointer(cValue))
+		}
+	}
+
 	topicName := C.CString(options.Topic)
 	defer C.free(unsafe.Pointer(topicName))
 
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index b9f2616..030ba1b 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -64,6 +64,10 @@ type ConsumerOptions struct {
 	// This argument is required when subscribing
 	SubscriptionName string
 
+	// Attach a set of application defined properties to the consumer
+	// These properties will be visible in the topic stats
+	Properties map[string]string
+
 	// Set the timeout for unacked messages
 	// Message not acknowledged within the give time, will be replayed by the broker to the same or a different consumer
 	// Default is 0, which means message are not being replayed based on ack time
diff --git a/pulsar-client-go/pulsar/producer.go b/pulsar-client-go/pulsar/producer.go
index 2cfd141..46d6dd6 100644
--- a/pulsar-client-go/pulsar/producer.go
+++ b/pulsar-client-go/pulsar/producer.go
@@ -71,6 +71,10 @@ type ProducerOptions struct {
 	// a topic.
 	Name string
 
+	// Attach a set of application defined properties to the producer
+	// These properties will be visible in the topic stats
+	Properties map[string]string
+
 	// Set the send timeout (default: 30 seconds)
 	// If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
 	// Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go
index 940be85..cfa0bcb 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -20,9 +20,9 @@
 package pulsar
 
 import (
-	"testing"
-	"fmt"
 	"context"
+	"fmt"
+	"testing"
 	"time"
 )
 
@@ -77,6 +77,10 @@ func TestProducer(t *testing.T) {
 		MaxPendingMessages:      100,
 		BlockIfQueueFull:        true,
 		CompressionType:         LZ4,
+		Properties: map[string]string{
+			"my-name": "test",
+			"key":     "value",
+		},
 	})
 
 	assertNil(t, err)