You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/06 01:42:11 UTC
[pulsar-client-go] branch master updated: [Issue 304][Reader] fixed
panic in CreateReader API using custom MessageID for ReaderOptions (#305)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new ee17df9 [Issue 304][Reader] fixed panic in CreateReader API using custom MessageID for ReaderOptions (#305)
ee17df9 is described below
commit ee17df907aecb136165f5a79baeb96ed3938a210
Author: Nitish Vashishtha <72...@users.noreply.github.com>
AuthorDate: Sun Jul 5 18:42:05 2020 -0700
[Issue 304][Reader] fixed panic in CreateReader API using custom MessageID for ReaderOptions (#305)
### Motivation
Users of the client's [CreateReader](https://github.com/apache/pulsar-client-go/blob/master/pulsar/client.go#L109) API can pass a custom type satisfying the [MessageID](https://github.com/apache/pulsar-client-go/blob/master/pulsar/message.go#L108) interface as the value of `StartMessageID` in the [ReaderOptions](https://github.com/apache/pulsar-client-go/blob/master/pulsar/reader.go#L48) argument of that API.
The current reader creation performs an unchecked type assertion when preparing the `consumerOptions` needed for creating a `partitionConsumer`:
https://github.com/apache/pulsar-client-go/blob/master/pulsar/reader_impl.go#L64
This assertion of `MessageID` as `*messageID` will panic unless the `MessageID` instance was created by one of these exported APIs, because `messageID` is unexported:
https://github.com/apache/pulsar-client-go/blob/master/pulsar/message.go#L114-#L126
Note: `newMessageID` returns `*messageID`, which satisfies the `MessageID` interface as well.
### Modifications
Check the type assertion of `MessageID` as `*messageID`; if it fails, re-create a `*messageID` from the serialized data of the custom `MessageID` using this function:
https://github.com/apache/pulsar-client-go/blob/975eb3781644ebe588fc142e53eadf39fe50341a/pulsar/impl_message.go#L97
This will ensure that the custom type can be re-created as a `*messageID` which can be used by `partitionConsumerOpts`
---
pulsar/reader_impl.go | 17 +++++++++--
pulsar/reader_test.go | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 99 insertions(+), 2 deletions(-)
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 1650f71..b3399dc 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -44,6 +44,19 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
return nil, newError(ResultInvalidConfiguration, "StartMessageID is required")
}
+ var startMessageID *messageID
+ var ok bool
+ if startMessageID, ok = options.StartMessageID.(*messageID); !ok {
+ // a custom type satisfying MessageID may not be a *messageID
+ // so re-create *messageID using its data
+ deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize())
+ if err != nil {
+ return nil, err
+ }
+ // de-serialized MessageID is a *messageID
+ startMessageID = deserMsgID.(*messageID)
+ }
+
subscriptionName := options.SubscriptionRolePrefix
if subscriptionName == "" {
subscriptionName = "reader"
@@ -61,7 +74,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
subscription: subscriptionName,
subscriptionType: Exclusive,
receiverQueueSize: receiverQueueSize,
- startMessageID: options.StartMessageID.(*messageID),
+ startMessageID: startMessageID,
startMessageIDInclusive: options.StartMessageIDInclusive,
subscriptionMode: nonDurable,
readCompacted: options.ReadCompacted,
@@ -80,8 +93,8 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
if err != nil {
return nil, err
}
- pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq)
+ pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq)
if err != nil {
close(reader.messageCh)
return nil, err
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index c98771d..d99bfcb 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -362,3 +362,87 @@ func TestReaderHasNext(t *testing.T) {
assert.Equal(t, 10, i)
}
+
+type myMessageID struct {
+ data []byte
+}
+
+func (id *myMessageID) Serialize() []byte {
+ return id.data
+}
+
+func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ ctx := context.Background()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages
+ msgIDs := [10]MessageID{}
+ for i := 0; i < 10; i++ {
+ msgID, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, msgID)
+ msgIDs[i] = msgID
+ }
+
+ // custom start message ID
+ myStartMsgID := &myMessageID{
+ data: msgIDs[4].Serialize(),
+ }
+
+ // attempt to create reader on 5th message (not included)
+ var reader Reader
+ assert.NotPanics(t, func() {
+ reader, err = client.CreateReader(ReaderOptions{
+ Topic: topic,
+ StartMessageID: myStartMsgID,
+ })
+ })
+
+ assert.Nil(t, err)
+ defer reader.Close()
+
+ // receive the remaining 5 messages
+ for i := 5; i < 10; i++ {
+ msg, err := reader.Next(context.Background())
+ assert.NoError(t, err)
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+ }
+
+ // create reader on 5th message (included)
+ readerInclusive, err := client.CreateReader(ReaderOptions{
+ Topic: topic,
+ StartMessageID: myStartMsgID,
+ StartMessageIDInclusive: true,
+ })
+
+ assert.Nil(t, err)
+ defer readerInclusive.Close()
+
+ // receive the remaining 6 messages
+ for i := 4; i < 10; i++ {
+ msg, err := readerInclusive.Next(context.Background())
+ assert.NoError(t, err)
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+ }
+}