You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/06 01:42:11 UTC

[pulsar-client-go] branch master updated: [Issue 304][Reader] fixed panic in CreateReader API using custom MessageID for ReaderOptions (#305)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ee17df9  [Issue 304][Reader] fixed panic in CreateReader API using custom MessageID for ReaderOptions (#305)
ee17df9 is described below

commit ee17df907aecb136165f5a79baeb96ed3938a210
Author: Nitish Vashishtha <72...@users.noreply.github.com>
AuthorDate: Sun Jul 5 18:42:05 2020 -0700

    [Issue 304][Reader] fixed panic in CreateReader API using custom MessageID for ReaderOptions (#305)
    
    ### Motivation
    
    Users of the client's [CreateReader](https://github.com/apache/pulsar-client-go/blob/master/pulsar/client.go#L109) API can pass a custom type satisfying the [MessageID](https://github.com/apache/pulsar-client-go/blob/master/pulsar/message.go#L108) interface as the value of `StartMessageID` in the [ReaderOptions](https://github.com/apache/pulsar-client-go/blob/master/pulsar/reader.go#L48) argument of that API.
    
    The current reader creation performs an unchecked type assertion here when preparing the `consumerOptions` needed for creating a `partitionConsumer`:
    https://github.com/apache/pulsar-client-go/blob/master/pulsar/reader_impl.go#L64
    
    This assertion of `MessageID` as `*messageID` fails (and panics) unless the `MessageID` instance was created by one of these exported APIs, because `messageID` is unexported:
    https://github.com/apache/pulsar-client-go/blob/master/pulsar/message.go#L114-#L126
    Note: `newMessageID` returns `*messageID` which satisfies `MessageID` interface as well.
    
    
    ### Modifications
    
    Check the type assertion of `MessageID` as `*messageID`; if it fails, re-create a `*messageID` from the custom type's serialized data using this function:
    https://github.com/apache/pulsar-client-go/blob/975eb3781644ebe588fc142e53eadf39fe50341a/pulsar/impl_message.go#L97
    This ensures that the custom type is re-created as a `*messageID`, which can then be used by `partitionConsumerOpts`.
---
 pulsar/reader_impl.go | 17 +++++++++--
 pulsar/reader_test.go | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 99 insertions(+), 2 deletions(-)

diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 1650f71..b3399dc 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -44,6 +44,19 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
 		return nil, newError(ResultInvalidConfiguration, "StartMessageID is required")
 	}
 
+	var startMessageID *messageID
+	var ok bool
+	if startMessageID, ok = options.StartMessageID.(*messageID); !ok {
+		// a custom type satisfying MessageID may not be a *messageID
+		// so re-create *messageID using its data
+		deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize())
+		if err != nil {
+			return nil, err
+		}
+		// de-serialized MessageID is a *messageID
+		startMessageID = deserMsgID.(*messageID)
+	}
+
 	subscriptionName := options.SubscriptionRolePrefix
 	if subscriptionName == "" {
 		subscriptionName = "reader"
@@ -61,7 +74,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
 		subscription:               subscriptionName,
 		subscriptionType:           Exclusive,
 		receiverQueueSize:          receiverQueueSize,
-		startMessageID:             options.StartMessageID.(*messageID),
+		startMessageID:             startMessageID,
 		startMessageIDInclusive:    options.StartMessageIDInclusive,
 		subscriptionMode:           nonDurable,
 		readCompacted:              options.ReadCompacted,
@@ -80,8 +93,8 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
 	if err != nil {
 		return nil, err
 	}
-	pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq)
 
+	pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq)
 	if err != nil {
 		close(reader.messageCh)
 		return nil, err
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index c98771d..d99bfcb 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -362,3 +362,87 @@ func TestReaderHasNext(t *testing.T) {
 
 	assert.Equal(t, 10, i)
 }
+
+type myMessageID struct {
+	data []byte
+}
+
+func (id *myMessageID) Serialize() []byte {
+	return id.data
+}
+
+func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	ctx := context.Background()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           topic,
+		DisableBatching: true,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send 10 messages
+	msgIDs := [10]MessageID{}
+	for i := 0; i < 10; i++ {
+		msgID, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.NoError(t, err)
+		assert.NotNil(t, msgID)
+		msgIDs[i] = msgID
+	}
+
+	// custom start message ID
+	myStartMsgID := &myMessageID{
+		data: msgIDs[4].Serialize(),
+	}
+
+	// attempt to create reader on 5th message (not included)
+	var reader Reader
+	assert.NotPanics(t, func() {
+		reader, err = client.CreateReader(ReaderOptions{
+			Topic:          topic,
+			StartMessageID: myStartMsgID,
+		})
+	})
+
+	assert.Nil(t, err)
+	defer reader.Close()
+
+	// receive the remaining 5 messages
+	for i := 5; i < 10; i++ {
+		msg, err := reader.Next(context.Background())
+		assert.NoError(t, err)
+
+		expectMsg := fmt.Sprintf("hello-%d", i)
+		assert.Equal(t, []byte(expectMsg), msg.Payload())
+	}
+
+	// create reader on 5th message (included)
+	readerInclusive, err := client.CreateReader(ReaderOptions{
+		Topic:                   topic,
+		StartMessageID:          myStartMsgID,
+		StartMessageIDInclusive: true,
+	})
+
+	assert.Nil(t, err)
+	defer readerInclusive.Close()
+
+	// receive the remaining 6 messages
+	for i := 4; i < 10; i++ {
+		msg, err := readerInclusive.Next(context.Background())
+		assert.NoError(t, err)
+
+		expectMsg := fmt.Sprintf("hello-%d", i)
+		assert.Equal(t, []byte(expectMsg), msg.Payload())
+	}
+}