Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/03 09:39:55 UTC
[GitHub] [pulsar-client-go] oryx2 opened a new pull request #611: feat: support multiple schema version for producer and consumer
oryx2 opened a new pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611
### Motivation
Implement [PIP-43](https://github.com/apache/pulsar/wiki/PIP-43%3A-producer-send-message-with-different-schema#changespart-1).
Support multiple schema versions for producers and consumers.
### Modifications
- add schema cache for producer and consumer
- add `DisableMultiSchema` option for producer
### Verifying this change
- [x] Make sure that the change passes the CI checks.
*(Please pick either of the following options)*
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (**yes**)
- The schema: (**yes**)
- The default values of configurations: (**yes**)
- The wire protocol: (no)
### Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] chiragbparikh commented on pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
chiragbparikh commented on pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#issuecomment-1023670646
@cckellogg @zzzming could you please review the changes in this PR? Thanks in advance.
[GitHub] [pulsar-client-go] oryx2 commented on a change in pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
oryx2 commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r788328382
##########
File path: pulsar/internal/lookup_service.go
##########
@@ -358,6 +378,9 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace string, mode GetTopic
return topics, nil
}
+func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
+ return nil, errors.New("not support")
Review comment:
Thanks, I will add a more descriptive error message.
[GitHub] [pulsar-client-go] chiragbparikh commented on pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
chiragbparikh commented on pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#issuecomment-950011686
Hello @cckellogg and @oryx2.
Can this PR be merged? It is a great feature, and we are looking forward to using it. Thanks!
[GitHub] [pulsar-client-go] lhotari commented on a change in pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r787760098
##########
File path: pulsar/producer.go
##########
@@ -166,6 +166,10 @@ type ProducerOptions struct {
// Default is 1 minute
PartitionsAutoDiscoveryInterval time.Duration
+ // Disable multiple Schame Version
+ // Default false
Review comment:
whitespace
##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
-
+ providersMutex sync.RWMutex
Review comment:
whitespace
[GitHub] [pulsar-client-go] oryx2 commented on a change in pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
oryx2 commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r788329230
##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
-
+ providersMutex sync.RWMutex
compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
metrics *internal.LeveledMetrics
decryptor cryptointernal.Decryptor
+ schemaInfoCache *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+ lock sync.RWMutex
+ cache map[string]Schema
+ client *client
+ topic string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+ return &schemaInfoCache{
+ cache: make(map[string]Schema),
+ client: client,
+ topic: topic,
+ }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+ key := hex.EncodeToString(schemaVersion)
+ s.lock.RLock()
+ schema = s.cache[key]
+ s.lock.RUnlock()
+ if schema != nil {
+ return
+ }
+
+ pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
+ if err != nil {
+ return nil, err
+ }
+
+ var properties = make(map[string]string)
+ if pbSchema.Properties != nil {
+ for _, entry := range pbSchema.Properties {
+ properties[*entry.Key] = properties[*entry.Value]
Review comment:
I found a function that does the same job, [ConvertToStringMap](https://github.com/apache/pulsar-client-go/blob/a119bab0f8598601c0eb7f0fcd97da7ab06700c7/pulsar/internal/commands.go#/L302), but it has no nil check. Maybe I should add a nil check to the `ConvertToStringMap` function and then call it here.
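A minimal sketch of what a nil-safe conversion could look like. The `KeyValue` type, `toStringMap`, and `strPtr` are hypothetical stand-ins written for this illustration, not the client's actual `ConvertToStringMap`. Note that the quoted loop assigns `properties[*entry.Value]`, which reads back from the map being built and would normally yield an empty string; the sketch stores the dereferenced value itself.

```go
package main

import "fmt"

// KeyValue is a hypothetical stand-in for the generated protobuf type,
// whose Key and Value fields are both *string.
type KeyValue struct {
	Key   *string
	Value *string
}

// toStringMap copies key/value pairs into a map. Nil entries and entries
// with a nil key are skipped; a nil value is stored as the empty string.
func toStringMap(pairs []*KeyValue) map[string]string {
	m := make(map[string]string, len(pairs))
	for _, kv := range pairs {
		if kv == nil || kv.Key == nil {
			continue
		}
		if kv.Value == nil {
			m[*kv.Key] = ""
			continue
		}
		m[*kv.Key] = *kv.Value
	}
	return m
}

// strPtr is a small helper that returns a pointer to s.
func strPtr(s string) *string { return &s }

func main() {
	pairs := []*KeyValue{
		{Key: strPtr("a"), Value: strPtr("1")},
		{Key: nil, Value: strPtr("dropped")},
		{Key: strPtr("b"), Value: nil},
		nil,
	}
	fmt.Println(toStringMap(pairs))
}
```

Whether a nil value should map to an empty string or be skipped entirely is a design choice; the sketch picks the former only for illustration.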
[GitHub] [pulsar-client-go] oryx2 commented on pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
oryx2 commented on pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#issuecomment-1016281620
@lhotari I have just resolved them, please check.
[GitHub] [pulsar-client-go] oryx2 commented on a change in pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
oryx2 commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r788311692
##########
File path: pulsar/producer_partition.go
##########
@@ -397,19 +460,48 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
msg := request.msg
payload := msg.Payload
+
var schemaPayload []byte
var err error
- if p.options.Schema != nil {
- schemaPayload, err = p.options.Schema.Encode(msg.Value)
+ if msg.Value != nil && msg.Payload != nil {
+ p.log.Error("Can not set Value and Payload both")
+ return
+ }
+
+ if p.options.DisableMultiSchema {
+ if msg.Schema != p.options.Schema {
Review comment:
Yes. Maybe I can compare the Schema values like this:
`msg.Schema != nil && p.options.Schema != nil && msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash()`
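The difference between comparing two interface values with `!=` and comparing schemas by content can be sketched as follows. `SchemaInfo`, `Schema`, `jsonSchema`, the `hash` recipe, and `sameSchema` are simplified, hypothetical stand-ins for this illustration; the real client types have more fields and may hash differently. Unlike the expression above, this sketch treats a nil schema and a non-nil schema as different, which is an assumption about the desired behavior.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// SchemaInfo is a simplified, hypothetical model of the schema metadata.
type SchemaInfo struct {
	Name   string
	Schema string
	Type   int
}

// Schema is a hypothetical stand-in for the client's Schema interface.
type Schema interface {
	GetSchemaInfo() *SchemaInfo
}

type jsonSchema struct{ info SchemaInfo }

func (j *jsonSchema) GetSchemaInfo() *SchemaInfo { return &j.info }

// hash derives a digest from the schema type and definition.
// The recipe is an assumption made for this sketch.
func (s *SchemaInfo) hash() [32]byte {
	return sha256.Sum256([]byte(fmt.Sprintf("%d:%s", s.Type, s.Schema)))
}

// sameSchema reports whether a and b describe the same schema by value.
// Two nil schemas are equal; a nil and a non-nil schema are not.
func sameSchema(a, b Schema) bool {
	if a == nil || b == nil {
		return a == nil && b == nil
	}
	return a.GetSchemaInfo().hash() == b.GetSchemaInfo().hash()
}

func main() {
	a := &jsonSchema{info: SchemaInfo{Name: "a", Schema: `{"type":"string"}`, Type: 2}}
	b := &jsonSchema{info: SchemaInfo{Name: "b", Schema: `{"type":"string"}`, Type: 2}}

	// Interface equality on two distinct pointers compares identity.
	fmt.Println(Schema(a) == Schema(b))
	// Value comparison looks at the schema type and definition instead.
	fmt.Println(sameSchema(a, b))
}
```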
[GitHub] [pulsar-client-go] lhotari commented on pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#issuecomment-1016235087
@chiragbparikh There are merge conflicts in this PR. Please resolve them so that the PR can be reviewed.
[GitHub] [pulsar-client-go] chiragbparikh commented on pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
chiragbparikh commented on pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#issuecomment-1016129670
Hello @cckellogg and @oryx2. This is a gentle reminder to please review and merge this PR. As I mentioned above, we are eagerly waiting for this feature.
[GitHub] [pulsar-client-go] lhotari commented on pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#issuecomment-1016485644
@cckellogg PTAL
[GitHub] [pulsar-client-go] zzzming commented on a change in pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
zzzming commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r787775362
##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
-
+ providersMutex sync.RWMutex
compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
metrics *internal.LeveledMetrics
decryptor cryptointernal.Decryptor
+ schemaInfoCache *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+ lock sync.RWMutex
+ cache map[string]Schema
+ client *client
+ topic string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+ return &schemaInfoCache{
+ cache: make(map[string]Schema),
+ client: client,
+ topic: topic,
+ }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+ key := hex.EncodeToString(schemaVersion)
+ s.lock.RLock()
+ schema = s.cache[key]
+ s.lock.RUnlock()
+ if schema != nil {
+ return
Review comment:
Return the values explicitly here: `return schema, nil`
##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
-
+ providersMutex sync.RWMutex
compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
metrics *internal.LeveledMetrics
decryptor cryptointernal.Decryptor
+ schemaInfoCache *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+ lock sync.RWMutex
+ cache map[string]Schema
+ client *client
+ topic string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+ return &schemaInfoCache{
+ cache: make(map[string]Schema),
+ client: client,
+ topic: topic,
+ }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+ key := hex.EncodeToString(schemaVersion)
+ s.lock.RLock()
+ schema = s.cache[key]
+ s.lock.RUnlock()
+ if schema != nil {
+ return
+ }
+
+ pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
+ if err != nil {
+ return nil, err
+ }
+
+ var properties = make(map[string]string)
+ if pbSchema.Properties != nil {
+ for _, entry := range pbSchema.Properties {
+ properties[*entry.Key] = properties[*entry.Value]
Review comment:
Since Key and Value are both declared as `*string`, do you need to check that the pointers are not nil before assigning them to properties?
##########
File path: pulsar/producer_partition.go
##########
@@ -397,19 +460,48 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
msg := request.msg
payload := msg.Payload
+
var schemaPayload []byte
var err error
- if p.options.Schema != nil {
- schemaPayload, err = p.options.Schema.Encode(msg.Value)
+ if msg.Value != nil && msg.Payload != nil {
+ p.log.Error("Can not set Value and Payload both")
+ return
+ }
+
+ if p.options.DisableMultiSchema {
+ if msg.Schema != p.options.Schema {
Review comment:
I am not sure whether we want to compare the pointers or the actual values. Schema is an interface, so the current check only makes sure that both sides point to the same instance. Do you want to compare the values of the schemas instead?
##########
File path: pulsar/internal/lookup_service.go
##########
@@ -358,6 +378,9 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace string, mode GetTopic
return topics, nil
}
+func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
+ return nil, errors.New("not support")
Review comment:
Can we make this more descriptive? An error like this might help with debugging:
`errors.New("GetSchema is not supported by httpLookupService")`
##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
-
+ providersMutex sync.RWMutex
compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
metrics *internal.LeveledMetrics
decryptor cryptointernal.Decryptor
+ schemaInfoCache *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+ lock sync.RWMutex
+ cache map[string]Schema
+ client *client
+ topic string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+ return &schemaInfoCache{
+ cache: make(map[string]Schema),
+ client: client,
+ topic: topic,
+ }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+ key := hex.EncodeToString(schemaVersion)
+ s.lock.RLock()
+ schema = s.cache[key]
+ s.lock.RUnlock()
+ if schema != nil {
+ return
+ }
+
+ pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
+ if err != nil {
+ return nil, err
+ }
+
+ var properties = make(map[string]string)
+ if pbSchema.Properties != nil {
+ for _, entry := range pbSchema.Properties {
+ properties[*entry.Key] = properties[*entry.Value]
+ }
+ }
+ schema, err = NewSchema(SchemaType(*pbSchema.Type), pbSchema.SchemaData, properties)
+ if err != nil {
+ return nil, err
+ }
+ s.add(key, schema)
+ return schema, nil
+
+}
+func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.cache[schemaVersionHash] = schema
Review comment:
Do we allow schema version overwrite?
##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
-
+ providersMutex sync.RWMutex
Review comment:
Run `go fmt` to fix all formatting issues
##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
-
+ providersMutex sync.RWMutex
compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
metrics *internal.LeveledMetrics
decryptor cryptointernal.Decryptor
+ schemaInfoCache *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+ lock sync.RWMutex
+ cache map[string]Schema
+ client *client
+ topic string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+ return &schemaInfoCache{
+ cache: make(map[string]Schema),
+ client: client,
+ topic: topic,
+ }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+ key := hex.EncodeToString(schemaVersion)
+ s.lock.RLock()
+ schema = s.cache[key]
+ s.lock.RUnlock()
+ if schema != nil {
+ return
Review comment:
Or
s.lock.RLock()
schema, ok = s.cache[key]
s.lock.RUnlock()
if ok {
return schema, nil
}
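The comma-ok form suggested here separates "key absent" from "key present with a zero value", which a plain `!= nil` or `!= ""` test cannot do. A minimal sketch with a hypothetical `versionCache` that stores strings instead of `Schema` values:

```go
package main

import (
	"fmt"
	"sync"
)

// versionCache is a hypothetical, simplified cache keyed by schema version.
type versionCache struct {
	lock  sync.RWMutex
	cache map[string]string
}

// lookup returns the cached value and whether the key was present.
func (c *versionCache) lookup(key string) (string, bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	v, ok := c.cache[key]
	return v, ok
}

func main() {
	// Key "00" is present but maps to the zero value.
	c := &versionCache{cache: map[string]string{"00": ""}}

	v, ok := c.lookup("00")
	fmt.Printf("%q %v\n", v, ok)

	v, ok = c.lookup("01")
	fmt.Printf("%q %v\n", v, ok)
}
```

In the suggested snippet, `ok` would also need to be declared before the assignment, for example with `var ok bool`, since the line uses `=` rather than `:=`.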
[GitHub] [pulsar-client-go] oryx2 commented on a change in pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
oryx2 commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r788329960
##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
dlq *dlqRouter
log log.Logger
-
+ providersMutex sync.RWMutex
compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
metrics *internal.LeveledMetrics
decryptor cryptointernal.Decryptor
+ schemaInfoCache *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+ lock sync.RWMutex
+ cache map[string]Schema
+ client *client
+ topic string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+ return &schemaInfoCache{
+ cache: make(map[string]Schema),
+ client: client,
+ topic: topic,
+ }
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+ key := hex.EncodeToString(schemaVersion)
+ s.lock.RLock()
+ schema = s.cache[key]
+ s.lock.RUnlock()
+ if schema != nil {
+ return
+ }
+
+ pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
+ if err != nil {
+ return nil, err
+ }
+
+ var properties = make(map[string]string)
+ if pbSchema.Properties != nil {
+ for _, entry := range pbSchema.Properties {
+ properties[*entry.Key] = properties[*entry.Value]
+ }
+ }
+ schema, err = NewSchema(SchemaType(*pbSchema.Type), pbSchema.SchemaData, properties)
+ if err != nil {
+ return nil, err
+ }
+ s.add(key, schema)
+ return schema, nil
+
+}
+func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.cache[schemaVersionHash] = schema
Review comment:
You are right, overwriting is not allowed. `add` is just a private helper for `schemaInfoCache.Get`, which does the overwrite check.
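Because the quoted `Get` releases the read lock before the remote lookup, two concurrent callers could in principle both miss the cache and both call `add`. If overwrites must never happen, the check can be repeated under the write lock. The sketch below uses a hypothetical `schemaCache` with string values and an `addIfAbsent` helper that is not part of the PR:

```go
package main

import (
	"fmt"
	"sync"
)

// schemaCache is a hypothetical, simplified version of the cache in the diff.
type schemaCache struct {
	lock  sync.RWMutex
	cache map[string]string
}

// addIfAbsent stores value under key only if the key is not yet present.
// It returns the value that ends up in the cache.
func (c *schemaCache) addIfAbsent(key, value string) string {
	c.lock.Lock()
	defer c.lock.Unlock()
	if existing, ok := c.cache[key]; ok {
		return existing
	}
	c.cache[key] = value
	return value
}

func main() {
	c := &schemaCache{cache: make(map[string]string)}

	// Several goroutines race to add the same schema version.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.addIfAbsent("v1", fmt.Sprintf("schema-from-goroutine-%d", i))
		}(i)
	}
	wg.Wait()

	// Exactly one entry exists, and it was written only once.
	fmt.Println(len(c.cache))
}
```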
[GitHub] [pulsar-client-go] oryx2 commented on a change in pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
oryx2 commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r702288485
##########
File path: pulsar/impl_message.go
##########
@@ -262,9 +264,20 @@ func (msg *message) GetReplicatedFrom() string {
}
func (msg *message) GetSchemaValue(v interface{}) error {
+ if msg.schemaVersion != nil {
+ schema, err := msg.schemaInfoCache.Get(msg.schemaVersion)
+ if err != nil {
+ return err
+ }
+ return (*schema).Decode(msg.payLoad, v)
Review comment:
Thanks for the review. The dereference was needed because the schema cache stored pointers to the Schema interface; I have changed it to store Schema instances instead.
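The effect of that change can be sketched in isolation. `Decoder` and `upperDecoder` are hypothetical stand-ins for the `Schema` interface and one implementation; both helpers assume the key is present in the map.

```go
package main

import (
	"fmt"
	"strings"
)

// Decoder is a hypothetical stand-in for the Schema interface.
type Decoder interface {
	Decode(data []byte) string
}

type upperDecoder struct{}

func (upperDecoder) Decode(data []byte) string { return strings.ToUpper(string(data)) }

// decodeViaPointer works on a map of pointers to interface values.
// The pointer must be dereferenced before a method can be called.
func decodeViaPointer(cache map[string]*Decoder, key string, data []byte) string {
	return (*cache[key]).Decode(data)
}

// decodeViaValue works on a map of interface values; methods are called directly.
func decodeViaValue(cache map[string]Decoder, key string, data []byte) string {
	return cache[key].Decode(data)
}

func main() {
	var d Decoder = upperDecoder{}
	fmt.Println(decodeViaPointer(map[string]*Decoder{"v1": &d}, "v1", []byte("hello")))
	fmt.Println(decodeViaValue(map[string]Decoder{"v1": d}, "v1", []byte("hello")))
}
```

An interface value already carries a reference to its dynamic value, so a pointer to an interface is rarely needed in Go.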
[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #611: feat: support multiple schema version for producer and consumer
Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r702008258
##########
File path: pulsar/consumer_partition.go
##########
@@ -141,6 +142,63 @@ type partitionConsumer struct {
providersMutex sync.RWMutex
compressionProviders map[pb.CompressionType]compression.Provider
metrics *internal.TopicMetrics
+ schemaInfoCache *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+ lock sync.RWMutex
+ cache map[string]*Schema
+ client *client
+ topic string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+ return &schemaInfoCache{
+ cache: make(map[string]*Schema),
+ client: client,
+ topic: topic,
+ }
+}
+
+func (s *schemaInfoCache) get(key string) (schema *Schema) {
Review comment:
Let's remove the named return variables here and below and return values explicitly from the functions. Named results make the code more difficult to follow, especially below, since some returns have values and others do not.
##########
File path: pulsar/producer_partition.go
##########
@@ -361,19 +407,48 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
msg := request.msg
payload := msg.Payload
+
var schemaPayload []byte
var err error
- if p.options.Schema != nil {
- schemaPayload, err = p.options.Schema.Encode(msg.Value)
+ if msg.Value != nil && msg.Payload != nil {
+ p.log.Error("Can not set Value and Payload both")
+ return
+ }
+
+ if p.options.DisableMultiSchema {
+ if msg.Schema == nil && p.options.Schema == nil && !msg.Schema.GetSchemaInfo().isSame((p.options.Schema).GetSchemaInfo()) {
Review comment:
Can this be simplified, or a comment added? I'm having a difficult time understanding it.
##########
File path: pulsar/impl_message.go
##########
@@ -262,9 +264,20 @@ func (msg *message) GetReplicatedFrom() string {
}
func (msg *message) GetSchemaValue(v interface{}) error {
+ if msg.schemaVersion != nil {
+ schema, err := msg.schemaInfoCache.Get(msg.schemaVersion)
+ if err != nil {
+ return err
+ }
+ return (*schema).Decode(msg.payLoad, v)
Review comment:
Why is casting needed here?