You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/06 18:11:18 UTC

[incubator-pulsar] branch branch-2.1 updated (6a02c94 -> 050d47d)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a change to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git.


    from 6a02c94  Downgrading ZK to stable version 3.4.13 (#2473)
     new e1dddcf  Add multi-topic and regex consumer in Go client (#2448)
     new 7486295  Support compaction options in Go client (#2449)
     new af1ff3a  Added producer/consumer properties in Go client (#2447)
     new 050d47d  [go] Ensure producer/consumer/reader keep a ref of client instance so it won't be finalized (#2527)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../include/pulsar/ProducerConfiguration.h         |   5 +-
 pulsar-client-cpp/include/pulsar/c/client.h        |  10 +
 .../include/pulsar/c/consumer_configuration.h      |   3 +
 .../include/pulsar/c/producer_configuration.h      |   3 +
 pulsar-client-cpp/lib/c/c_Client.cc                |  21 ++
 pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc |   5 +
 pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc |   5 +
 pulsar-client-go/pulsar/c_consumer.go              |  55 ++++-
 pulsar-client-go/pulsar/c_go_pulsar.h              |  13 +
 pulsar-client-go/pulsar/c_producer.go              |  24 +-
 pulsar-client-go/pulsar/c_reader.go                |   5 +-
 pulsar-client-go/pulsar/consumer.go                |  26 +-
 pulsar-client-go/pulsar/consumer_test.go           | 265 ++++++++++++++++++++-
 pulsar-client-go/pulsar/producer.go                |   4 +
 pulsar-client-go/pulsar/producer_test.go           |   9 +-
 pulsar-client-go/pulsar/reader.go                  |   9 +
 pulsar-client-go/pulsar/reader_test.go             |  91 +++++++
 17 files changed, 530 insertions(+), 23 deletions(-)


[incubator-pulsar] 01/04: Add multi-topic and regex consumer in Go client (#2448)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit e1dddcf21608b76f57e3d4830d45fbf87f8d2e99
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Aug 27 16:34:00 2018 -0700

    Add multi-topic and regex consumer in Go client (#2448)
---
 pulsar-client-cpp/include/pulsar/c/client.h |  10 +++
 pulsar-client-cpp/lib/c/c_Client.cc         |  21 +++++
 pulsar-client-go/pulsar/c_consumer.go       |  38 +++++++--
 pulsar-client-go/pulsar/c_go_pulsar.h       |  13 +++
 pulsar-client-go/pulsar/consumer.go         |  10 ++-
 pulsar-client-go/pulsar/consumer_test.go    | 124 +++++++++++++++++++++++++++-
 6 files changed, 207 insertions(+), 9 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/c/client.h b/pulsar-client-cpp/include/pulsar/c/client.h
index 2da7c6d..4b603bb 100644
--- a/pulsar-client-cpp/include/pulsar/c/client.h
+++ b/pulsar-client-cpp/include/pulsar/c/client.h
@@ -86,6 +86,16 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c
                                    const pulsar_consumer_configuration_t *conf,
                                    pulsar_subscribe_callback callback, void *ctx);
 
+void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount,
+                                                const char *subscriptionName,
+                                                const pulsar_consumer_configuration_t *conf,
+                                                pulsar_subscribe_callback callback, void *ctx);
+
+void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
+                                           const char *subscriptionName,
+                                           const pulsar_consumer_configuration_t *conf,
+                                           pulsar_subscribe_callback callback, void *ctx);
+
 /**
  * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified
  * topic.
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc
index 905e410..ecefe20 100644
--- a/pulsar-client-cpp/lib/c/c_Client.cc
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -98,6 +98,27 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c
                                    boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
 }
 
+void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount,
+                                                const char *subscriptionName,
+                                                const pulsar_consumer_configuration_t *conf,
+                                                pulsar_subscribe_callback callback, void *ctx) {
+    std::vector<std::string> topicsList;
+    for (int i = 0; i < topicsCount; i++) {
+        topicsList.push_back(topics[i]);
+    }
+
+    client->client->subscribeAsync(topicsList, subscriptionName, conf->consumerConfiguration,
+                                   boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
+}
+
+void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
+                                           const char *subscriptionName,
+                                           const pulsar_consumer_configuration_t *conf,
+                                           pulsar_subscribe_callback callback, void *ctx) {
+    client->client->subscribeWithRegexAsync(topicPattern, subscriptionName, conf->consumerConfiguration,
+                                            boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
+}
+
 pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic,
                                           const pulsar_message_id_t *startMessageId,
                                           pulsar_reader_configuration_t *conf, pulsar_reader_t **c_reader) {
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index aebabc8..093dd9d 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -25,10 +25,10 @@ package pulsar
 import "C"
 
 import (
+	"context"
 	"runtime"
 	"time"
 	"unsafe"
-	"context"
 )
 
 type consumer struct {
@@ -64,7 +64,7 @@ type subscribeContext struct {
 }
 
 func subscribeAsync(client *client, options ConsumerOptions, callback func(Consumer, error)) {
-	if options.Topic == "" {
+	if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
 		go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required"))
 		return
 	}
@@ -120,12 +120,38 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 		C.pulsar_consumer_set_consumer_name(conf, name)
 	}
 
-	topic := C.CString(options.Topic)
 	subName := C.CString(options.SubscriptionName)
-	defer C.free(unsafe.Pointer(topic))
 	defer C.free(unsafe.Pointer(subName))
-	C._pulsar_client_subscribe_async(client.ptr, topic, subName,
-		conf, savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback}))
+
+	callbackPtr := savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback})
+
+	if options.Topic != "" {
+		topic := C.CString(options.Topic)
+		defer C.free(unsafe.Pointer(topic))
+		C._pulsar_client_subscribe_async(client.ptr, topic, subName, conf, callbackPtr)
+	} else if options.Topics != nil {
+		cArray := C.malloc(C.size_t(len(options.Topics)) * C.size_t(unsafe.Sizeof(uintptr(0))))
+
+		// convert the C array to a Go Array so we can index it
+		a := (*[1<<30 - 1]*C.char)(cArray)
+
+		for idx, topic := range options.Topics {
+			a[idx] = C.CString(topic)
+		}
+
+		C._pulsar_client_subscribe_multi_topics_async(client.ptr, (**C.char)(cArray), C.int(len(options.Topics)),
+			subName, conf, callbackPtr)
+
+		for idx, _ := range options.Topics {
+			C.free(unsafe.Pointer(a[idx]))
+		}
+
+		C.free(cArray)
+	} else if options.TopicsPattern != "" {
+		topicsPattern := C.CString(options.TopicsPattern)
+		defer C.free(unsafe.Pointer(topicsPattern))
+		C._pulsar_client_subscribe_pattern_async(client.ptr, topicsPattern, subName, conf, callbackPtr)
+	}
 }
 
 type consumerCallback struct {
diff --git a/pulsar-client-go/pulsar/c_go_pulsar.h b/pulsar-client-go/pulsar/c_go_pulsar.h
index 045a4a2..8814276 100644
--- a/pulsar-client-go/pulsar/c_go_pulsar.h
+++ b/pulsar-client-go/pulsar/c_go_pulsar.h
@@ -73,6 +73,19 @@ static inline void _pulsar_client_subscribe_async(pulsar_client_t *client, const
     pulsar_client_subscribe_async(client, topic, subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx);
 }
 
+static inline void _pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char ** topics,
+                                                  int topicsCount,  const char *subscriptionName,
+                                                  const pulsar_consumer_configuration_t *conf, void *ctx) {
+    pulsar_client_subscribe_multi_topics_async(client, topics, topicsCount, subscriptionName, conf,
+                                               pulsarSubscribeCallbackProxy, ctx);
+}
+
+static inline void _pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
+                                                  const char *subscriptionName,
+                                                  const pulsar_consumer_configuration_t *conf, void *ctx) {
+    pulsar_client_subscribe_pattern_async(client, topicPattern, subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx);
+}
+
 void pulsarMessageListenerProxy(pulsar_consumer_t *consumer, pulsar_message_t *message, void *ctx);
 
 static inline void _pulsar_consumer_configuration_set_message_listener(
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index 4ce0857..ed56d9e 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -49,9 +49,17 @@ const (
 // ConsumerBuilder is used to configure and create instances of Consumer
 type ConsumerOptions struct {
 	// Specify the topic this consumer will subscribe on.
-	// This argument is required when subscribing
+	// Either a topic, a list of topics or a topics pattern are required when subscribing
 	Topic string
 
+	// Specify a list of topics this consumer will subscribe on.
+	// Either a topic, a list of topics or a topics pattern are required when subscribing
+	Topics []string
+
+	// Specify a regular expression to subscribe to multiple topics under the same namespace.
+	// Either a topic, a list of topics or a topics pattern are required when subscribing
+	TopicsPattern string
+
 	// Specify the subscription name for this consumer
 	// This argument is required when subscribing
 	SubscriptionName string
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index 2930f19..75a454b 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -20,9 +20,9 @@
 package pulsar
 
 import (
-	"testing"
-	"fmt"
 	"context"
+	"fmt"
+	"testing"
 	"time"
 )
 
@@ -131,3 +131,123 @@ func TestConsumerWithInvalidConf(t *testing.T) {
 
 	assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
 }
+
+
+func TestConsumerMultiTopics(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+
+	assertNil(t, err)
+	defer client.Close()
+
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic: "multi-topic-1",
+	})
+
+	assertNil(t, err)
+
+	producer2, err := client.CreateProducer(ProducerOptions{
+		Topic: "multi-topic-2",
+	})
+
+	assertNil(t, err)
+	defer producer1.Close()
+	defer producer2.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topics:           []string{"multi-topic-1", "multi-topic-2"},
+		SubscriptionName: "my-sub",
+	})
+
+	assertNil(t, err)
+	defer consumer.Close()
+
+	assertEqual(t, consumer.Subscription(), "my-sub")
+
+	ctx := context.Background()
+
+	for i := 0; i < 10; i++ {
+		if err := producer1.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+
+		if err := producer2.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	for i := 0; i < 20; i++ {
+		msg, err := consumer.Receive(ctx)
+		assertNil(t, err)
+		assertNotNil(t, msg)
+
+		consumer.Ack(msg)
+	}
+
+	consumer.Unsubscribe()
+}
+
+
+func TestConsumerRegex(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+
+	assertNil(t, err)
+	defer client.Close()
+
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic: "topic-1",
+	})
+
+	assertNil(t, err)
+
+	producer2, err := client.CreateProducer(ProducerOptions{
+		Topic: "topic-2",
+	})
+
+	assertNil(t, err)
+	defer producer1.Close()
+	defer producer2.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		TopicsPattern: "topic-\\d+",
+		SubscriptionName: "my-sub",
+	})
+
+	assertNil(t, err)
+	defer consumer.Close()
+
+	assertEqual(t, consumer.Subscription(), "my-sub")
+
+	ctx := context.Background()
+
+	for i := 0; i < 10; i++ {
+		if err := producer1.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+
+		if err := producer2.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	for i := 0; i < 20; i++ {
+		msg, err := consumer.Receive(ctx)
+		assertNil(t, err)
+		assertNotNil(t, msg)
+
+		consumer.Ack(msg)
+	}
+
+	consumer.Unsubscribe()
+}
\ No newline at end of file


[incubator-pulsar] 03/04: Added producer/consumer properties in Go client (#2447)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit af1ff3a789bec223b102bc7e27e33cf7bdc17104
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Aug 31 15:39:11 2018 -0700

    Added producer/consumer properties in Go client (#2447)
---
 pulsar-client-cpp/include/pulsar/ProducerConfiguration.h    |  5 +++--
 pulsar-client-cpp/include/pulsar/c/consumer_configuration.h |  3 +++
 pulsar-client-cpp/include/pulsar/c/producer_configuration.h |  3 +++
 pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc          |  5 +++++
 pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc          |  5 +++++
 pulsar-client-go/pulsar/c_consumer.go                       | 12 ++++++++++++
 pulsar-client-go/pulsar/c_producer.go                       | 12 ++++++++++++
 pulsar-client-go/pulsar/consumer.go                         |  4 ++++
 pulsar-client-go/pulsar/producer.go                         |  4 ++++
 pulsar-client-go/pulsar/producer_test.go                    |  8 ++++++--
 10 files changed, 57 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 45154c5..565a6ab 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -128,7 +128,7 @@ class ProducerConfiguration {
     ProducerConfiguration& addEncryptionKey(std::string key);
 
     /**
-     * Check whether the message has a specific property attached.
+     * Check whether the producer has a specific property attached.
      *
      * @param name the name of the property to check
      * @return true if the message has the specified property
@@ -150,7 +150,8 @@ class ProducerConfiguration {
     std::map<std::string, std::string>& getProperties() const;
 
     /**
-     * Sets a new property on a message.
+     * Sets a new property on the producer
+     * .
      * @param name   the name of the property
      * @param value  the associated value
      */
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index 7299867..fca47eb 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -152,6 +152,9 @@ int pulsar_consumer_is_read_compacted(pulsar_consumer_configuration_t *consumer_
 void pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t *consumer_configuration,
                                         int compacted);
 
+void pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t *conf, const char *name,
+                                                const char *value);
+
 // const CryptoKeyReaderPtr getCryptoKeyReader()
 //
 // const;
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index ae88198..670bf50 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -134,6 +134,9 @@ void pulsar_producer_configuration_set_batching_max_publish_delay_ms(pulsar_prod
 unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
     pulsar_producer_configuration_t *conf);
 
+void pulsar_producer_configuration_set_property(pulsar_producer_configuration_t *conf, const char *name,
+                                                const char *value);
+
 // const CryptoKeyReaderPtr getCryptoKeyReader() const;
 // ProducerConfiguration &setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
 //
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index c8d5453..75cdc47 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -113,3 +113,8 @@ void pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t *consume
                                         int compacted) {
     consumer_configuration->consumerConfiguration.setReadCompacted(compacted);
 }
+
+void pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t *conf, const char *name,
+                                                const char *value) {
+    conf->consumerConfiguration.setProperty(name, value);
+}
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index 914fc0a..a8eb5be 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -174,3 +174,8 @@ unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms(
     pulsar_producer_configuration_t *conf) {
     return conf->conf.getBatchingMaxPublishDelayMs();
 }
