You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2022/03/22 06:26:43 UTC

[pulsar-client-go] branch master updated: Add schema support to Reader (#741)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f50a67  Add schema support to Reader (#741)
4f50a67 is described below

commit 4f50a678d9030828933e03c1a79ae310c38893ad
Author: Ziyao Wei <zi...@gmail.com>
AuthorDate: Tue Mar 22 02:26:37 2022 -0400

    Add schema support to Reader (#741)
    
    Add schema support to Reader
---
 pulsar/reader.go      |  3 +++
 pulsar/reader_impl.go |  1 +
 pulsar/reader_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 47 insertions(+)

diff --git a/pulsar/reader.go b/pulsar/reader.go
index c45b8ff..f1cb575 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -79,6 +79,9 @@ type ReaderOptions struct {
 
 	// Decryption represents the encryption related fields required by the reader to decrypt a message.
 	Decryption *MessageDecryptionInfo
+
+	// Schema represents the schema implementation.
+	Schema Schema
 }
 
 // Reader can be used to scan through all the messages currently available in a topic.
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 0fed80c..596884a 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -102,6 +102,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
 		nackRedeliveryDelay:        defaultNackRedeliveryDelay,
 		replicateSubscriptionState: false,
 		decryption:                 options.Decryption,
+		schema:                     options.Schema,
 	}
 
 	reader := &reader{
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index bdafea0..aa12078 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -710,3 +710,46 @@ func TestProducerReaderRSAEncryption(t *testing.T) {
 		assert.Equal(t, []byte(expectMsg), msg.Payload())
 	}
 }
+
+func TestReaderWithSchema(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	schema := NewStringSchema(nil)
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:  topic,
+		Schema: schema,
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	value := "hello pulsar"
+	_, err = producer.Send(context.Background(), &ProducerMessage{
+		Value: value,
+	})
+	assert.Nil(t, err)
+
+	// create reader
+	reader, err := client.CreateReader(ReaderOptions{
+		Topic:          topic,
+		StartMessageID: EarliestMessageID(),
+		Schema:         schema,
+	})
+	assert.Nil(t, err)
+	defer reader.Close()
+
+	msg, err := reader.Next(context.Background())
+	assert.NoError(t, err)
+
+	var res *string
+	err = msg.GetSchemaValue(&res)
+	assert.Nil(t, err)
+	assert.Equal(t, *res, value)
+}