You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/11/11 21:57:55 UTC
[pulsar-client-go] branch master updated: Simplify and refactor
parts of the single topic consumer. (#86)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 39d7cdc Simplify and refactor parts of the single topic consumer. (#86)
39d7cdc is described below
commit 39d7cdc46ac49299148f293ae9d5fa154d29b8ad
Author: cckellogg <cc...@gmail.com>
AuthorDate: Mon Nov 11 13:57:45 2019 -0800
Simplify and refactor parts of the single topic consumer. (#86)
* Simplify and refactor parts of the single topic consumer.
* Wait when closing a consumer and change get chan method.
---
pulsar/consumer.go | 49 +--
pulsar/consumer_impl.go | 278 ++++++++++++++
pulsar/consumer_partition.go | 510 ++++++++++++++++++++++++++
pulsar/consumer_test.go | 442 ++++-------------------
util/error.go => pulsar/helper.go | 12 +-
pulsar/impl_client.go | 9 +-
pulsar/impl_consumer.go | 316 ----------------
pulsar/impl_partition_consumer.go | 739 --------------------------------------
pulsar/internal/connection.go | 19 +-
pulsar/internal/rpc_client.go | 3 +
pulsar/test_helper.go | 20 ++
pulsar/unacked_msg_tracker.go | 2 +-
util/util.go | 44 ---
util/util_test.go | 41 ---
14 files changed, 914 insertions(+), 1570 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index c259cd6..3b729ec 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -35,11 +35,13 @@ const (
// Exclusive there can be only 1 consumer on the same topic with the same subscription name
Exclusive SubscriptionType = iota
- // Shared subscription mode, multiple consumer will be able to use the same subscription name and the messages will be dispatched according to
+ // Shared subscription mode, multiple consumer will be able to use the same subscription name
+ // and the messages will be dispatched according to
// a round-robin rotation between the connected consumers
Shared
- // Failover subscription mode, multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages.
+ // Failover subscription mode, multiple consumer will be able to use the same subscription name
+ // but only 1 consumer will receive the messages.
// If that consumer disconnects, one of the other connected consumers will start receiving messages.
Failover
@@ -48,14 +50,14 @@ const (
KeyShared
)
-type InitialPosition int
+type SubscriptionInitialPosition int
const (
// Latest position which means the start consuming position will be the last message
- Latest InitialPosition = iota
+ SubscriptionPositionLatest SubscriptionInitialPosition = iota
// Earliest position which means the start consuming position will be the first message
- Earliest
+ SubscriptionPositionEarliest
)
// ConsumerOptions is used to configure and create instances of Consumer
@@ -91,7 +93,7 @@ type ConsumerOptions struct {
// InitialPosition at which the cursor will be set when subscribe
// Default is `Latest`
- SubscriptionInitPos InitialPosition
+ SubscriptionInitialPosition
// Sets a `MessageChannel` for the consumer
// When a message is received, it will be pushed to the channel for consumption
@@ -139,11 +141,8 @@ type Consumer interface {
// This calls blocks until a message is available.
Receive(context.Context) (Message, error)
- // ReceiveAsync appends the message to the msgs channel asynchronously.
- ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error
-
- // ReceiveAsyncWithCallback returns a callback containing the message and error objects
- ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error))
+ // Chan returns a channel to consume messages from
+ Chan() <-chan ConsumerMessage
// Ack the consumption of a single message
Ack(Message) error
@@ -151,34 +150,6 @@ type Consumer interface {
// AckID the consumption of a single message, identified by its MessageID
AckID(MessageID) error
- // AckCumulative the reception of all the messages in the stream up to (and including) the provided message.
- // This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
- // re-delivered to this consumer.
- //
- // Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
- //
- // It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
- AckCumulative(Message) error
-
- // AckCumulativeID the reception of all the messages in the stream up to (and including) the provided message.
- // This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
- // re-delivered to this consumer.
- // Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
- // It's equivalent to calling asyncAcknowledgeCumulative(MessageID) and waiting for the callback to be triggered.
- AckCumulativeID(MessageID) error
-
// Close the consumer and stop the broker to push more messages
Close() error
-
- // Seek reset the subscription associated with this consumer to a specific message id.
- // The message id can either be a specific message or represent the first or last messages in the topic.
- // Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
- // seek() on the individual partitions.
- Seek(msgID MessageID) error
-
- // RedeliverUnackedMessages redeliver all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
- // active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
- // the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
- // breaks, the messages are redelivered after reconnect.
- RedeliverUnackedMessages() error
}
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
new file mode 100644
index 0000000..af9b8d5
--- /dev/null
+++ b/pulsar/consumer_impl.go
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "math/rand"
+ "sync"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/apache/pulsar-client-go/pkg/pb"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
+)
+
+// ErrConsumerClosed is the error returned by Receive when the consumer's
+// message channel has been closed.
+var ErrConsumerClosed = errors.New("consumer closed")
+
+// consumer is the Consumer implementation for a single topic. It owns one
+// partitionConsumer per partition of the topic and hands messages to the
+// application through messageCh.
+type consumer struct {
+ // name of the topic this consumer was created for
+ topic string
+
+ options ConsumerOptions
+
+ // one entry per partition, indexed by partition index
+ consumers []*partitionConsumer
+
+ // channel used to deliver message to clients
+ messageCh chan ConsumerMessage
+
+ // NOTE(review): closeCh is never assigned and errorCh is never read in this file — confirm they are still needed
+ closeCh chan struct{}
+ errorCh chan error
+
+ log *log.Entry
+}
+
+// newConsumer validates options, applies defaults and subscribes to the single
+// topic named by options.Topic (or by the only entry of options.Topics).
+//
+// It returns an error when no topic or subscription name is given, when the
+// topic name does not parse, or when the options do not resolve to exactly one
+// topic.
+func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
+	switch {
+	case options.Topic == "" && options.Topics == nil && options.TopicsPattern == "":
+		return nil, newError(TopicNotFound, "topic is required")
+	case options.SubscriptionName == "":
+		return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
+	}
+
+	if options.ReceiverQueueSize == 0 {
+		options.ReceiverQueueSize = 1000
+	}
+
+	// deliver into the caller's channel when one was supplied, otherwise into a small buffered one
+	deliveryCh := options.MessageChannel
+	if deliveryCh == nil {
+		deliveryCh = make(chan ConsumerMessage, 10)
+	}
+
+	// resolve the single topic to subscribe to; an explicit Topic wins over Topics
+	var topic string
+	switch {
+	case options.Topic != "":
+		topic = options.Topic
+	case len(options.Topics) == 1:
+		topic = options.Topics[0]
+	default:
+		return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
+	}
+
+	if _, err := internal.ParseTopicName(topic); err != nil {
+		return nil, err
+	}
+	return topicSubscribe(client, options, topic, deliveryCh)
+}
+
+// topicSubscribe creates a consumer for topic by subscribing, in parallel, to
+// every partition reported by client.TopicPartitions. All partition consumers
+// deliver into messageCh.
+//
+// If any partition fails to subscribe, the partition consumers that were
+// created are closed and the first error (by partition index) is returned.
+func topicSubscribe(client *client, options ConsumerOptions, topic string,
+	messageCh chan ConsumerMessage) (Consumer, error) {
+	c := &consumer{
+		topic: topic,
+		// keep the options so that Subscription() can report the subscription name
+		options:   options,
+		messageCh: messageCh,
+		errorCh:   make(chan error),
+		log:       log.WithField("topic", topic),
+	}
+
+	partitions, err := client.TopicPartitions(topic)
+	if err != nil {
+		return nil, err
+	}
+
+	consumerName := options.Name
+	if consumerName == "" {
+		consumerName = generateRandomName()
+	}
+
+	c.consumers = make([]*partitionConsumer, len(partitions))
+	errs := make([]error, len(partitions))
+
+	// each goroutine writes only its own slot, so no extra locking is needed
+	var wg sync.WaitGroup
+	for partitionIdx, partitionTopic := range partitions {
+		wg.Add(1)
+		go func(idx int, pt string) {
+			defer wg.Done()
+			opts := &partitionConsumerOpts{
+				topic:               pt,
+				consumerName:        consumerName,
+				subscription:        options.SubscriptionName,
+				subscriptionType:    options.Type,
+				subscriptionInitPos: options.SubscriptionInitialPosition,
+				partitionIdx:        idx,
+				receiverQueueSize:   options.ReceiverQueueSize,
+			}
+			c.consumers[idx], errs[idx] = newPartitionConsumer(c, client, opts, messageCh)
+		}(partitionIdx, partitionTopic)
+	}
+	wg.Wait()
+
+	for _, e := range errs {
+		if e != nil {
+			err = e
+			break
+		}
+	}
+
+	if err != nil {
+		// Since there were some failures,
+		// cleanup all the partitions that succeeded in creating the consumer
+		for _, pc := range c.consumers {
+			if pc != nil {
+				// best effort: the subscribe error is the one worth reporting
+				_ = pc.Close()
+			}
+		}
+		return nil, err
+	}
+
+	return c, nil
+}
+
+// Topic returns the topic name this consumer was created with.
+func (c *consumer) Topic() string {
+ return c.topic
+}
+
+// Subscription returns the subscription name held in the consumer's options.
+func (c *consumer) Subscription() string {
+ return c.options.SubscriptionName
+}
+
+// Unsubscribe unsubscribes every partition consumer in turn. Failures do not
+// stop the loop; they are collected and returned as a single error whose
+// message lists each failure separated by "; ".
+func (c *consumer) Unsubscribe() error {
+	var errMsg string
+	for _, pc := range c.consumers {
+		if err := pc.Unsubscribe(); err != nil {
+			if errMsg != "" {
+				errMsg += "; "
+			}
+			errMsg += fmt.Sprintf("topic %s, subscription %s: %s", c.Topic(), c.Subscription(), err)
+		}
+	}
+	if errMsg != "" {
+		// errors.New rather than fmt.Errorf: the text is already formatted and
+		// may contain '%' characters coming from the underlying error
+		return errors.New(errMsg)
+	}
+	return nil
+}
+
+// Receive blocks until a message is available on the consumer's channel or ctx
+// is done. It returns ctx.Err() when the context ends first and
+// ErrConsumerClosed when the message channel has been closed.
+func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case cm, open := <-c.messageCh:
+		if open {
+			return cm.Message, nil
+		}
+		return nil, ErrConsumerClosed
+	}
+}
+
+// Chan returns the receive-only view of the channel on which this consumer
+// delivers messages.
+func (c *consumer) Chan() <-chan ConsumerMessage {
+ return c.messageCh
+}
+
+// Ack acknowledges the consumption of a single message by delegating to AckID
+// with the message's ID.
+func (c *consumer) Ack(msg Message) error {
+ return c.AckID(msg.ID())
+}
+
+// AckID acknowledges the consumption of a single message, identified by its
+// MessageID. The ack is routed to the partition consumer selected by the
+// partition index stored in the message id.
+//
+// It returns an error when msgID is not a message id created by this package
+// or when its partition index does not match any partition of this consumer.
+func (c *consumer) AckID(msgID MessageID) error {
+	mid, ok := msgID.(*messageID)
+	// a typed nil pointer also satisfies the assertion, so reject it explicitly
+	if !ok || mid == nil {
+		return fmt.Errorf("invalid message id type")
+	}
+
+	partition := mid.partitionIdx
+	// did we receive a valid partition index?
+	if partition < 0 || partition >= len(c.consumers) {
+		// the highest valid index is len-1
+		return fmt.Errorf("invalid partition index %d expected a partition between [0-%d]",
+			partition, len(c.consumers)-1)
+	}
+	return c.consumers[partition].AckID(msgID)
+}
+
+// Close closes all partition consumers concurrently and waits for every one of
+// them to finish. It returns the first error (by partition index) reported by
+// a partition consumer, or nil when all of them closed cleanly.
+func (c *consumer) Close() error {
+	errs := make([]error, len(c.consumers))
+
+	var wg sync.WaitGroup
+	for i := range c.consumers {
+		wg.Add(1)
+		go func(idx int) {
+			defer wg.Done()
+			// each goroutine writes only its own slot
+			errs[idx] = c.consumers[idx].Close()
+		}(i)
+	}
+	wg.Wait()
+
+	for _, err := range errs {
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+var (
+	// random is the generator used for consumer names; it is seeded once at
+	// package initialisation.
+	random = rand.New(rand.NewSource(time.Now().UnixNano()))
+	// randomMu guards random, because a *rand.Rand built from rand.NewSource
+	// is not safe for concurrent use.
+	randomMu sync.Mutex
+)
+
+// generateRandomName returns a random name made of five lowercase ASCII
+// letters. It is safe to call from multiple goroutines.
+func generateRandomName() string {
+	const chars = "abcdefghijklmnopqrstuvwxyz"
+	name := make([]byte, 5)
+
+	randomMu.Lock()
+	defer randomMu.Unlock()
+	for i := range name {
+		name[i] = chars[random.Intn(len(chars))]
+	}
+	return string(name)
+}
+
+// toProtoSubType maps a SubscriptionType to the subscription type used in the
+// subscribe command. Exclusive, and any value that is not one of the known
+// constants, maps to the exclusive type.
+func toProtoSubType(st SubscriptionType) pb.CommandSubscribe_SubType {
+	switch st {
+	case Shared:
+		return pb.CommandSubscribe_Shared
+	case Failover:
+		return pb.CommandSubscribe_Failover
+	case KeyShared:
+		return pb.CommandSubscribe_Key_Shared
+	default:
+		// covers Exclusive as well as unknown values
+		return pb.CommandSubscribe_Exclusive
+	}
+}
+
+// toProtoInitialPosition maps a SubscriptionInitialPosition to the initial
+// position used in the subscribe command. Everything other than
+// SubscriptionPositionEarliest maps to the latest position.
+func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_InitialPosition {
+	if p == SubscriptionPositionEarliest {
+		return pb.CommandSubscribe_Earliest
+	}
+	return pb.CommandSubscribe_Latest
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
new file mode 100644
index 0000000..ae4889b
--- /dev/null
+++ b/pulsar/consumer_partition.go
@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "fmt"
+ "math"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/apache/pulsar-client-go/pkg/pb"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
+)
+
+// consumerState tracks where a partitionConsumer is in its life cycle.
+type consumerState int
+
+const (
+ // consumerInit is the state of a partition consumer before its first successful connection
+ consumerInit consumerState = iota
+ // consumerReady is set once the consumer has been created; Close only acts in this state
+ consumerReady
+ // consumerClosing is set while the close request is in progress
+ consumerClosing
+ // consumerClosed is set after the close request succeeded
+ consumerClosed
+)
+
+// partitionConsumerOpts carries the settings used to create and (re)subscribe
+// a single partitionConsumer.
+type partitionConsumerOpts struct {
+ // topic name of the partition to subscribe to
+ topic string
+ consumerName string
+ subscription string
+ subscriptionType SubscriptionType
+ subscriptionInitPos SubscriptionInitialPosition
+ // index of the partition; stored in the ids of received messages
+ partitionIdx int
+ // capacity of the internal queue and number of permits requested on connect
+ receiverQueueSize int
+}
+
+// partitionConsumer consumes a single partition of a topic. Received batches
+// are queued on queueCh, forwarded one message at a time to the shared
+// messageCh by the dispatcher goroutine, and requests such as ack, unsubscribe
+// and close are serialized through eventsCh.
+type partitionConsumer struct {
+ client *client
+
+ // this is needed for sending ConsumerMessage on the messageCh
+ parentConsumer Consumer
+ state consumerState
+ options *partitionConsumerOpts
+
+ // connection obtained by the most recent successful subscribe
+ conn internal.Connection
+
+ topic string
+ name string
+ consumerID uint64
+ partitionIdx int
+
+ // shared channel
+ messageCh chan ConsumerMessage
+
+ // the number of message slots available
+ availablePermits int32
+
+ // the size of the queue channel for buffering messages
+ queueSize int32
+ queueCh chan []*message
+
+ // requests handled one at a time by runEventsLoop
+ eventsCh chan interface{}
+ // signalled by grabConn after a successful subscribe
+ connectedCh chan struct{}
+ // closed by internalClose to stop the dispatcher and the events loop
+ closeCh chan struct{}
+
+ log *log.Entry
+}
+
+// newPartitionConsumer creates a consumer for one partition, subscribes it to
+// the broker and starts its dispatcher and events-loop goroutines.
+//
+// parent is the Consumer attached to every ConsumerMessage that is delivered;
+// messageCh is the shared channel messages are delivered on. An error is
+// returned, and no goroutine is started, when the subscription fails.
+func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
+	messageCh chan ConsumerMessage) (*partitionConsumer, error) {
+	pc := &partitionConsumer{
+		state:          consumerInit,
+		parentConsumer: parent,
+		client:         client,
+		options:        options,
+		topic:          options.topic,
+		name:           options.consumerName,
+		consumerID:     client.rpcClient.NewConsumerID(),
+		partitionIdx:   options.partitionIdx,
+		eventsCh:       make(chan interface{}, 3),
+		queueSize:      int32(options.receiverQueueSize),
+		queueCh:        make(chan []*message, options.receiverQueueSize),
+		connectedCh:    make(chan struct{}),
+		messageCh:      messageCh,
+		closeCh:        make(chan struct{}),
+		log:            log.WithField("topic", options.topic),
+	}
+	pc.log = pc.log.WithField("name", pc.name).WithField("subscription", options.subscription)
+
+	if err := pc.grabConn(); err != nil {
+		// use the consumer's own logger so the topic, name and subscription
+		// fields are attached to the entry
+		pc.log.WithError(err).Errorf("Failed to create consumer")
+		return nil, err
+	}
+	pc.log.Info("Created consumer")
+	pc.state = consumerReady
+
+	go pc.dispatcher()
+
+	go pc.runEventsLoop()
+
+	return pc, nil
+}
+
+// Unsubscribe asks the events loop to unsubscribe this partition consumer and
+// waits for the outcome. It returns ErrConsumerClosed instead of blocking
+// forever when the consumer has been closed and the request can no longer be
+// served.
+func (pc *partitionConsumer) Unsubscribe() error {
+	req := &unsubscribeRequest{doneCh: make(chan struct{})}
+
+	select {
+	case pc.eventsCh <- req:
+	case <-pc.closeCh:
+		return ErrConsumerClosed
+	}
+
+	// wait for the request to complete
+	select {
+	case <-req.doneCh:
+		return req.err
+	case <-pc.closeCh:
+		// the request may have completed just before the consumer closed
+		select {
+		case <-req.doneCh:
+			return req.err
+		default:
+			return ErrConsumerClosed
+		}
+	}
+}
+
+// internalUnsubscribe sends the unsubscribe command on the current connection
+// and waits for the response. It runs on the events-loop goroutine. On failure
+// the error is stored in unsub.err; unsub.doneCh is closed in every case.
+func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
+	defer close(unsub.doneCh)
+
+	requestID := pc.client.rpcClient.NewRequestID()
+	cmdUnsubscribe := &pb.CommandUnsubscribe{
+		RequestId:  proto.Uint64(requestID),
+		ConsumerId: proto.Uint64(pc.consumerID),
+	}
+	_, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_UNSUBSCRIBE, cmdUnsubscribe)
+	if err != nil {
+		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
+		unsub.err = err
+		// the unsubscribe did not go through, so keep the handler registered
+		// and let the consumer continue to receive messages
+		return
+	}
+
+	pc.conn.DeleteConsumeHandler(pc.consumerID)
+}
+
+// Ack acknowledges msg by delegating to AckID with the message's ID.
+func (pc *partitionConsumer) Ack(msg Message) error {
+ return pc.AckID(msg.ID())
+}
+
+// AckID asks the events loop to acknowledge msgID and waits for the outcome.
+// It returns ErrConsumerClosed instead of blocking forever when the consumer
+// has been closed and the request can no longer be served.
+func (pc *partitionConsumer) AckID(msgID MessageID) error {
+	req := &ackRequest{
+		doneCh: make(chan struct{}),
+		msgID:  msgID,
+	}
+
+	select {
+	case pc.eventsCh <- req:
+	case <-pc.closeCh:
+		return ErrConsumerClosed
+	}
+
+	select {
+	case <-req.doneCh:
+		return req.err
+	case <-pc.closeCh:
+		// the request may have completed just before the consumer closed
+		select {
+		case <-req.doneCh:
+			return req.err
+		default:
+			return ErrConsumerClosed
+		}
+	}
+}
+
+// Close asks the events loop to close this partition consumer and waits for
+// the request to finish. It is a no-op returning nil unless the consumer is in
+// the consumerReady state.
+func (pc *partitionConsumer) Close() error {
+ // NOTE(review): state is read here without synchronization while the events loop writes it — confirm this is acceptable
+ if pc.state != consumerReady {
+ return nil
+ }
+
+ req := &closeRequest{doneCh: make(chan struct{})}
+ pc.eventsCh <- req
+
+ // wait for request to finish
+ <-req.doneCh
+ return req.err
+}
+
+// internalAck sends an individual ack for req.msgID on the current connection
+// without waiting for a response. It runs on the events-loop goroutine. Any
+// failure is stored in req.err; req.doneCh is closed in every case.
+func (pc *partitionConsumer) internalAck(req *ackRequest) {
+	defer close(req.doneCh)
+
+	id := &pb.MessageIdData{}
+	if err := proto.Unmarshal(req.msgID.Serialize(), id); err != nil {
+		pc.log.WithError(err).Error("unable to serialize message id")
+		req.err = err
+		// do not ack an id that could not be decoded
+		return
+	}
+
+	cmdAck := &pb.CommandAck{
+		ConsumerId: proto.Uint64(pc.consumerID),
+		MessageId:  []*pb.MessageIdData{id},
+		AckType:    pb.CommandAck_Individual.Enum(),
+	}
+	_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, internal.RequestIDNoResponse,
+		pb.BaseCommand_ACK, cmdAck)
+	if err != nil {
+		pc.log.WithError(err).Errorf("failed to ack message_id=%s", id)
+		req.err = err
+	}
+}
+
+// MessageReceived decodes the messages contained in a broker message command
+// and queues them, as one batch, for the dispatcher.
+//
+// The metadata is read first; it tells how many messages the payload holds
+// (one when no batch size is set). An error from reading the metadata or any
+// message is returned and nothing is queued.
+func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
+	pbMsgID := response.GetMessageId()
+
+	reader := internal.NewMessageReader(headersAndPayload.ReadableSlice())
+	msgMeta, err := reader.ReadMessageMetadata()
+	if err != nil {
+		// TODO send discardCorruptedMessage
+		return err
+	}
+
+	batchSize := 1
+	if msgMeta.NumMessagesInBatch != nil {
+		batchSize = int(msgMeta.GetNumMessagesInBatch())
+	}
+
+	batch := make([]*message, 0, batchSize)
+	for idx := 0; idx < batchSize; idx++ {
+		smm, payload, err := reader.ReadMessage()
+		if err != nil {
+			// TODO send corrupted message
+			return err
+		}
+
+		// fields that do not depend on the per-message metadata
+		msg := &message{
+			publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+			topic:       pc.topic,
+			msgID:       newMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), idx, pc.partitionIdx),
+			payLoad:     payload,
+		}
+
+		// event time, key and properties come from the single-message metadata
+		// when it is present, otherwise from the enclosing message metadata
+		if smm != nil {
+			msg.eventTime = timeFromUnixTimestampMillis(smm.GetEventTime())
+			msg.key = smm.GetPartitionKey()
+			msg.properties = internal.ConvertToStringMap(smm.GetProperties())
+		} else {
+			msg.eventTime = timeFromUnixTimestampMillis(msgMeta.GetEventTime())
+			msg.key = msgMeta.GetPartitionKey()
+			msg.properties = internal.ConvertToStringMap(msgMeta.GetProperties())
+		}
+
+		batch = append(batch, msg)
+	}
+
+	// send messages to the dispatcher
+	pc.queueCh <- batch
+	return nil
+}
+
+// ConnectionClosed queues a connectionClosed event so that the events loop
+// performs the reconnection.
+func (pc *partitionConsumer) ConnectionClosed() {
+ // Trigger reconnection in the consumer goroutine
+ pc.eventsCh <- &connectionClosed{}
+}
+
+// internalFlow sends a flow command granting the broker permits additional
+// messages for this consumer; no response is awaited.
+//
+// The consumer buffers incoming messages in a queue until the application
+// reads them, and the broker only pushes messages it has been given permits
+// for. Requesting zero permits is rejected with an error.
+func (pc *partitionConsumer) internalFlow(permits uint32) error {
+	if permits == 0 {
+		return fmt.Errorf("invalid number of permits requested: %d", permits)
+	}
+
+	flow := &pb.CommandFlow{
+		ConsumerId:     proto.Uint64(pc.consumerID),
+		MessagePermits: proto.Uint32(permits),
+	}
+	_, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, internal.RequestIDNoResponse,
+		pb.BaseCommand_FLOW, flow)
+	return err
+}
+
+// dispatcher manages the internal message queue channel
+// and manages the flow control
+//
+// It runs as its own goroutine until closeCh is closed (or connectedCh or
+// queueCh is closed). On each iteration exactly one of the two data channels
+// is enabled: while a batch is pending the next message is offered on the
+// shared messageCh, otherwise a new batch is read from queueCh. The disabled
+// channel variable stays nil, and a nil channel is never selected.
+func (pc *partitionConsumer) dispatcher() {
+ defer func() {
+ pc.log.Info("exiting dispatch loop")
+ }()
+ // messages still to be delivered from the batch currently being processed
+ var messages []*message
+ for {
+ var queueCh chan []*message
+ var messageCh chan ConsumerMessage
+ var nextMessage ConsumerMessage
+
+ // are there more messages to send?
+ if len(messages) > 0 {
+ nextMessage = ConsumerMessage{
+ Consumer: pc.parentConsumer,
+ Message: messages[0],
+ }
+ messageCh = pc.messageCh
+ } else {
+ // we are ready for more messages
+ queueCh = pc.queueCh
+ }
+
+ select {
+ case <-pc.closeCh:
+ return
+
+ case _, ok := <-pc.connectedCh:
+ if !ok {
+ return
+ }
+ pc.log.Debug("dispatcher received connection event")
+
+ // drain messages
+ messages = nil
+
+ // drain the message queue on any new connection by sending a
+ // special nil message to the channel so we know when to stop dropping messages
+ go func() {
+ pc.queueCh <- nil
+ }()
+ for m := range pc.queueCh {
+ // the queue has been drained
+ if m == nil {
+ break
+ }
+ }
+
+ // reset available permits
+ pc.availablePermits = 0
+ initialPermits := uint32(pc.queueSize)
+
+ pc.log.Debugf("dispatcher requesting initial permits=%d", initialPermits)
+ // send initial permits
+ if err := pc.internalFlow(initialPermits); err != nil {
+ pc.log.WithError(err).Error("unable to send initial permits to broker")
+ }
+
+ case msgs, ok := <-queueCh:
+ if !ok {
+ return
+ }
+ // we only read messages here after the consumer has processed all messages
+ // in the previous batch
+ messages = msgs
+
+ // if the messageCh is nil or the messageCh is full this will not be selected
+ case messageCh <- nextMessage:
+ // allow this message to be garbage collected
+ messages[0] = nil
+ messages = messages[1:]
+
+ // TODO implement a better flow controller
+ // send more permits if needed
+ pc.availablePermits++
+ // request more permits once half of the queue size (at least one) has been delivered
+ flowThreshold := int32(math.Max(float64(pc.queueSize/2), 1))
+ if pc.availablePermits >= flowThreshold {
+ availablePermits := pc.availablePermits
+ requestedPermits := availablePermits
+ pc.availablePermits = 0
+
+ pc.log.Debugf("requesting more permits=%d available=%d", requestedPermits, availablePermits)
+ if err := pc.internalFlow(uint32(requestedPermits)); err != nil {
+ pc.log.WithError(err).Error("unable to send permits")
+ }
+ }
+ }
+ }
+}
+
+// ackRequest is the event queued by AckID. The events loop closes doneCh when
+// the ack has been handled and stores any failure in err.
+type ackRequest struct {
+ doneCh chan struct{}
+ msgID MessageID
+ err error
+}
+
+// unsubscribeRequest is the event queued by Unsubscribe. The events loop
+// closes doneCh when the request has been handled and stores any failure in err.
+type unsubscribeRequest struct {
+ doneCh chan struct{}
+ err error
+}
+
+// closeRequest is the event queued by Close. The events loop closes doneCh
+// when the request has been handled and stores any failure in err.
+type closeRequest struct {
+ doneCh chan struct{}
+ err error
+}
+
+// runEventsLoop processes the events queued on eventsCh one at a time: acks,
+// unsubscribes, reconnections and the close request. It returns when closeCh
+// is closed or after a close request has been handled.
+func (pc *partitionConsumer) runEventsLoop() {
+ defer func() {
+ pc.log.Info("exiting events loop")
+ }()
+ for {
+ select {
+ case <-pc.closeCh:
+ return
+ case i := <-pc.eventsCh:
+ switch v := i.(type) {
+ case *ackRequest:
+ pc.internalAck(v)
+ case *unsubscribeRequest:
+ pc.internalUnsubscribe(v)
+ case *connectionClosed:
+ pc.reconnectToBroker()
+ case *closeRequest:
+ pc.internalClose(v)
+ // the loop ends after a close request, whether or not the close succeeded
+ return
+ }
+ }
+ }
+}
+
+// internalClose sends the close-consumer command and tears the consumer down.
+// It runs on the events-loop goroutine and does nothing unless the consumer is
+// in the consumerReady state.
+//
+// A failure of the close command is reported through req.err, but the local
+// teardown still happens: the events loop exits after a close request in any
+// case, so leaving closeCh open would leak the dispatcher goroutine.
+// req.doneCh is closed in every case.
+func (pc *partitionConsumer) internalClose(req *closeRequest) {
+	defer close(req.doneCh)
+	if pc.state != consumerReady {
+		return
+	}
+
+	pc.state = consumerClosing
+	pc.log.Infof("Closing consumer=%d", pc.consumerID)
+
+	// a request id, not a consumer id, identifies the command
+	requestID := pc.client.rpcClient.NewRequestID()
+	cmdClose := &pb.CommandCloseConsumer{
+		ConsumerId: proto.Uint64(pc.consumerID),
+		RequestId:  proto.Uint64(requestID),
+	}
+	_, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
+	if err != nil {
+		pc.log.WithError(err).Warn("Failed to close consumer")
+		req.err = err
+	} else {
+		pc.log.Info("Closed consumer")
+	}
+
+	pc.state = consumerClosed
+	pc.conn.DeleteConsumeHandler(pc.consumerID)
+	close(pc.closeCh)
+}
+
+// reconnectToBroker retries grabConn, sleeping for the next backoff duration
+// before each attempt, until it succeeds or the consumer is no longer in the
+// consumerReady state. It blocks the calling (events-loop) goroutine while
+// retrying.
+func (pc *partitionConsumer) reconnectToBroker() {
+ backoff := internal.Backoff{}
+ for {
+ if pc.state != consumerReady {
+ // Consumer is already closing
+ return
+ }
+
+ d := backoff.Next()
+ pc.log.Info("Reconnecting to broker in ", d)
+ time.Sleep(d)
+
+ err := pc.grabConn()
+ if err == nil {
+ // Successfully reconnected
+ pc.log.Info("Reconnected consumer to broker")
+ return
+ }
+ }
+}
+
+// grabConn looks up the broker for the topic, sends the subscribe command and,
+// on success, stores the connection, registers this consumer as the handler
+// for its consumer id and notifies the dispatcher through connectedCh.
+//
+// It returns an error when the lookup or the request fails, or when the
+// response is an error or of an unexpected type.
+func (pc *partitionConsumer) grabConn() error {
+ lr, err := pc.client.lookupService.Lookup(pc.topic)
+ if err != nil {
+ pc.log.WithError(err).Warn("Failed to lookup topic")
+ return err
+ }
+ pc.log.Debugf("Lookup result: %+v", lr)
+
+ subType := toProtoSubType(pc.options.subscriptionType)
+ initialPosition := toProtoInitialPosition(pc.options.subscriptionInitPos)
+ requestID := pc.client.rpcClient.NewRequestID()
+ cmdSubscribe := &pb.CommandSubscribe{
+ RequestId: proto.Uint64(requestID),
+ Topic: proto.String(pc.topic),
+ SubType: subType.Enum(),
+ Subscription: proto.String(pc.options.subscription),
+ ConsumerId: proto.Uint64(pc.consumerID),
+ ConsumerName: proto.String(pc.name),
+ InitialPosition: initialPosition.Enum(),
+ Schema: nil,
+ }
+
+ res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
+ pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+
+ if err != nil {
+ pc.log.WithError(err).Error("Failed to create consumer")
+ return err
+ }
+
+ // NOTE(review): the consumer name is taken from a consumer-stats response on a subscribe reply — confirm this is the intended field
+ if res.Response.ConsumerStatsResponse != nil {
+ pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
+ }
+
+ // NOTE(review): the connection is stored and the handler registered before the response type is checked,
+ // so both remain in place when the response turns out to be an error — confirm this is intended
+ pc.conn = res.Cnx
+ pc.log.Info("Connected consumer")
+ pc.conn.AddConsumeHandler(pc.consumerID, pc)
+
+ msgType := res.Response.GetType()
+
+ switch msgType {
+ case pb.BaseCommand_SUCCESS:
+ // notify the dispatcher we have connection
+ // sent from a separate goroutine because connectedCh is unbuffered and the dispatcher may not be receiving yet
+ go func() {
+ pc.connectedCh <- struct{}{}
+ }()
+ return nil
+ case pb.BaseCommand_ERROR:
+ errMsg := res.Response.GetError()
+ return fmt.Errorf("%s: %s", errMsg.GetError().String(), errMsg.GetMessage())
+ default:
+ return newUnexpectedErrMsg(msgType, requestID)
+ }
+}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index e6f4d52..bbc1315 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -22,11 +22,9 @@ import (
"fmt"
"log"
"net/http"
- "strings"
"testing"
"time"
- "github.com/apache/pulsar-client-go/util"
"github.com/stretchr/testify/assert"
)
@@ -96,11 +94,6 @@ func TestProducerConsumer(t *testing.T) {
log.Fatal(err)
}
}
-
- // unsubscribe consumer
- if err := consumer.Unsubscribe(); err != nil {
- log.Fatal(err)
- }
}
func TestConsumerConnectError(t *testing.T) {
@@ -176,13 +169,6 @@ func TestBatchMessageReceive(t *testing.T) {
count++
}
- // check strategically
- for i := 0; i < 3; i++ {
- if count == numOfMessages {
- break
- }
- time.Sleep(time.Second)
- }
assert.Equal(t, count, numOfMessages)
}
@@ -202,7 +188,7 @@ func TestConsumerWithInvalidConf(t *testing.T) {
Topic: "my-topic",
})
- // Expect error in creating cosnumer
+ // Expect error in creating consumer
assert.Nil(t, consumer)
assert.NotNil(t, err)
@@ -220,7 +206,7 @@ func TestConsumerWithInvalidConf(t *testing.T) {
assert.Equal(t, err.(*Error).Result(), TopicNotFound)
}
-func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
+func TestConsumerSubscriptionEarliestPosition(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
@@ -238,9 +224,8 @@ func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
assert.Nil(t, err)
defer producer.Close()
- //sent message
+ // send message
ctx := context.Background()
-
err = producer.Send(ctx, &ProducerMessage{
Payload: []byte("msg-1-content-1"),
})
@@ -253,9 +238,9 @@ func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
- Topic: topicName,
- SubscriptionName: subName,
- SubscriptionInitPos: Earliest,
+ Topic: topicName,
+ SubscriptionName: subName,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer consumer.Close()
@@ -266,24 +251,6 @@ func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
}
-func makeHTTPCall(t *testing.T, method string, urls string, body string) {
- client := http.Client{}
-
- req, err := http.NewRequest(method, urls, strings.NewReader(body))
- if err != nil {
- t.Fatal(err)
- }
-
- req.Header.Set("Content-Type", "application/json")
- req.Header.Set("Accept", "application/json")
-
- res, err := client.Do(req)
- if err != nil {
- t.Fatal(err)
- }
- defer res.Body.Close()
-}
-
func TestConsumerKeyShared(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
@@ -311,7 +278,8 @@ func TestConsumerKeyShared(t *testing.T) {
// create producer
producer, err := client.CreateProducer(ProducerOptions{
- Topic: topic,
+ Topic: topic,
+ DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()
@@ -325,33 +293,32 @@ func TestConsumerKeyShared(t *testing.T) {
assert.Nil(t, err)
}
- time.Sleep(time.Second * 1)
-
- go func() {
- for i := 0; i < 10; i++ {
- msg, err := consumer1.Receive(ctx)
- assert.Nil(t, err)
- if msg != nil {
- fmt.Printf("consumer1 key is: %s, value is: %s\n", msg.Key(), string(msg.Payload()))
- err = consumer1.Ack(msg)
- assert.Nil(t, err)
+ receivedConsumer1 := 0
+ receivedConsumer2 := 0
+ for (receivedConsumer1 + receivedConsumer2) < 10 {
+ select {
+ case cm, ok := <-consumer1.Chan():
+ if !ok {
+ break
}
- }
- }()
-
- go func() {
- for i := 0; i < 10; i++ {
- msg2, err := consumer2.Receive(ctx)
- assert.Nil(t, err)
- if msg2 != nil {
- fmt.Printf("consumer2 key is:%s, value is: %s\n", msg2.Key(), string(msg2.Payload()))
- err = consumer2.Ack(msg2)
- assert.Nil(t, err)
+ receivedConsumer1++
+ if err := consumer1.Ack(cm.Message); err != nil {
+ log.Fatal(err)
+ }
+ case cm, ok := <-consumer2.Chan():
+ if !ok {
+ break
+ }
+ receivedConsumer2++
+ if err := consumer2.Ack(cm.Message); err != nil {
+ log.Fatal(err)
}
}
- }()
+ }
- time.Sleep(time.Second * 1)
+ fmt.Printf("TestConsumerKeyShared received messages consumer1: %d consumser2: %d\n",
+ receivedConsumer1, receivedConsumer2)
+ assert.Equal(t, 10, receivedConsumer1+receivedConsumer2)
}
func TestPartitionTopicsConsumerPubSub(t *testing.T) {
@@ -414,77 +381,16 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
assert.Equal(t, len(msgs), 10)
}
-func TestConsumer_ReceiveAsync(t *testing.T) {
+func TestConsumerReceiveTimeout(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
-
assert.Nil(t, err)
defer client.Close()
- topicName := "persistent://public/default/receive-async"
- subName := "subscription-receive-async"
- ctx := context.Background()
- ch := make(chan ConsumerMessage, 10)
-
- // create producer
- producer, err := client.CreateProducer(ProducerOptions{
- Topic: topicName,
- })
- assert.Nil(t, err)
- defer producer.Close()
-
- consumer, err := client.Subscribe(ConsumerOptions{
- Topic: topicName,
- SubscriptionName: subName,
- })
- assert.Nil(t, err)
- defer consumer.Close()
-
- //send 10 messages
- for i := 0; i < 10; i++ {
- err = producer.Send(ctx, &ProducerMessage{
- Payload: []byte(fmt.Sprintf("hello-%d", i)),
- })
- assert.Nil(t, err)
- }
-
- //receive async 10 messages
- err = consumer.ReceiveAsync(ctx, ch)
- assert.Nil(t, err)
-
- payloadList := make([]string, 0, 10)
-
-RECEIVE:
- for {
- select {
- case cMsg, ok := <-ch:
- if ok {
- fmt.Printf("receive message payload is:%s\n", string(cMsg.Payload()))
- assert.Equal(t, topicName, cMsg.Message.Topic())
- assert.Equal(t, topicName, cMsg.Consumer.Topic())
- payloadList = append(payloadList, string(cMsg.Message.Payload()))
- if len(payloadList) == 10 {
- break RECEIVE
- }
- }
- continue RECEIVE
- case <-ctx.Done():
- t.Error("context error.")
- return
- }
- }
-}
-
-func TestConsumerAckTimeout(t *testing.T) {
- client, err := NewClient(ClientOptions{
- URL: lookupURL,
- })
- assert.Nil(t, err)
- defer client.Close()
-
- topic := "test-ack-timeout-topic-1"
- ctx := context.Background()
+ topic := "test-topic-with-no-messages"
+ ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer cancel()
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
@@ -496,155 +402,12 @@ func TestConsumerAckTimeout(t *testing.T) {
assert.Nil(t, err)
defer consumer.Close()
- // create consumer1
- consumer1, err := client.Subscribe(ConsumerOptions{
- Topic: topic,
- SubscriptionName: "my-sub2",
- Type: Shared,
- AckTimeout: 5 * 1000,
- })
- assert.Nil(t, err)
- defer consumer1.Close()
-
- // create producer
- producer, err := client.CreateProducer(ProducerOptions{
- Topic: topic,
- DisableBatching: true,
- })
- assert.Nil(t, err)
- defer producer.Close()
-
- // send 10 messages
- for i := 0; i < 10; i++ {
- if err := producer.Send(ctx, &ProducerMessage{
- Payload: []byte(fmt.Sprintf("hello-%d", i)),
- }); err != nil {
- log.Fatal(err)
- }
- }
-
- // consumer receive 10 messages
- payloadList := make([]string, 0, 10)
- for i := 0; i < 10; i++ {
- msg, err := consumer.Receive(context.Background())
- if err != nil {
- log.Fatal(err)
- }
- payloadList = append(payloadList, string(msg.Payload()))
-
- // not ack message
- }
- assert.Equal(t, 10, len(payloadList))
-
- // consumer1 receive 10 messages
- for i := 0; i < 10; i++ {
- msg, err := consumer1.Receive(context.Background())
- if err != nil {
- log.Fatal(err)
- }
-
- payloadList = append(payloadList, string(msg.Payload()))
-
- // ack half of the messages
- if i%2 == 0 {
- err = consumer1.Ack(msg)
- assert.Nil(t, err)
- }
- }
-
- // wait ack timeout
- time.Sleep(6 * time.Second)
-
- fmt.Println("start redeliver messages...")
-
- payloadList = make([]string, 0, 10)
- // consumer receive messages again
- for i := 0; i < 10; i++ {
- msg, err := consumer.Receive(context.Background())
- if err != nil {
- log.Fatal(err)
- }
- payloadList = append(payloadList, string(msg.Payload()))
-
- // ack message
- if err := consumer.Ack(msg); err != nil {
- log.Fatal(err)
- }
- }
- assert.Equal(t, 10, len(payloadList))
-
- payloadList = make([]string, 0, 5)
- // consumer1 receive messages again
- go func() {
- for i := 0; i < 10; i++ {
- msg, err := consumer1.Receive(context.Background())
- if err != nil {
- log.Fatal(err)
- }
-
- expectMsg := fmt.Sprintf("hello-%d", i)
- fmt.Printf("redeliver messages, payload is:%s\n", expectMsg)
- payloadList = append(payloadList, string(msg.Payload()))
-
- // ack message
- if err := consumer1.Ack(msg); err != nil {
- log.Fatal(err)
- }
- }
- assert.Equal(t, 5, len(payloadList))
- }()
-
- // sleep 2 seconds, wait gorutine receive messages.
- time.Sleep(time.Second * 2)
-}
-
-func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) {
- client, err := NewClient(ClientOptions{
- URL: lookupURL,
- })
-
- assert.Nil(t, err)
- defer client.Close()
-
- topicName := "persistent://public/default/receive-async-with-callback"
- subName := "subscription-receive-async"
- ctx := context.Background()
-
- // create producer
- producer, err := client.CreateProducer(ProducerOptions{
- Topic: topicName,
- })
- assert.Nil(t, err)
- defer producer.Close()
-
- consumer, err := client.Subscribe(ConsumerOptions{
- Topic: topicName,
- SubscriptionName: subName,
- })
- assert.Nil(t, err)
- defer consumer.Close()
-
- //send 10 messages
- for i := 0; i < 10; i++ {
- err := producer.Send(ctx, &ProducerMessage{
- Payload: []byte(fmt.Sprintf("hello-%d", i)),
- })
- assert.Nil(t, err)
- }
-
- for i := 0; i < 10; i++ {
- tmpMsg := fmt.Sprintf("hello-%d", i)
- consumer.ReceiveAsyncWithCallback(ctx, func(msg Message, err error) {
- if err != nil {
- log.Fatal(err)
- }
- fmt.Printf("receive message payload is:%s\n", string(msg.Payload()))
- assert.Equal(t, tmpMsg, string(msg.Payload()))
- })
- }
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, msg)
+ assert.NotNil(t, err)
}
-func TestConsumer_Shared(t *testing.T) {
+func TestConsumerShared(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
@@ -682,116 +445,49 @@ func TestConsumer_Shared(t *testing.T) {
assert.Nil(t, err)
defer producer.Close()
- // send 10 messages
+ // send 10 messages with unique payloads
for i := 0; i < 10; i++ {
if err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
}
+ fmt.Println("sending message:", fmt.Sprintf("hello-%d", i))
}
- msgList := make([]string, 0, 5)
- for i := 0; i < 5; i++ {
- msg, err := consumer1.Receive(context.Background())
- if err != nil {
- log.Fatal(err)
- }
- fmt.Printf("consumer1 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
- msgList = append(msgList, string(msg.Payload()))
- if err := consumer1.Ack(msg); err != nil {
- log.Fatal(err)
- }
- }
-
- assert.Equal(t, 5, len(msgList))
-
- for i := 0; i < 5; i++ {
- msg, err := consumer2.Receive(context.Background())
- if err != nil {
- log.Fatal(err)
- }
- if err := consumer2.Ack(msg); err != nil {
- log.Fatal(err)
- }
- fmt.Printf("consumer2 msg id is: %v, value is: %s\n", msg.ID(), string(msg.Payload()))
- msgList = append(msgList, string(msg.Payload()))
- }
-
- assert.Equal(t, 10, len(msgList))
- res := util.RemoveDuplicateElement(msgList)
- assert.Equal(t, 10, len(res))
-}
-
-func TestConsumer_Seek(t *testing.T) {
- client, err := NewClient(ClientOptions{
- URL: lookupURL,
- })
-
- assert.Nil(t, err)
- defer client.Close()
-
- topicName := "persistent://public/default/testSeek"
- testURL := adminURL + "/" + "admin/v2/persistent/public/default/testSeek"
- makeHTTPCall(t, http.MethodPut, testURL, "1")
- subName := "sub-testSeek"
-
- producer, err := client.CreateProducer(ProducerOptions{
- Topic: topicName,
- })
- assert.Nil(t, err)
- assert.Equal(t, producer.Topic(), topicName)
- defer producer.Close()
-
- consumer, err := client.Subscribe(ConsumerOptions{
- Topic: topicName,
- SubscriptionName: subName,
- })
- assert.Nil(t, err)
- assert.Equal(t, consumer.Topic(), topicName)
- assert.Equal(t, consumer.Subscription(), subName)
- defer consumer.Close()
-
- ctx := context.Background()
-
- // Send 10 messages synchronously
- t.Log("Publishing 10 messages synchronously")
- for msgNum := 0; msgNum < 10; msgNum++ {
- if err := producer.Send(ctx, &ProducerMessage{
- Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
- }); err != nil {
- t.Fatal(err)
- }
- }
-
- t.Log("Trying to receive 10 messages")
- idList := make([]MessageID, 0, 10)
- for msgNum := 0; msgNum < 10; msgNum++ {
- msg, err := consumer.Receive(ctx)
- assert.Nil(t, err)
- idList = append(idList, msg.ID())
- fmt.Println(string(msg.Payload()))
- }
-
- for index, id := range idList {
- if index == 4 {
- // seek to fourth message, expected receive fourth message.
- err = consumer.Seek(id)
- assert.Nil(t, err)
- break
+ readMsgs := 0
+ messages := make(map[string]struct{})
+ for readMsgs < 10 {
+ select {
+ case cm, ok := <-consumer1.Chan():
+ if !ok {
+ break
+ }
+ readMsgs++
+ payload := string(cm.Message.Payload())
+ messages[payload] = struct{}{}
+ fmt.Printf("consumer1 msg id is: %v, value is: %s\n", cm.Message.ID(), payload)
+ if err := consumer1.Ack(cm.Message); err != nil {
+ log.Fatal(err)
+ }
+ case cm, ok := <-consumer2.Chan():
+ if !ok {
+ break
+ }
+ readMsgs++
+ payload := string(cm.Message.Payload())
+ messages[payload] = struct{}{}
+ fmt.Printf("consumer2 msg id is: %v, value is: %s\n", cm.Message.ID(), payload)
+ if err := consumer2.Ack(cm.Message); err != nil {
+ log.Fatal(err)
+ }
}
}
- // Sleeping for 500ms to wait for consumer re-connect
- time.Sleep(500 * time.Millisecond)
-
- msg, err := consumer.Receive(ctx)
- assert.Nil(t, err)
- t.Logf("again received message:%+v", msg.ID())
- assert.Equal(t, "msg-content-4", string(msg.Payload()))
+ assert.Equal(t, 10, len(messages))
}
-func TestConsumer_EventTime(t *testing.T) {
+func TestConsumerEventTime(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
@@ -828,7 +524,7 @@ func TestConsumer_EventTime(t *testing.T) {
assert.Equal(t, "test", string(msg.Payload()))
}
-func TestConsumer_Flow(t *testing.T) {
+func TestConsumerFlow(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
diff --git a/util/error.go b/pulsar/helper.go
similarity index 84%
rename from util/error.go
rename to pulsar/helper.go
index 755b7c0..83975f4 100644
--- a/util/error.go
+++ b/pulsar/helper.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package util
+package pulsar
import (
"fmt"
@@ -26,24 +26,24 @@ import (
// NewUnexpectedErrMsg instantiates an ErrUnexpectedMsg error.
// Optionally provide a list of IDs associated with the message
// for additional context in the error message.
-func NewUnexpectedErrMsg(msgType pb.BaseCommand_Type, ids ...interface{}) *UnexpectedErrMsg {
- return &UnexpectedErrMsg{
+func newUnexpectedErrMsg(msgType pb.BaseCommand_Type, ids ...interface{}) *unexpectedErrMsg {
+ return &unexpectedErrMsg{
msgType: msgType,
ids: ids,
}
}
// UnexpectedErrMsg is returned when an unexpected message is received.
-type UnexpectedErrMsg struct {
+type unexpectedErrMsg struct {
msgType pb.BaseCommand_Type
ids []interface{}
}
// Error satisfies the error interface.
-func (e *UnexpectedErrMsg) Error() string {
+func (e *unexpectedErrMsg) Error() string {
msg := fmt.Sprintf("received unexpected message of type %q", e.msgType.String())
for _, id := range e.ids {
- msg += fmt.Sprintf(" id=%v", id)
+ msg += fmt.Sprintf(" consumerID=%v", id)
}
return msg
}
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index 7077155..53c2074 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -21,12 +21,13 @@ import (
"fmt"
"net/url"
- "github.com/apache/pulsar-client-go/pkg/auth"
- "github.com/apache/pulsar-client-go/pkg/pb"
- "github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
+
+ "github.com/apache/pulsar-client-go/pkg/auth"
+ "github.com/apache/pulsar-client-go/pkg/pb"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
)
type client struct {
@@ -97,7 +98,7 @@ func (client *client) CreateProducer(options ProducerOptions) (Producer, error)
}
func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
- consumer, err := newConsumer(client, &options)
+ consumer, err := newConsumer(client, options)
if err != nil {
return nil, err
}
diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go
deleted file mode 100644
index 28535b5..0000000
--- a/pulsar/impl_consumer.go
+++ /dev/null
@@ -1,316 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pulsar
-
-import (
- "context"
- "errors"
- "fmt"
-
- "github.com/apache/pulsar-client-go/pkg/pb"
- "github.com/apache/pulsar-client-go/util"
- "github.com/golang/protobuf/proto"
-
- log "github.com/sirupsen/logrus"
-)
-
-type consumer struct {
- topicName string
- consumers []Consumer
- log *log.Entry
- queue chan ConsumerMessage
- unackMsgTracker *UnackedMessageTracker
-}
-
-func newConsumer(client *client, options *ConsumerOptions) (*consumer, error) {
- if options == nil {
- return nil, newError(ResultInvalidConfiguration, "consumer configuration undefined")
- }
-
- if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
- return nil, newError(TopicNotFound, "topic is required")
- }
-
- if options.SubscriptionName == "" {
- return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
- }
-
- if options.ReceiverQueueSize == 0 {
- options.ReceiverQueueSize = 1000
- }
-
- if options.TopicsPattern != "" {
- if options.Topics != nil {
- return nil, newError(ResultInvalidConfiguration, "Topic names list must be null when use topicsPattern")
- }
- // TODO: impl logic
- } else if options.Topics != nil && len(options.Topics) > 1 {
- // TODO: impl logic
- } else if options.Topics != nil && len(options.Topics) == 1 || options.Topic != "" {
- var singleTopicName string
- if options.Topic != "" {
- singleTopicName = options.Topic
- } else {
- singleTopicName = options.Topics[0]
- }
- return singleTopicSubscribe(client, options, singleTopicName)
- }
-
- return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
-}
-
-func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string) (*consumer, error) {
- c := &consumer{
- topicName: topic,
- log: log.WithField("topic", topic),
- queue: make(chan ConsumerMessage, options.ReceiverQueueSize),
- }
-
- partitions, err := client.TopicPartitions(topic)
- if err != nil {
- return nil, err
- }
-
- numPartitions := len(partitions)
- c.consumers = make([]Consumer, numPartitions)
-
- type ConsumerError struct {
- err error
- partition int
- cons Consumer
- }
-
- ch := make(chan ConsumerError, numPartitions)
-
- for partitionIdx, partitionTopic := range partitions {
- // this needs to be created outside in the same go routine since
- // newPartitionConsumer can modify the shared options struct causing a race condition
- cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx, numPartitions, c.queue)
- go func(partitionIdx int, partitionTopic string) {
- ch <- ConsumerError{
- err: err,
- partition: partitionIdx,
- cons: cons,
- }
- }(partitionIdx, partitionTopic)
- }
-
- for i := 0; i < numPartitions; i++ {
- ce, ok := <-ch
- if ok {
- err = ce.err
- c.consumers[ce.partition] = ce.cons
- }
- }
-
- if err != nil {
- // Since there were some failures, cleanup all the partitions that succeeded in creating the consumers
- for _, consumer := range c.consumers {
- if !util.IsNil(consumer) {
- if err = consumer.Close(); err != nil {
- panic("close consumer error, please check.")
- }
- }
- }
- return nil, err
- }
-
- return c, nil
-}
-
-func (c *consumer) Topic() string {
- return c.topicName
-}
-
-func (c *consumer) Subscription() string {
- return c.consumers[0].Subscription()
-}
-
-func (c *consumer) Unsubscribe() error {
- var errMsg string
- for _, consumer := range c.consumers {
- if err := consumer.Unsubscribe(); err != nil {
- errMsg += fmt.Sprintf("topic %s, subscription %s: %s", c.Topic(), c.Subscription(), err)
- }
- }
- if errMsg != "" {
- return errors.New(errMsg)
- }
- return nil
-}
-
-func (c *consumer) getMessageFromSubConsumer(ctx context.Context) {
- for _, pc := range c.consumers {
- go func(pc Consumer) {
- err := pc.ReceiveAsync(ctx, c.queue)
- if err != nil {
- return
- }
- }(pc)
- }
-}
-
-func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
- if len(c.consumers) > 1 {
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case cMsg, ok := <-c.queue:
- if ok {
- return cMsg.Message, nil
- }
- return nil, errors.New("receive message error")
- }
- }
-
- return c.consumers[0].(*partitionConsumer).Receive(ctx)
-}
-
-func (c *consumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
- for _, pc := range c.consumers {
- go func(pc Consumer) {
- if err := pc.ReceiveAsync(ctx, msgs); err != nil {
- c.log.Errorf("receive async messages error:%s, please check.", err.Error())
- return
- }
- }(pc)
- }
-
- return nil
-}
-
-func (c *consumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
- var err error
- if len(c.consumers) > 1 {
- select {
- case <-ctx.Done():
- c.log.Errorf("ReceiveAsyncWithCallback: receive message error:%s", ctx.Err().Error())
- return
- case cMsg, ok := <-c.queue:
- if ok {
- callback(cMsg.Message, err)
- }
- return
- }
- }
- c.consumers[0].(*partitionConsumer).ReceiveAsyncWithCallback(ctx, callback)
-}
-
-//Ack the consumption of a single message
-func (c *consumer) Ack(msg Message) error {
- return c.AckID(msg.ID())
-}
-
-// Ack the consumption of a single message, identified by its MessageID
-func (c *consumer) AckID(msgID MessageID) error {
- id := &pb.MessageIdData{}
- err := proto.Unmarshal(msgID.Serialize(), id)
- if err != nil {
- c.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
- return err
- }
-
- partition := id.GetPartition()
- if partition < 0 {
- return c.consumers[0].AckID(msgID)
- }
- return c.consumers[partition].AckID(msgID)
-}
-
-func (c *consumer) AckCumulative(msg Message) error {
- return c.AckCumulativeID(msg.ID())
-}
-
-func (c *consumer) AckCumulativeID(msgID MessageID) error {
- id := &pb.MessageIdData{}
- err := proto.Unmarshal(msgID.Serialize(), id)
- if err != nil {
- c.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
- return err
- }
-
- partition := id.GetPartition()
- if partition < 0 {
- return errors.New("invalid partition index")
- }
- return c.consumers[partition].AckCumulativeID(msgID)
-}
-
-func (c *consumer) Close() error {
- for _, pc := range c.consumers {
- return pc.Close()
- }
- return nil
-}
-
-func (c *consumer) Seek(msgID MessageID) error {
- id := &pb.MessageIdData{}
- err := proto.Unmarshal(msgID.Serialize(), id)
- if err != nil {
- c.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
- return err
- }
-
- partition := id.GetPartition()
-
- // current topic is non-partition topic, we only need to get the first value in the consumers.
- if partition < 0 {
- partition = 0
- }
- return c.consumers[partition].Seek(msgID)
-}
-
-func (c *consumer) RedeliverUnackedMessages() error {
- var errMsg string
- for _, c := range c.consumers {
- if err := c.RedeliverUnackedMessages(); err != nil {
- errMsg += fmt.Sprintf("topic %s, subscription %s: %s", c.Topic(), c.Subscription(), err)
- }
- }
-
- if errMsg != "" {
- return errors.New(errMsg)
- }
- return nil
-}
-
-func toProtoSubType(st SubscriptionType) pb.CommandSubscribe_SubType {
- switch st {
- case Exclusive:
- return pb.CommandSubscribe_Exclusive
- case Shared:
- return pb.CommandSubscribe_Shared
- case Failover:
- return pb.CommandSubscribe_Failover
- case KeyShared:
- return pb.CommandSubscribe_Key_Shared
- }
-
- return pb.CommandSubscribe_Exclusive
-}
-
-func toProtoInitialPosition(p InitialPosition) pb.CommandSubscribe_InitialPosition {
- switch p {
- case Latest:
- return pb.CommandSubscribe_Latest
- case Earliest:
- return pb.CommandSubscribe_Earliest
- }
-
- return pb.CommandSubscribe_Latest
-}
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
deleted file mode 100644
index 83e09ad..0000000
--- a/pulsar/impl_partition_consumer.go
+++ /dev/null
@@ -1,739 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pulsar
-
-import (
- "context"
- "fmt"
- "math"
- "sync"
- "time"
-
- "github.com/golang/protobuf/proto"
-
- log "github.com/sirupsen/logrus"
-
- "github.com/apache/pulsar-client-go/pkg/pb"
- "github.com/apache/pulsar-client-go/pulsar/internal"
- "github.com/apache/pulsar-client-go/util"
-)
-
-const maxRedeliverUnacknowledged = 1000
-
-type consumerState int
-
-const (
- consumerInit consumerState = iota
- consumerReady
- consumerClosing
- consumerClosed
-)
-
-type partitionConsumer struct {
- state consumerState
- client *client
- topic string
- log *log.Entry
- cnx internal.Connection
-
- options *ConsumerOptions
- consumerName *string
- consumerID uint64
- subQueue chan ConsumerMessage
-
- omu sync.Mutex // protects following
- redeliverMessages []*pb.MessageIdData
-
- unAckTracker *UnackedMessageTracker
- receivedSinceFlow uint32
-
- eventsChan chan interface{}
- partitionIdx int
- partitionNum int
-}
-
-func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID, partitionNum int, ch chan<- ConsumerMessage) (*partitionConsumer, error) {
- c := &partitionConsumer{
- state: consumerInit,
- client: client,
- topic: topic,
- options: options,
- log: log.WithField("topic", topic),
- consumerID: client.rpcClient.NewConsumerID(),
- partitionIdx: partitionID,
- partitionNum: partitionNum,
- eventsChan: make(chan interface{}, 1),
- subQueue: make(chan ConsumerMessage, options.ReceiverQueueSize),
- receivedSinceFlow: 0,
- }
-
- c.setDefault(options)
-
- if options.MessageChannel == nil {
- options.MessageChannel = make(chan ConsumerMessage, options.ReceiverQueueSize)
- } else {
- c.subQueue = options.MessageChannel
- }
-
- if options.Name != "" {
- c.consumerName = &options.Name
- }
-
- if options.Type == Shared || options.Type == KeyShared {
- if options.AckTimeout != 0 {
- c.unAckTracker = NewUnackedMessageTracker()
- c.unAckTracker.pcs = append(c.unAckTracker.pcs, c)
- c.unAckTracker.Start(int64(options.AckTimeout))
- }
- }
-
- err := c.grabCnx()
- if err != nil {
- log.WithError(err).Errorf("Failed to create consumer")
- return nil, err
- }
- c.log = c.log.WithField("name", c.consumerName)
- c.log.Info("Created consumer")
- c.state = consumerReady
-
- // In here, open a gorutine to receive data asynchronously from the subConsumer,
- // filling the queue channel of the current consumer.
- if partitionNum > 1 {
- go func() {
- err = c.ReceiveAsync(context.Background(), ch)
- if err != nil {
- return
- }
- }()
- }
-
- go c.runEventsLoop()
-
- return c, nil
-}
-
-func (pc *partitionConsumer) setDefault(options *ConsumerOptions) {
- if options.ReceiverQueueSize <= 0 {
- options.ReceiverQueueSize = 1000
- }
-
- if options.AckTimeout == 0 {
- options.AckTimeout = time.Second * 30
- }
-}
-
-func (pc *partitionConsumer) grabCnx() error {
- lr, err := pc.client.lookupService.Lookup(pc.topic)
- if err != nil {
- pc.log.WithError(err).Warn("Failed to lookup topic")
- return err
- }
- pc.log.Debugf("Lookup result: %v", lr)
-
- subType := toProtoSubType(pc.options.Type)
- initialPosition := toProtoInitialPosition(pc.options.SubscriptionInitPos)
- requestID := pc.client.rpcClient.NewRequestID()
- res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
- pb.BaseCommand_SUBSCRIBE, &pb.CommandSubscribe{
- RequestId: proto.Uint64(requestID),
- Topic: proto.String(pc.topic),
- SubType: subType.Enum(),
- Subscription: proto.String(pc.options.SubscriptionName),
- ConsumerId: proto.Uint64(pc.consumerID),
- ConsumerName: proto.String(pc.options.Name),
- InitialPosition: initialPosition.Enum(),
- Schema: nil,
- })
-
- if err != nil {
- pc.log.WithError(err).Error("Failed to create consumer")
- return err
- }
-
- if res.Response.ConsumerStatsResponse != nil {
- pc.consumerName = res.Response.ConsumerStatsResponse.ConsumerName
- }
-
- pc.cnx = res.Cnx
- pc.log.WithField("cnx", res.Cnx).Debug("Connected consumer")
- pc.cnx.AddConsumeHandler(pc.consumerID, pc)
-
- msgType := res.Response.GetType()
-
- switch msgType {
- case pb.BaseCommand_SUCCESS:
- return pc.internalFlow(uint32(pc.options.ReceiverQueueSize))
- case pb.BaseCommand_ERROR:
- errMsg := res.Response.GetError()
- return fmt.Errorf("%s: %s", errMsg.GetError().String(), errMsg.GetMessage())
- default:
- return util.NewUnexpectedErrMsg(msgType, requestID)
- }
-}
-
-func (pc *partitionConsumer) Topic() string {
- return pc.topic
-}
-
-func (pc *partitionConsumer) Subscription() string {
- return pc.options.SubscriptionName
-}
-
-func (pc *partitionConsumer) Unsubscribe() error {
- wg := &sync.WaitGroup{}
- wg.Add(1)
-
- hu := &handleUnsubscribe{
- waitGroup: wg,
- err: nil,
- }
- pc.eventsChan <- hu
-
- wg.Wait()
- return hu.err
-}
-
-func (pc *partitionConsumer) internalUnsubscribe(unsub *handleUnsubscribe) {
- requestID := pc.client.rpcClient.NewRequestID()
- _, err := pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
- pb.BaseCommand_UNSUBSCRIBE, &pb.CommandUnsubscribe{
- RequestId: proto.Uint64(requestID),
- ConsumerId: proto.Uint64(pc.consumerID),
- })
- if err != nil {
- pc.log.WithError(err).Error("Failed to unsubscribe consumer")
- unsub.err = err
- }
-
- pc.cnx.DeleteConsumeHandler(pc.consumerID)
- if pc.unAckTracker != nil {
- pc.unAckTracker.Stop()
- }
-
- unsub.waitGroup.Done()
-}
-
-func (pc *partitionConsumer) trackMessage(msgID MessageID) error {
- id := &pb.MessageIdData{}
- err := proto.Unmarshal(msgID.Serialize(), id)
- if err != nil {
- pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
- return err
- }
- if pc.unAckTracker != nil {
- pc.unAckTracker.Add(id)
- }
- return nil
-}
-
-func (pc *partitionConsumer) increaseAvailablePermits() error {
- pc.receivedSinceFlow++
- highWater := uint32(math.Max(float64(pc.options.ReceiverQueueSize/2), 1))
-
- pc.log.Debugf("receivedSinceFlow size is: %d, highWater size is: %d", pc.receivedSinceFlow, highWater)
-
- // send flow request after 1/2 of the queue has been consumed
- if pc.receivedSinceFlow >= highWater {
- pc.log.Debugf("send flow command to broker, permits size is: %d", pc.receivedSinceFlow)
- err := pc.internalFlow(pc.receivedSinceFlow)
- if err != nil {
- pc.log.Errorf("Send flow cmd error:%s", err.Error())
- pc.receivedSinceFlow = 0
- return err
- }
- pc.receivedSinceFlow = 0
- }
- return nil
-}
-
-func (pc *partitionConsumer) messageProcessed(msgID MessageID) error {
- err := pc.trackMessage(msgID)
- if err != nil {
- return err
- }
-
- err = pc.increaseAvailablePermits()
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, err error) {
- wg := &sync.WaitGroup{}
- wg.Add(1)
- pc.ReceiveAsyncWithCallback(ctx, func(msg Message, e error) {
- message = msg
- err = e
- wg.Done()
- })
- wg.Wait()
-
- return message, err
-}
-
-func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
- for {
- select {
- case tmpMsg, ok := <-pc.subQueue:
- if ok {
- msgs <- tmpMsg
-
- err := pc.messageProcessed(tmpMsg.ID())
- if err != nil {
- return err
- }
- continue
- }
- break
- case <-ctx.Done():
- return ctx.Err()
- }
- }
-}
-
-func (pc *partitionConsumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
- var err error
-
- select {
- case tmpMsg, ok := <-pc.subQueue:
- if ok {
- err = pc.messageProcessed(tmpMsg.ID())
- callback(tmpMsg.Message, err)
- if err != nil {
- pc.log.Errorf("processed messages error:%s", err.Error())
- return
- }
- }
- case <-ctx.Done():
- pc.log.Errorf("context shouldn't done, please check error:%s", ctx.Err().Error())
- return
- }
-}
-
-func (pc *partitionConsumer) Ack(msg Message) error {
- return pc.AckID(msg.ID())
-}
-
-func (pc *partitionConsumer) AckID(msgID MessageID) error {
- wg := &sync.WaitGroup{}
- wg.Add(1)
- ha := &handleAck{
- msgID: msgID,
- waitGroup: wg,
- err: nil,
- }
- pc.eventsChan <- ha
- wg.Wait()
- return ha.err
-}
-
-func (pc *partitionConsumer) internalAck(ack *handleAck) {
- id := &pb.MessageIdData{}
- messageIDs := make([]*pb.MessageIdData, 0)
- err := proto.Unmarshal(ack.msgID.Serialize(), id)
- if err != nil {
- pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
- ack.err = err
- }
-
- messageIDs = append(messageIDs, id)
-
- requestID := pc.client.rpcClient.NewRequestID()
- _, err = pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
- pb.BaseCommand_ACK, &pb.CommandAck{
- ConsumerId: proto.Uint64(pc.consumerID),
- MessageId: messageIDs,
- AckType: pb.CommandAck_Individual.Enum(),
- })
- if err != nil {
- pc.log.WithError(err).Error("Failed to unsubscribe consumer")
- ack.err = err
- }
-
- if pc.unAckTracker != nil {
- pc.unAckTracker.Remove(id)
- }
- ack.waitGroup.Done()
-}
-
-func (pc *partitionConsumer) AckCumulative(msg Message) error {
- return pc.AckCumulativeID(msg.ID())
-}
-
-func (pc *partitionConsumer) AckCumulativeID(msgID MessageID) error {
- hac := &handleAckCumulative{
- msgID: msgID,
- err: nil,
- }
- pc.eventsChan <- hac
-
- return hac.err
-}
-
-func (pc *partitionConsumer) internalAckCumulative(ackCumulative *handleAckCumulative) {
- id := &pb.MessageIdData{}
- messageIDs := make([]*pb.MessageIdData, 0)
- err := proto.Unmarshal(ackCumulative.msgID.Serialize(), id)
- if err != nil {
- pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
- ackCumulative.err = err
- }
- messageIDs = append(messageIDs, id)
-
- requestID := pc.client.rpcClient.NewRequestID()
- _, err = pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
- pb.BaseCommand_ACK, &pb.CommandAck{
- ConsumerId: proto.Uint64(pc.consumerID),
- MessageId: messageIDs,
- AckType: pb.CommandAck_Cumulative.Enum(),
- })
- if err != nil {
- pc.log.WithError(err).Error("Failed to unsubscribe consumer")
- ackCumulative.err = err
- }
-
- if pc.unAckTracker != nil {
- pc.unAckTracker.Remove(id)
- }
-}
-
-func (pc *partitionConsumer) Close() error {
- if pc.state != consumerReady {
- return nil
- }
- if pc.unAckTracker != nil {
- pc.unAckTracker.Stop()
- }
-
- wg := sync.WaitGroup{}
- wg.Add(1)
-
- cc := &handlerClose{&wg, nil}
- pc.eventsChan <- cc
-
- wg.Wait()
- return cc.err
-}
-
-func (pc *partitionConsumer) Seek(msgID MessageID) error {
- wg := &sync.WaitGroup{}
- wg.Add(1)
-
- hc := &handleSeek{
- msgID: msgID,
- waitGroup: wg,
- err: nil,
- }
- pc.eventsChan <- hc
-
- wg.Wait()
- return hc.err
-}
-
-func (pc *partitionConsumer) internalSeek(seek *handleSeek) {
- if pc.state == consumerClosing || pc.state == consumerClosed {
- pc.log.Error("Consumer was already closed")
- return
- }
-
- id := &pb.MessageIdData{}
- err := proto.Unmarshal(seek.msgID.Serialize(), id)
- if err != nil {
- pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
- seek.err = err
- }
-
- requestID := pc.client.rpcClient.NewRequestID()
- _, err = pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
- pb.BaseCommand_SEEK, &pb.CommandSeek{
- ConsumerId: proto.Uint64(pc.consumerID),
- RequestId: proto.Uint64(requestID),
- MessageId: id,
- })
- if err != nil {
- pc.log.WithError(err).Error("Failed to unsubscribe consumer")
- seek.err = err
- }
-
- seek.waitGroup.Done()
-}
-
-func (pc *partitionConsumer) RedeliverUnackedMessages() error {
- wg := &sync.WaitGroup{}
- wg.Add(1)
-
- hr := &handleRedeliver{
- waitGroup: wg,
- err: nil,
- }
- pc.eventsChan <- hr
- wg.Wait()
- return hr.err
-}
-
-func (pc *partitionConsumer) internalRedeliver(redeliver *handleRedeliver) {
- pc.omu.Lock()
- defer pc.omu.Unlock()
-
- redeliverMessagesSize := len(pc.redeliverMessages)
-
- if redeliverMessagesSize == 0 {
- return
- }
-
- requestID := pc.client.rpcClient.NewRequestID()
-
- for i := 0; i < len(pc.redeliverMessages); i += maxRedeliverUnacknowledged {
- end := i + maxRedeliverUnacknowledged
- if end > redeliverMessagesSize {
- end = redeliverMessagesSize
- }
- _, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
- pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
- ConsumerId: proto.Uint64(pc.consumerID),
- MessageIds: pc.redeliverMessages[i:end],
- })
- if err != nil {
- pc.log.WithError(err).Error("Failed to unsubscribe consumer")
- redeliver.err = err
- }
- }
-
- // clear redeliverMessages slice
- pc.redeliverMessages = nil
-
- if pc.unAckTracker != nil {
- pc.unAckTracker.clear()
- }
- redeliver.waitGroup.Done()
-}
-
-func (pc *partitionConsumer) runEventsLoop() {
- for {
- select {
- case i := <-pc.eventsChan:
- switch v := i.(type) {
- case *handlerClose:
- pc.internalClose(v)
- return
- case *handleSeek:
- pc.internalSeek(v)
- case *handleUnsubscribe:
- pc.internalUnsubscribe(v)
- case *handleAckCumulative:
- pc.internalAckCumulative(v)
- case *handleAck:
- pc.internalAck(v)
- case *handleRedeliver:
- pc.internalRedeliver(v)
- case *handleConnectionClosed:
- pc.reconnectToBroker()
- }
- }
- }
-}
-
-func (pc *partitionConsumer) internalClose(req *handlerClose) {
- if pc.state != consumerReady {
- req.waitGroup.Done()
- return
- }
-
- pc.state = consumerClosing
- pc.log.Info("Closing consumer")
-
- requestID := pc.client.rpcClient.NewRequestID()
- _, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID, pb.BaseCommand_CLOSE_CONSUMER, &pb.CommandCloseConsumer{
- ConsumerId: proto.Uint64(pc.consumerID),
- RequestId: proto.Uint64(requestID),
- })
- pc.cnx.DeleteConsumeHandler(pc.consumerID)
-
- if err != nil {
- req.err = err
- } else {
- pc.log.Info("Closed consumer")
- pc.state = consumerClosed
- close(pc.options.MessageChannel)
- }
-
- req.waitGroup.Done()
-}
-
-// Flow command gives additional permits to send messages to the consumer.
-// A typical consumer implementation will use a queue to accuMulate these messages
-// before the application is ready to consume them. After the consumer is ready,
-// the client needs to give permission to the broker to push messages.
-func (pc *partitionConsumer) internalFlow(permits uint32) error {
- if permits <= 0 {
- return fmt.Errorf("invalid number of permits requested: %d", permits)
- }
-
- requestID := pc.client.rpcClient.NewRequestID()
- _, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
- pb.BaseCommand_FLOW, &pb.CommandFlow{
- ConsumerId: proto.Uint64(pc.consumerID),
- MessagePermits: proto.Uint32(permits),
- })
-
- if err != nil {
- pc.log.WithError(err).Error("Failed to unsubscribe consumer")
- return err
- }
- return nil
-}
-
-func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload internal.Buffer) error {
- pbMsgID := response.GetMessageId()
- reader := internal.NewMessageReader(headersAndPayload.ReadableSlice())
- msgMeta, err := reader.ReadMessageMetadata()
- if err != nil {
- // TODO send discardCorruptedMessage
- return err
- }
-
- numMsgs := 1
- if msgMeta.NumMessagesInBatch != nil {
- numMsgs = int(msgMeta.GetNumMessagesInBatch())
- }
- for i := 0; i < numMsgs; i++ {
- ssm, payload, err := reader.ReadMessage()
- if err != nil {
- // TODO send
- return err
- }
-
- msgID := newMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), i, pc.partitionIdx)
- var msg Message
- if ssm == nil {
- msg = &message{
- publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
- eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
- key: msgMeta.GetPartitionKey(),
- properties: internal.ConvertToStringMap(msgMeta.GetProperties()),
- topic: pc.topic,
- msgID: msgID,
- payLoad: payload,
- }
- } else {
- msg = &message{
- publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
- eventTime: timeFromUnixTimestampMillis(ssm.GetEventTime()),
- key: ssm.GetPartitionKey(),
- properties: internal.ConvertToStringMap(ssm.GetProperties()),
- topic: pc.topic,
- msgID: msgID,
- payLoad: payload,
- }
- }
-
- if err := pc.dispatchMessage(msg, pbMsgID); err != nil {
- // TODO handle error
- return err
- }
- }
-
- return nil
-}
-
-
-func (pc *partitionConsumer) dispatchMessage(msg Message, msgID *pb.MessageIdData) error {
- select {
- case pc.subQueue <- ConsumerMessage{Consumer:pc, Message:msg}:
- //Add messageId to redeliverMessages buffer, avoiding duplicates.
- var dup bool
-
- pc.omu.Lock()
- for _, mid := range pc.redeliverMessages {
- if proto.Equal(mid, msgID) {
- dup = true
- break
- }
- }
-
- if !dup {
- pc.redeliverMessages = append(pc.redeliverMessages, msgID)
- }
- pc.omu.Unlock()
- default:
- return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
- }
- return nil
-}
-
-type handleAck struct {
- msgID MessageID
- waitGroup *sync.WaitGroup
- err error
-}
-
-type handleAckCumulative struct {
- msgID MessageID
- err error
-}
-
-type handleUnsubscribe struct {
- waitGroup *sync.WaitGroup
- err error
-}
-
-type handleSeek struct {
- msgID MessageID
- waitGroup *sync.WaitGroup
- err error
-}
-
-type handleRedeliver struct {
- waitGroup *sync.WaitGroup
- err error
-}
-
-type handlerClose struct {
- waitGroup *sync.WaitGroup
- err error
-}
-
-type handleConnectionClosed struct{}
-
-func (pc *partitionConsumer) ConnectionClosed() {
- // Trigger reconnection in the consumer goroutine
- pc.eventsChan <- &handleConnectionClosed{}
-}
-
-func (pc *partitionConsumer) reconnectToBroker() {
- backoff := internal.Backoff{}
- for {
- if pc.state != consumerReady {
- // Consumer is already closing
- return
- }
-
- d := backoff.Next()
- pc.log.Info("Reconnecting to broker in ", d)
- time.Sleep(d)
-
- err := pc.grabCnx()
- if err == nil {
- // Successfully reconnected
- pc.log.Info("Reconnected consumer to broker")
- return
- }
- }
-}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index a68e34a..b1d4faa 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -34,7 +34,6 @@ import (
"github.com/apache/pulsar-client-go/pkg/auth"
"github.com/apache/pulsar-client-go/pkg/pb"
- "github.com/apache/pulsar-client-go/util"
)
type TLSOptions struct {
@@ -286,7 +285,10 @@ func (c *connection) run() {
if req == nil {
return
}
- c.pendingReqs[req.id] = req
+ // does this request expect a response?
+ if req.id != RequestIDNoResponse {
+ c.pendingReqs[req.id] = req
+ }
c.writeCommand(req.cmd)
case cmd := <- c.incomingCmdCh:
@@ -509,9 +511,7 @@ func (c *connection) handleCloseConsumer(closeConsumer *pb.CommandCloseConsumer)
c.log.Infof("Broker notification of Closed consumer: %d", closeConsumer.GetConsumerId())
consumerID := closeConsumer.GetConsumerId()
if consumer, ok := c.consumerHandler(consumerID); ok {
- if !util.IsNil(consumer) {
- consumer.ConnectionClosed()
- }
+ consumer.ConnectionClosed()
} else {
c.log.WithField("consumerID", consumerID).Warnf("Consumer with ID not found while closing consumer")
}
@@ -564,9 +564,14 @@ func (c *connection) Close() {
listener.ConnectionClosed()
}
+ consumerHandlers := make(map[uint64]ConsumerHandler)
c.consumerHandlersLock.RLock()
- defer c.consumerHandlersLock.RUnlock()
- for _, handler := range c.consumerHandlers {
+ for id, handler := range c.consumerHandlers {
+ consumerHandlers[id] = handler
+ }
+ c.consumerHandlersLock.RUnlock()
+
+ for _, handler := range consumerHandlers {
handler.ConnectionClosed()
}
}
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 35d8b61..6a01fe0 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -31,6 +31,9 @@ type RPCResult struct {
Cnx Connection
}
+// RequestID for a request when there is no expected response
+const RequestIDNoResponse = uint64(0)
+
type RPCClient interface {
// Create a new unique request id
NewRequestID() uint64
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index 9930a99..1762fd7 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -23,6 +23,8 @@ import (
"fmt"
"log"
"net/http"
+ "strings"
+ "testing"
"time"
)
@@ -63,3 +65,21 @@ func httpPut(url string, body interface{}) {
}
resp.Body.Close()
}
+
+func makeHTTPCall(t *testing.T, method string, urls string, body string) {
+ client := http.Client{}
+
+ req, err := http.NewRequest(method, urls, strings.NewReader(body))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ req.Header.Set("Content-Type", "application/json")
+ req.Header.Set("Accept", "application/json")
+
+ res, err := client.Do(req)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer res.Body.Close()
+}
diff --git a/pulsar/unacked_msg_tracker.go b/pulsar/unacked_msg_tracker.go
index ffc6eff..5dfe895 100644
--- a/pulsar/unacked_msg_tracker.go
+++ b/pulsar/unacked_msg_tracker.go
@@ -174,7 +174,7 @@ func (t *UnackedMessageTracker) handlerCmd() {
MessageIds: messageIdsMap[int32(index)],
}
- _, err := subConsumer.client.rpcClient.RequestOnCnx(subConsumer.cnx, requestID,
+ _, err := subConsumer.client.rpcClient.RequestOnCnx(subConsumer.conn, requestID,
pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, cmd)
if err != nil {
subConsumer.log.WithError(err).Error("Failed to unsubscribe consumer")
diff --git a/util/util.go b/util/util.go
deleted file mode 100644
index bd4f5d6..0000000
--- a/util/util.go
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package util
-
-import (
- "reflect"
-)
-
-// IsNil check if the interface is nil
-func IsNil(i interface{}) bool {
- vi := reflect.ValueOf(i)
- if vi.Kind() == reflect.Ptr {
- return vi.IsNil()
- }
- return false
-}
-
-// RemoveDuplicateElement remove repeating elements from the string slice
-func RemoveDuplicateElement(addrs []string) []string {
- result := make([]string, 0, len(addrs))
- temp := map[string]struct{}{}
- for _, item := range addrs {
- if _, ok := temp[item]; !ok {
- temp[item] = struct{}{}
- result = append(result, item)
- }
- }
- return result
-}
diff --git a/util/util_test.go b/util/util_test.go
deleted file mode 100644
index 3b0a9f9..0000000
--- a/util/util_test.go
+++ /dev/null
@@ -1,41 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package util
-
-import (
- "fmt"
- "strings"
- "testing"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestIsNil(t *testing.T) {
- var a interface{}
- var b interface{} = (*int)(nil)
-
- assert.True(t, a == nil)
- assert.False(t, b == nil)
-}
-
-func TestRemoveDuplicateElement(t *testing.T) {
- s := []string{"hello", "world", "hello", "golang", "hello", "ruby", "php", "java"}
- resList := RemoveDuplicateElement(s)
- res := fmt.Sprintf("%s", resList)
- assert.Equal(t, 1, strings.Count(res, "hello"))
-}