You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/06 18:11:19 UTC

[incubator-pulsar] 01/04: Add multi-topic and regex consumer in Go client (#2448)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit e1dddcf21608b76f57e3d4830d45fbf87f8d2e99
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Aug 27 16:34:00 2018 -0700

    Add multi-topic and regex consumer in Go client (#2448)
---
 pulsar-client-cpp/include/pulsar/c/client.h |  10 +++
 pulsar-client-cpp/lib/c/c_Client.cc         |  21 +++++
 pulsar-client-go/pulsar/c_consumer.go       |  38 +++++++--
 pulsar-client-go/pulsar/c_go_pulsar.h       |  13 +++
 pulsar-client-go/pulsar/consumer.go         |  10 ++-
 pulsar-client-go/pulsar/consumer_test.go    | 124 +++++++++++++++++++++++++++-
 6 files changed, 207 insertions(+), 9 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/c/client.h b/pulsar-client-cpp/include/pulsar/c/client.h
index 2da7c6d..4b603bb 100644
--- a/pulsar-client-cpp/include/pulsar/c/client.h
+++ b/pulsar-client-cpp/include/pulsar/c/client.h
@@ -86,6 +86,16 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c
                                    const pulsar_consumer_configuration_t *conf,
                                    pulsar_subscribe_callback callback, void *ctx);
 
+void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount,
+                                                const char *subscriptionName,
+                                                const pulsar_consumer_configuration_t *conf,
+                                                pulsar_subscribe_callback callback, void *ctx);
+
+void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
+                                           const char *subscriptionName,
+                                           const pulsar_consumer_configuration_t *conf,
+                                           pulsar_subscribe_callback callback, void *ctx);
+
 /**
  * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified
  * topic.
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc
index 905e410..ecefe20 100644
--- a/pulsar-client-cpp/lib/c/c_Client.cc
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -98,6 +98,27 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c
                                    boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
 }
 
+void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount,
+                                                const char *subscriptionName,
+                                                const pulsar_consumer_configuration_t *conf,
+                                                pulsar_subscribe_callback callback, void *ctx) {
+    std::vector<std::string> topicsList;
+    for (int i = 0; i < topicsCount; i++) {
+        topicsList.push_back(topics[i]);
+    }
+
+    client->client->subscribeAsync(topicsList, subscriptionName, conf->consumerConfiguration,
+                                   boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
+}
+
+void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
+                                           const char *subscriptionName,
+                                           const pulsar_consumer_configuration_t *conf,
+                                           pulsar_subscribe_callback callback, void *ctx) {
+    client->client->subscribeWithRegexAsync(topicPattern, subscriptionName, conf->consumerConfiguration,
+                                            boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
+}
+
 pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic,
                                           const pulsar_message_id_t *startMessageId,
                                           pulsar_reader_configuration_t *conf, pulsar_reader_t **c_reader) {
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index aebabc8..093dd9d 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -25,10 +25,10 @@ package pulsar
 import "C"
 
 import (
+	"context"
 	"runtime"
 	"time"
 	"unsafe"
-	"context"
 )
 
 type consumer struct {
@@ -64,7 +64,7 @@ type subscribeContext struct {
 }
 
 func subscribeAsync(client *client, options ConsumerOptions, callback func(Consumer, error)) {
-	if options.Topic == "" {
+	if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
 		go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required"))
 		return
 	}
@@ -120,12 +120,38 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 		C.pulsar_consumer_set_consumer_name(conf, name)
 	}
 
-	topic := C.CString(options.Topic)
 	subName := C.CString(options.SubscriptionName)
-	defer C.free(unsafe.Pointer(topic))
 	defer C.free(unsafe.Pointer(subName))
-	C._pulsar_client_subscribe_async(client.ptr, topic, subName,
-		conf, savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback}))
+
+	callbackPtr := savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback})
+
+	if options.Topic != "" {
+		topic := C.CString(options.Topic)
+		defer C.free(unsafe.Pointer(topic))
+		C._pulsar_client_subscribe_async(client.ptr, topic, subName, conf, callbackPtr)
+	} else if options.Topics != nil {
+		cArray := C.malloc(C.size_t(len(options.Topics)) * C.size_t(unsafe.Sizeof(uintptr(0))))
+
+		// convert the C array to a Go Array so we can index it
+		a := (*[1<<30 - 1]*C.char)(cArray)
+
+		for idx, topic := range options.Topics {
+			a[idx] = C.CString(topic)
+		}
+
+		C._pulsar_client_subscribe_multi_topics_async(client.ptr, (**C.char)(cArray), C.int(len(options.Topics)),
+			subName, conf, callbackPtr)
+
+		for idx, _ := range options.Topics {
+			C.free(unsafe.Pointer(a[idx]))
+		}
+
+		C.free(cArray)
+	} else if options.TopicsPattern != "" {
+		topicsPattern := C.CString(options.TopicsPattern)
+		defer C.free(unsafe.Pointer(topicsPattern))
+		C._pulsar_client_subscribe_pattern_async(client.ptr, topicsPattern, subName, conf, callbackPtr)
+	}
 }
 
 type consumerCallback struct {
diff --git a/pulsar-client-go/pulsar/c_go_pulsar.h b/pulsar-client-go/pulsar/c_go_pulsar.h
index 045a4a2..8814276 100644
--- a/pulsar-client-go/pulsar/c_go_pulsar.h
+++ b/pulsar-client-go/pulsar/c_go_pulsar.h
@@ -73,6 +73,19 @@ static inline void _pulsar_client_subscribe_async(pulsar_client_t *client, const
     pulsar_client_subscribe_async(client, topic, subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx);
 }
 
+static inline void _pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char ** topics,
+                                                  int topicsCount,  const char *subscriptionName,
+                                                  const pulsar_consumer_configuration_t *conf, void *ctx) {
+    pulsar_client_subscribe_multi_topics_async(client, topics, topicsCount, subscriptionName, conf,
+                                               pulsarSubscribeCallbackProxy, ctx);
+}
+
+static inline void _pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
+                                                  const char *subscriptionName,
+                                                  const pulsar_consumer_configuration_t *conf, void *ctx) {
+    pulsar_client_subscribe_pattern_async(client, topicPattern, subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx);
+}
+
 void pulsarMessageListenerProxy(pulsar_consumer_t *consumer, pulsar_message_t *message, void *ctx);
 
 static inline void _pulsar_consumer_configuration_set_message_listener(
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index 4ce0857..ed56d9e 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -49,9 +49,17 @@ const (
 // ConsumerBuilder is used to configure and create instances of Consumer
 type ConsumerOptions struct {
 	// Specify the topic this consumer will subscribe on.
-	// This argument is required when subscribing
+	// Either a topic, a list of topics, or a topics pattern is required when subscribing
 	Topic string
 
+	// Specify a list of topics this consumer will subscribe on.
+	// Either a topic, a list of topics, or a topics pattern is required when subscribing
+	Topics []string
+
+	// Specify a regular expression to subscribe to multiple topics under the same namespace.
+	// Either a topic, a list of topics, or a topics pattern is required when subscribing
+	TopicsPattern string
+
 	// Specify the subscription name for this consumer
 	// This argument is required when subscribing
 	SubscriptionName string
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index 2930f19..75a454b 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -20,9 +20,9 @@
 package pulsar
 
 import (
-	"testing"
-	"fmt"
 	"context"
+	"fmt"
+	"testing"
 	"time"
 )
 
@@ -131,3 +131,123 @@ func TestConsumerWithInvalidConf(t *testing.T) {
 
 	assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
 }
+
+
+func TestConsumerMultiTopics(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+
+	assertNil(t, err)
+	defer client.Close()
+
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic: "multi-topic-1",
+	})
+
+	assertNil(t, err)
+
+	producer2, err := client.CreateProducer(ProducerOptions{
+		Topic: "multi-topic-2",
+	})
+
+	assertNil(t, err)
+	defer producer1.Close()
+	defer producer2.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topics:           []string{"multi-topic-1", "multi-topic-2"},
+		SubscriptionName: "my-sub",
+	})
+
+	assertNil(t, err)
+	defer consumer.Close()
+
+	assertEqual(t, consumer.Subscription(), "my-sub")
+
+	ctx := context.Background()
+
+	for i := 0; i < 10; i++ {
+		if err := producer1.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+
+		if err := producer2.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	for i := 0; i < 20; i++ {
+		msg, err := consumer.Receive(ctx)
+		assertNil(t, err)
+		assertNotNil(t, msg)
+
+		consumer.Ack(msg)
+	}
+
+	consumer.Unsubscribe()
+}
+
+
+func TestConsumerRegex(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+
+	assertNil(t, err)
+	defer client.Close()
+
+	producer1, err := client.CreateProducer(ProducerOptions{
+		Topic: "topic-1",
+	})
+
+	assertNil(t, err)
+
+	producer2, err := client.CreateProducer(ProducerOptions{
+		Topic: "topic-2",
+	})
+
+	assertNil(t, err)
+	defer producer1.Close()
+	defer producer2.Close()
+
+	consumer, err := client.Subscribe(ConsumerOptions{
+		TopicsPattern: "topic-\\d+",
+		SubscriptionName: "my-sub",
+	})
+
+	assertNil(t, err)
+	defer consumer.Close()
+
+	assertEqual(t, consumer.Subscription(), "my-sub")
+
+	ctx := context.Background()
+
+	for i := 0; i < 10; i++ {
+		if err := producer1.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+
+		if err := producer2.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	for i := 0; i < 20; i++ {
+		msg, err := consumer.Receive(ctx)
+		assertNil(t, err)
+		assertNotNil(t, msg)
+
+		consumer.Ack(msg)
+	}
+
+	consumer.Unsubscribe()
+}
\ No newline at end of file