+
+void pulsar_producer_configuration_set_property(pulsar_producer_configuration_t *conf, const char *name,
+                                                const char *value) {
+    conf->conf.setProperty(name, value);
+}
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 7f613b2..1b41a71 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -120,6 +120,18 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 		C.pulsar_consumer_set_consumer_name(conf, name)
 	}
 
+	if options.Properties != nil {
+		for key, value := range options.Properties {
+			cKey := C.CString(key)
+			cValue := C.CString(value)
+
+			C.pulsar_consumer_configuration_set_property(conf, cKey, cValue)
+
+			C.free(unsafe.Pointer(cKey))
+			C.free(unsafe.Pointer(cValue))
+		}
+	}
+
 	C.pulsar_consumer_set_read_compacted(conf, cBool(options.ReadCompacted))
 
 	subName := C.CString(options.SubscriptionName)
diff --git a/pulsar-client-go/pulsar/c_producer.go b/pulsar-client-go/pulsar/c_producer.go
index b4cd2c5..284315d 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -124,6 +124,18 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
 		C.pulsar_producer_configuration_set_batching_max_messages(conf, C.uint(options.BatchingMaxMessages))
 	}
 
+	if options.Properties != nil {
+		for key, value := range options.Properties {
+			cKey := C.CString(key)
+			cValue := C.CString(value)
+
+			C.pulsar_producer_configuration_set_property(conf, cKey, cValue)
+
+			C.free(unsafe.Pointer(cKey))
+			C.free(unsafe.Pointer(cValue))
+		}
+	}
+
 	topicName := C.CString(options.Topic)
 	defer C.free(unsafe.Pointer(topicName))
 
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index b9f2616..030ba1b 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -64,6 +64,10 @@ type ConsumerOptions struct {
 	// This argument is required when subscribing
 	SubscriptionName string
 
+	// Attach a set of application defined properties to the consumer
+	// This properties will be visible in the topic stats
+	Properties map[string]string
+
 	// Set the timeout for unacked messages
 	// Message not acknowledged within the give time, will be replayed by the broker to the same or a different consumer
 	// Default is 0, which means message are not being replayed based on ack time
diff --git a/pulsar-client-go/pulsar/producer.go b/pulsar-client-go/pulsar/producer.go
index 2cfd141..46d6dd6 100644
--- a/pulsar-client-go/pulsar/producer.go
+++ b/pulsar-client-go/pulsar/producer.go
@@ -71,6 +71,10 @@ type ProducerOptions struct {
 	// a topic.
 	Name string
 
+	// Attach a set of application defined properties to the producer
+	// This properties will be visible in the topic stats
+	Properties map[string]string
+
 	// Set the send timeout (default: 30 seconds)
 	// If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
 	// Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go
index 940be85..cfa0bcb 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -20,9 +20,9 @@
 package pulsar
 
 import (
-	"testing"
-	"fmt"
 	"context"
+	"fmt"
+	"testing"
 	"time"
 )
 
@@ -77,6 +77,10 @@ func TestProducer(t *testing.T) {
 		MaxPendingMessages:      100,
 		BlockIfQueueFull:        true,
 		CompressionType:         LZ4,
+		Properties: map[string]string{
+			"my-name": "test",
+			"key":     "value",
+		},
 	})
 
 	assertNil(t, err)


[incubator-pulsar] 02/04: Support compaction options in Go client (#2449)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit 74862956221c7353df551dbe17defe39860d2c61
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Aug 28 06:42:42 2018 -0700

    Support compaction options in Go client (#2449)
    
    Added the option for ReadCompacted in Go based consumer/reader configuration.
---
 pulsar-client-go/pulsar/c_consumer.go    |   2 +
 pulsar-client-go/pulsar/c_reader.go      |   2 +
 pulsar-client-go/pulsar/consumer.go      |  12 ++-
 pulsar-client-go/pulsar/consumer_test.go | 141 ++++++++++++++++++++++++++++++-
 pulsar-client-go/pulsar/producer_test.go |   1 -
 pulsar-client-go/pulsar/reader.go        |   9 ++
 pulsar-client-go/pulsar/reader_test.go   |  91 ++++++++++++++++++++
 7 files changed, 255 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 093dd9d..7f613b2 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -120,6 +120,8 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 		C.pulsar_consumer_set_consumer_name(conf, name)
 	}
 
+	C.pulsar_consumer_set_read_compacted(conf, cBool(options.ReadCompacted))
+
 	subName := C.CString(options.SubscriptionName)
 	defer C.free(unsafe.Pointer(subName))
 
diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 12c1103..04bb5cf 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -99,6 +99,8 @@ func createReaderAsync(client *client, options ReaderOptions, callback func(Read
 		C.pulsar_reader_configuration_set_subscription_role_prefix(conf, prefix)
 	}
 
+	C.pulsar_reader_configuration_set_read_compacted(conf, cBool(options.ReadCompacted))
+
 	if options.Name != "" {
 		name := C.CString(options.Name)
 		defer C.free(unsafe.Pointer(name))
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index ed56d9e..b9f2616 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -20,8 +20,8 @@
 package pulsar
 
 import (
-	"time"
 	"context"
+	"time"
 )
 
 // Pair of a Consumer and Message
@@ -92,6 +92,16 @@ type ConsumerOptions struct {
 
 	// Set the consumer name.
 	Name string
+
+	// If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
+	// of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
+	// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+	// point, the messages will be sent as normal.
+	//
+	// ReadCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e.
+	//  failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
+	//  shared subscription, will lead to the subscription call throwing a PulsarClientException.
+	ReadCompacted bool
 }
 
 // An interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index 75a454b..f81ce56 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -22,6 +22,9 @@ package pulsar
 import (
 	"context"
 	"fmt"
+	"io/ioutil"
+	"net/http"
+	"strings"
 	"testing"
 	"time"
 )
@@ -99,6 +102,111 @@ func TestConsumer(t *testing.T) {
 	consumer.Unsubscribe()
 }
 
+func TestConsumerCompaction(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+
+	assertNil(t, err)
+	defer client.Close()
+
+	topic := fmt.Sprintf("my-compaction-topic-%d", time.Now().Unix())
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+
+	assertNil(t, err)
+	defer producer.Close()
+
+	// Pre-create both subscriptions to retain published messages
+	consumer1, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub-1",
+	})
+
+	assertNil(t, err)
+	consumer1.Close()
+
+	consumer2, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub-2",
+		ReadCompacted:    true,
+	})
+
+	assertNil(t, err)
+	consumer2.Close()
+
+	ctx := context.Background()
+
+	for i := 0; i < 10; i++ {
+		if err := producer.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+			Key:     "Same-Key",
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Compact topic and wait for operation to complete
+	url := fmt.Sprintf("http://localhost:8080/admin/v2/persistent/public/default/%s/compaction", topic)
+	makeHttpPutCall(t, url)
+	for {
+		res := makeHttpGetCall(t, url)
+		if strings.Contains(res, "RUNNING") {
+			fmt.Println("Compaction still running")
+			time.Sleep(100 * time.Millisecond)
+			continue
+		} else {
+			assertEqual(t, strings.Contains(res, "SUCCESS"), true)
+			fmt.Println("Compaction is done")
+			break
+		}
+	}
+
+	// Restart the consumers
+
+	consumer1, err = client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub-1",
+	})
+
+	assertNil(t, err)
+	defer consumer1.Close()
+
+	consumer2, err = client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: "my-sub-2",
+		ReadCompacted:    true,
+	})
+
+	assertNil(t, err)
+	defer consumer2.Close()
+
+	// Consumer-1 will receive all messages
+	for i := 0; i < 10; i++ {
+		msg, err := consumer1.Receive(context.Background())
+		assertNil(t, err)
+		assertNotNil(t, msg)
+
+		assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
+	}
+
+	// Consumer-2 will only receive the last message
+	msg, err := consumer2.Receive(context.Background())
+	assertNil(t, err)
+	assertNotNil(t, msg)
+	assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-9"))
+
+	// No more messages on consumer-2
+	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+	defer cancel()
+
+	msg, err = consumer2.Receive(ctx)
+	assertNil(t, msg)
+	assertNotNil(t, err)
+}
+
 func TestConsumerWithInvalidConf(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: "pulsar://localhost:6650",
@@ -125,13 +233,44 @@ func TestConsumerWithInvalidConf(t *testing.T) {
 		SubscriptionName: "my-subscription",
 	})
 
