You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/04/25 13:39:58 UTC
[pulsar-client-go] branch master updated: Add flag to disable forced topic creation. (#226)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 6edc8f4 Add flag to disable forced topic creation. (#226)
6edc8f4 is described below
commit 6edc8f4ef954477eb36eed92e88464f56b4c48ee
Author: cckellogg <cc...@gmail.com>
AuthorDate: Sat Apr 25 06:39:51 2020 -0700
Add flag to disable forced topic creation. (#226)
This flag is needed for the regex consumer.
---
pulsar/consumer_impl.go | 32 +++++++++++++++++---------------
pulsar/consumer_partition.go | 7 +++++++
pulsar/consumer_regex.go | 2 +-
3 files changed, 25 insertions(+), 16 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index ba7d24d..3a27def 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -42,11 +42,12 @@ type acker interface {
type consumer struct {
sync.Mutex
- topic string
- client *client
- options ConsumerOptions
- consumers []*partitionConsumer
- consumerName string
+ topic string
+ client *client
+ options ConsumerOptions
+ consumers []*partitionConsumer
+ consumerName string
+ disableForceTopicCreation bool
// channel used to deliver message to clients
messageCh chan ConsumerMessage
@@ -123,17 +124,18 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
}
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
- messageCh chan ConsumerMessage, dlq *dlqRouter) (*consumer, error) {
+ messageCh chan ConsumerMessage, dlq *dlqRouter, disableForceTopicCreation bool) (*consumer, error) {
consumer := &consumer{
- topic: topic,
- client: client,
- options: options,
- messageCh: messageCh,
- closeCh: make(chan struct{}),
- errorCh: make(chan error),
- dlq: dlq,
- log: log.WithField("topic", topic),
+ topic: topic,
+ client: client,
+ options: options,
+ disableForceTopicCreation: disableForceTopicCreation,
+ messageCh: messageCh,
+ closeCh: make(chan struct{}),
+ errorCh: make(chan error),
+ dlq: dlq,
+ log: log.WithField("topic", topic),
}
if options.Name != "" {
@@ -275,7 +277,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
func topicSubscribe(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) {
- return newInternalConsumer(client, options, topic, messageCh, dlqRouter)
+ return newInternalConsumer(client, options, topic, messageCh, dlqRouter, false)
}
func (c *consumer) Subscription() string {
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 33fc605..498f2ae 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -76,6 +76,7 @@ type partitionConsumerOpts struct {
startMessageIDInclusive bool
subscriptionMode subscriptionMode
readCompacted bool
+ disableForceTopicCreation bool
}
type partitionConsumer struct {
@@ -748,6 +749,12 @@ func (pc *partitionConsumer) grabConn() error {
cmdSubscribe.Metadata = toKeyValues(pc.options.metadata)
}
+ // force topic creation is enabled by default so
+ // we only need to set the flag when disabling it
+ if pc.options.disableForceTopicCreation {
+ cmdSubscribe.ForceTopicCreation = proto.Bool(false)
+ }
+
res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 00cbb88..3d0aebe 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -361,7 +361,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum
for _, t := range topics {
go func(topic string) {
defer wg.Done()
- c, err := newInternalConsumer(c, opts, topic, ch, dlq)
+ c, err := newInternalConsumer(c, opts, topic, ch, dlq, true)
consumerErrorCh <- consumerError{
err: err,
topic: topic,