You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2022/03/22 06:26:43 UTC
[pulsar-client-go] branch master updated: Add schema support to Reader (#741)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 4f50a67 Add schema support to Reader (#741)
4f50a67 is described below
commit 4f50a678d9030828933e03c1a79ae310c38893ad
Author: Ziyao Wei <zi...@gmail.com>
AuthorDate: Tue Mar 22 02:26:37 2022 -0400
Add schema support to Reader (#741)
Add schema support to Reader
---
pulsar/reader.go | 3 +++
pulsar/reader_impl.go | 1 +
pulsar/reader_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++
3 files changed, 47 insertions(+)
diff --git a/pulsar/reader.go b/pulsar/reader.go
index c45b8ff..f1cb575 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -79,6 +79,9 @@ type ReaderOptions struct {
// Decryption represents the encryption related fields required by the reader to decrypt a message.
Decryption *MessageDecryptionInfo
+
+ // Schema represents the schema implementation.
+ Schema Schema
}
// Reader can be used to scan through all the messages currently available in a topic.
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 0fed80c..596884a 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -102,6 +102,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
nackRedeliveryDelay: defaultNackRedeliveryDelay,
replicateSubscriptionState: false,
decryption: options.Decryption,
+ schema: options.Schema,
}
reader := &reader{
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index bdafea0..aa12078 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -710,3 +710,46 @@ func TestProducerReaderRSAEncryption(t *testing.T) {
assert.Equal(t, []byte(expectMsg), msg.Payload())
}
}
+
+func TestReaderWithSchema(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ // Round-trip check for ReaderOptions.Schema: send one string value, then read it back and decode it through a reader built with the same schema.
+ assert.Nil(t, err)
+ defer client.Close() // NOTE(review): presumably assert.Nil does not abort the test, so this would run on a nil client if NewClient failed - confirm
+
+ topic := newTopicName()
+ schema := NewStringSchema(nil)
+
+ // create a producer on the fresh topic, configured with the string schema
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Schema: schema,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ value := "hello pulsar"
+ _, err = producer.Send(context.Background(), &ProducerMessage{
+ Value: value,
+ })
+ assert.Nil(t, err)
+
+ // create a reader on the same topic with the same schema; it starts at EarliestMessageID(), presumably so the message sent above is delivered
+ reader, err := client.CreateReader(ReaderOptions{
+ Topic: topic,
+ StartMessageID: EarliestMessageID(),
+ Schema: schema,
+ })
+ assert.Nil(t, err)
+ defer reader.Close()
+
+ msg, err := reader.Next(context.Background())
+ assert.NoError(t, err)
+ // decode the received message via GetSchemaValue, which is handed the address of a *string, then compare against the value that was sent
+ var res *string
+ err = msg.GetSchemaValue(&res)
+ assert.Nil(t, err)
+ assert.Equal(t, *res, value) // NOTE(review): if this is testify, the order is (expected, actual), so these look swapped; *res would also panic if res stayed nil - confirm
+}