-	// Expect error in creating cosnumer
+	// Expect error in creating consumer
 	assertNil(t, consumer)
 	assertNotNil(t, err)
 
 	assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
 }
 
+func makeHttpPutCall(t *testing.T, url string) string {
+	return makeHttpCall(t, http.MethodPut, url)
+}
+
+func makeHttpGetCall(t *testing.T, url string) string {
+	return makeHttpCall(t, http.MethodGet, url)
+}
+
+func makeHttpCall(t *testing.T, method string, url string) string {
+	client := http.Client{}
+
+	req, err := http.NewRequest(method, url, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Accept", "application/json")
+
+	res, err := client.Do(req)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	body, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	return string(body)
+}
 
 func TestConsumerMultiTopics(t *testing.T) {
 	client, err := NewClient(ClientOptions{
diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go
index d7748f7..940be85 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -62,7 +62,6 @@ func TestProducer(t *testing.T) {
 		OperationTimeoutSeconds:  30,
 		ConcurrentLookupRequests: 1000,
 		MessageListenerThreads:   5,
-		EnableTLS:                false,
 	})
 
 	assertNil(t, err)
diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go
index 7015c9c..5592630 100644
--- a/pulsar-client-go/pulsar/reader.go
+++ b/pulsar-client-go/pulsar/reader.go
@@ -57,6 +57,15 @@ type ReaderOptions struct {
 
 	// Set the subscription role prefix. The default prefix is "reader".
 	SubscriptionRolePrefix string
+
+	// If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog
+	// of the topic. This means that, if the topic has been compacted, the reader will only see the latest value for
+	// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+	// point, the messages will be sent as normal.
+	//
+	// ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent
+	// topics will lead to the reader create call throwing a PulsarClientException.
+	ReadCompacted bool
 }
 
 // A Reader can be used to scan through all the messages currently available in a topic.
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index 3b075e1..a0a63ae 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -22,7 +22,9 @@ package pulsar
 import (
 	"context"
 	"fmt"
+	"strings"
 	"testing"
+	"time"
 )
 
 func TestReaderConnectError(t *testing.T) {
@@ -128,3 +130,92 @@ func TestReaderWithInvalidConf(t *testing.T) {
 
 	assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
 }
+
+
+func TestReaderCompaction(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+
+	assertNil(t, err)
+	defer client.Close()
+
+	topic := fmt.Sprintf("my-reader-compaction-topic-%d", time.Now().Unix())
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+	})
+
+	assertNil(t, err)
+	defer producer.Close()
+
+	ctx := context.Background()
+
+	for i := 0; i < 10; i++ {
+		if err := producer.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+			Key:     "Same-Key",
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// Compact topic and wait for operation to complete
+	url := fmt.Sprintf("http://localhost:8080/admin/v2/persistent/public/default/%s/compaction", topic)
+	makeHttpPutCall(t, url)
+	for {
+		res := makeHttpGetCall(t, url)
+		if strings.Contains(res, "RUNNING") {
+			fmt.Println("Compaction still running")
+			time.Sleep(100 * time.Millisecond)
+			continue
+		} else {
+			assertEqual(t, strings.Contains(res, "SUCCESS"), true)
+			fmt.Println("Compaction is done")
+			break
+		}
+	}
+
+	// Restart the consumers
+
+	reader1, err := client.CreateReader(ReaderOptions{
+		Topic:          topic,
+		StartMessageID: EarliestMessage,
+	})
+
+	assertNil(t, err)
+	defer reader1.Close()
+
+	reader2, err := client.CreateReader(ReaderOptions{
+		Topic:          topic,
+		StartMessageID: EarliestMessage,
+		ReadCompacted:  true,
+	})
+
+	assertNil(t, err)
+	defer reader2.Close()
+
+	// Reader-1 will receive all messages
+	for i := 0; i < 10; i++ {
+		msg, err := reader1.Next(context.Background())
+		assertNil(t, err)
+		assertNotNil(t, msg)
+
+		assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
+	}
+
+	// Reader-2 will only receive the last message
+	msg, err := reader2.Next(context.Background())
+	assertNil(t, err)
+	assertNotNil(t, msg)
+	assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-9"))
+
+	// No more messages on consumer-2
+	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+	defer cancel()
+
+	msg, err = reader2.Next(ctx)
+	assertNil(t, msg)
+	assertNotNil(t, err)
+}
+


[incubator-pulsar] 04/04: [go] Ensure producer/consumer/reader keep a ref of client instance so it won't be finalized (#2527)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit 050d47dcb1e3138d093941f9a00641c7caf7c3fd
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Sep 6 07:40:24 2018 -0700

    [go] Ensure producer/consumer/reader keep a ref of client instance so it won't be finalized (#2527)
---
 pulsar-client-go/pulsar/c_consumer.go |  3 ++-
 pulsar-client-go/pulsar/c_producer.go | 12 +++++++-----
 pulsar-client-go/pulsar/c_reader.go   |  3 ++-
 3 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 1b41a71..c78a58e 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -32,6 +32,7 @@ import (
 )
 
 type consumer struct {
+	client         *client
 	ptr            *C.pulsar_consumer_t
 	defaultChannel chan ConsumerMessage
 }
@@ -76,7 +77,7 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 
 	conf := C.pulsar_consumer_configuration_create()
 
-	consumer := &consumer{}
+	consumer := &consumer{client: client}
 
 	if options.MessageChannel == nil {
 		// If there is no message listener, set a default channel so that we can have receive to
diff --git a/pulsar-client-go/pulsar/c_producer.go b/pulsar-client-go/pulsar/c_producer.go
index 284315d..620b64d 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -24,13 +24,14 @@ package pulsar
 */
 import "C"
 import (
+	"context"
 	"runtime"
-	"unsafe"
 	"time"
-	"context"
+	"unsafe"
 )
 
 type createProducerCtx struct {
+	client   *client
 	callback func(producer Producer, err error)
 	conf     *C.pulsar_producer_configuration_t
 }
@@ -44,7 +45,7 @@ func pulsarCreateProducerCallbackProxy(res C.pulsar_result, ptr *C.pulsar_produc
 	if res != C.pulsar_result_Ok {
 		producerCtx.callback(nil, newError(res, "Failed to create Producer"))
 	} else {
-		p := &producer{ptr: ptr}
+		p := &producer{client: producerCtx.client, ptr: ptr}
 		runtime.SetFinalizer(p, producerFinalizer)
 		producerCtx.callback(p, nil)
 	}
@@ -140,7 +141,7 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
 	defer C.free(unsafe.Pointer(topicName))
 
 	C._pulsar_client_create_producer_async(client.ptr, topicName, conf,
-		savePointer(createProducerCtx{callback, conf}))
+		savePointer(createProducerCtx{client,callback, conf}))
 }
 
 type topicMetadata struct {
@@ -161,7 +162,8 @@ func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, metadata *C.pulsar_topic
 /// Producer
 
 type producer struct {
-	ptr *C.pulsar_producer_t
+	client *client
+	ptr    *C.pulsar_producer_t
 }
 
 func producerFinalizer(p *producer) {
diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 04bb5cf..7336c1a 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -31,6 +31,7 @@ import (
 )
 
 type reader struct {
+	client         *client
 	ptr            *C.pulsar_reader_t
 	defaultChannel chan ReaderMessage
 }
@@ -73,7 +74,7 @@ func createReaderAsync(client *client, options ReaderOptions, callback func(Read
 		return
 	}
 
-	reader := &reader{}
+	reader := &reader{client: client}
 
 	if options.MessageChannel == nil {
 		// If there is no message listener, set a default channel so that we can have receive to