You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/06 18:11:19 UTC
[incubator-pulsar] 01/04: Add multi-topic and regex consumer in Go
client (#2448)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
commit e1dddcf21608b76f57e3d4830d45fbf87f8d2e99
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Aug 27 16:34:00 2018 -0700
Add multi-topic and regex consumer in Go client (#2448)
---
pulsar-client-cpp/include/pulsar/c/client.h | 10 +++
pulsar-client-cpp/lib/c/c_Client.cc | 21 +++++
pulsar-client-go/pulsar/c_consumer.go | 38 +++++++--
pulsar-client-go/pulsar/c_go_pulsar.h | 13 +++
pulsar-client-go/pulsar/consumer.go | 10 ++-
pulsar-client-go/pulsar/consumer_test.go | 124 +++++++++++++++++++++++++++-
6 files changed, 207 insertions(+), 9 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/c/client.h b/pulsar-client-cpp/include/pulsar/c/client.h
index 2da7c6d..4b603bb 100644
--- a/pulsar-client-cpp/include/pulsar/c/client.h
+++ b/pulsar-client-cpp/include/pulsar/c/client.h
@@ -86,6 +86,16 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c
const pulsar_consumer_configuration_t *conf,
pulsar_subscribe_callback callback, void *ctx);
+void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount,
+ const char *subscriptionName,
+ const pulsar_consumer_configuration_t *conf,
+ pulsar_subscribe_callback callback, void *ctx);
+
+void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
+ const char *subscriptionName,
+ const pulsar_consumer_configuration_t *conf,
+ pulsar_subscribe_callback callback, void *ctx);
+
/**
* Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified
* topic.
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc
index 905e410..ecefe20 100644
--- a/pulsar-client-cpp/lib/c/c_Client.cc
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -98,6 +98,27 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c
boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
}
+void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount,
+ const char *subscriptionName,
+ const pulsar_consumer_configuration_t *conf,
+ pulsar_subscribe_callback callback, void *ctx) {
+ std::vector<std::string> topicsList;
+ for (int i = 0; i < topicsCount; i++) {
+ topicsList.push_back(topics[i]);
+ }
+
+ client->client->subscribeAsync(topicsList, subscriptionName, conf->consumerConfiguration,
+ boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
+}
+
+void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
+ const char *subscriptionName,
+ const pulsar_consumer_configuration_t *conf,
+ pulsar_subscribe_callback callback, void *ctx) {
+ client->client->subscribeWithRegexAsync(topicPattern, subscriptionName, conf->consumerConfiguration,
+ boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
+}
+
pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic,
const pulsar_message_id_t *startMessageId,
pulsar_reader_configuration_t *conf, pulsar_reader_t **c_reader) {
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index aebabc8..093dd9d 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -25,10 +25,10 @@ package pulsar
import "C"
import (
+ "context"
"runtime"
"time"
"unsafe"
- "context"
)
type consumer struct {
@@ -64,7 +64,7 @@ type subscribeContext struct {
}
func subscribeAsync(client *client, options ConsumerOptions, callback func(Consumer, error)) {
- if options.Topic == "" {
+ if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required"))
return
}
@@ -120,12 +120,38 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
C.pulsar_consumer_set_consumer_name(conf, name)
}
- topic := C.CString(options.Topic)
subName := C.CString(options.SubscriptionName)
- defer C.free(unsafe.Pointer(topic))
defer C.free(unsafe.Pointer(subName))
- C._pulsar_client_subscribe_async(client.ptr, topic, subName,
- conf, savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback}))
+
+ callbackPtr := savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback})
+
+ if options.Topic != "" {
+ topic := C.CString(options.Topic)
+ defer C.free(unsafe.Pointer(topic))
+ C._pulsar_client_subscribe_async(client.ptr, topic, subName, conf, callbackPtr)
+ } else if options.Topics != nil {
+ cArray := C.malloc(C.size_t(len(options.Topics)) * C.size_t(unsafe.Sizeof(uintptr(0))))
+
+ // convert the C array to a Go Array so we can index it
+ a := (*[1<<30 - 1]*C.char)(cArray)
+
+ for idx, topic := range options.Topics {
+ a[idx] = C.CString(topic)
+ }
+
+ C._pulsar_client_subscribe_multi_topics_async(client.ptr, (**C.char)(cArray), C.int(len(options.Topics)),
+ subName, conf, callbackPtr)
+
+ for idx, _ := range options.Topics {
+ C.free(unsafe.Pointer(a[idx]))
+ }
+
+ C.free(cArray)
+ } else if options.TopicsPattern != "" {
+ topicsPattern := C.CString(options.TopicsPattern)
+ defer C.free(unsafe.Pointer(topicsPattern))
+ C._pulsar_client_subscribe_pattern_async(client.ptr, topicsPattern, subName, conf, callbackPtr)
+ }
}
type consumerCallback struct {
diff --git a/pulsar-client-go/pulsar/c_go_pulsar.h b/pulsar-client-go/pulsar/c_go_pulsar.h
index 045a4a2..8814276 100644
--- a/pulsar-client-go/pulsar/c_go_pulsar.h
+++ b/pulsar-client-go/pulsar/c_go_pulsar.h
@@ -73,6 +73,19 @@ static inline void _pulsar_client_subscribe_async(pulsar_client_t *client, const
pulsar_client_subscribe_async(client, topic, subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx);
}
+static inline void _pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char ** topics,
+ int topicsCount, const char *subscriptionName,
+ const pulsar_consumer_configuration_t *conf, void *ctx) {
+ pulsar_client_subscribe_multi_topics_async(client, topics, topicsCount, subscriptionName, conf,
+ pulsarSubscribeCallbackProxy, ctx);
+}
+
+static inline void _pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
+ const char *subscriptionName,
+ const pulsar_consumer_configuration_t *conf, void *ctx) {
+ pulsar_client_subscribe_pattern_async(client, topicPattern, subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx);
+}
+
void pulsarMessageListenerProxy(pulsar_consumer_t *consumer, pulsar_message_t *message, void *ctx);
static inline void _pulsar_consumer_configuration_set_message_listener(
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index 4ce0857..ed56d9e 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -49,9 +49,17 @@ const (
// ConsumerBuilder is used to configure and create instances of Consumer
type ConsumerOptions struct {
// Specify the topic this consumer will subscribe on.
- // This argument is required when subscribing
+ // Either a topic, a list of topics or a topics pattern are required when subscribing
Topic string
+ // Specify a list of topics this consumer will subscribe on.
+ // Either a topic, a list of topics or a topics pattern are required when subscribing
+ Topics []string
+
+ // Specify a regular expression to subscribe to multiple topics under the same namespace.
+ // Either a topic, a list of topics or a topics pattern are required when subscribing
+ TopicsPattern string
+
// Specify the subscription name for this consumer
// This argument is required when subscribing
SubscriptionName string
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index 2930f19..75a454b 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -20,9 +20,9 @@
package pulsar
import (
- "testing"
- "fmt"
"context"
+ "fmt"
+ "testing"
"time"
)
@@ -131,3 +131,123 @@ func TestConsumerWithInvalidConf(t *testing.T) {
assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
}
+
+
+func TestConsumerMultiTopics(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+
+ assertNil(t, err)
+ defer client.Close()
+
+ producer1, err := client.CreateProducer(ProducerOptions{
+ Topic: "multi-topic-1",
+ })
+
+ assertNil(t, err)
+
+ producer2, err := client.CreateProducer(ProducerOptions{
+ Topic: "multi-topic-2",
+ })
+
+ assertNil(t, err)
+ defer producer1.Close()
+ defer producer2.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topics: []string{"multi-topic-1", "multi-topic-2"},
+ SubscriptionName: "my-sub",
+ })
+
+ assertNil(t, err)
+ defer consumer.Close()
+
+ assertEqual(t, consumer.Subscription(), "my-sub")
+
+ ctx := context.Background()
+
+ for i := 0; i < 10; i++ {
+ if err := producer1.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+
+ if err := producer2.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ for i := 0; i < 20; i++ {
+ msg, err := consumer.Receive(ctx)
+ assertNil(t, err)
+ assertNotNil(t, msg)
+
+ consumer.Ack(msg)
+ }
+
+ consumer.Unsubscribe()
+}
+
+
+func TestConsumerRegex(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: "pulsar://localhost:6650",
+ })
+
+ assertNil(t, err)
+ defer client.Close()
+
+ producer1, err := client.CreateProducer(ProducerOptions{
+ Topic: "topic-1",
+ })
+
+ assertNil(t, err)
+
+ producer2, err := client.CreateProducer(ProducerOptions{
+ Topic: "topic-2",
+ })
+
+ assertNil(t, err)
+ defer producer1.Close()
+ defer producer2.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ TopicsPattern: "topic-\\d+",
+ SubscriptionName: "my-sub",
+ })
+
+ assertNil(t, err)
+ defer consumer.Close()
+
+ assertEqual(t, consumer.Subscription(), "my-sub")
+
+ ctx := context.Background()
+
+ for i := 0; i < 10; i++ {
+ if err := producer1.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+
+ if err := producer2.Send(ctx, ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ for i := 0; i < 20; i++ {
+ msg, err := consumer.Receive(ctx)
+ assertNil(t, err)
+ assertNotNil(t, msg)
+
+ consumer.Ack(msg)
+ }
+
+ consumer.Unsubscribe()
+}
\ No newline at end of file