Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/03 09:39:55 UTC

[GitHub] [pulsar-client-go] oryx2 opened a new pull request #611: feat: support multiple schema version for producer and consumer

oryx2 opened a new pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611


   
   
   ### Motivation
   
   Implement [PIP-43](https://github.com/apache/pulsar/wiki/PIP-43%3A-producer-send-message-with-different-schema#changespart-1).
   Support multiple schema versions for producers and consumers.
   ### Modifications
   
   -  Add a schema cache for the producer and the consumer
   -  Add a `DisableMultiSchema` option for the producer
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (**yes**)
     - The schema: (**yes**)
     - The default values of configurations: (**yes**)
     - The wire protocol: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] chiragbparikh commented on pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
chiragbparikh commented on pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#issuecomment-1023670646


   @cckellogg @zzzming could you please review the changes in this PR? Thanks in advance.





[GitHub] [pulsar-client-go] oryx2 commented on a change in pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
oryx2 commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r788328382



##########
File path: pulsar/internal/lookup_service.go
##########
@@ -358,6 +378,9 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace string, mode GetTopic
 	return topics, nil
 }
 
+func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
+	return nil, errors.New("not support")

Review comment:
       Thanks, I will add a more descriptive error message.







[GitHub] [pulsar-client-go] chiragbparikh commented on pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
chiragbparikh commented on pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#issuecomment-950011686


   Hello @cckellogg and @oryx2.
   Can this PR be merged? It is a great feature and we are looking forward to using it. Thanks!





[GitHub] [pulsar-client-go] lhotari commented on a change in pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r787760098



##########
File path: pulsar/producer.go
##########
@@ -166,6 +166,10 @@ type ProducerOptions struct {
 	// Default is 1 minute
 	PartitionsAutoDiscoveryInterval time.Duration
 
+  // Disable multiple Schame Version
+  // Default false

Review comment:
       whitespace

##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
 	dlq         *dlqRouter
 
 	log log.Logger
-
+  providersMutex       sync.RWMutex

Review comment:
       whitespace







[GitHub] [pulsar-client-go] oryx2 commented on a change in pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
oryx2 commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r788329230



##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
 	dlq         *dlqRouter
 
 	log log.Logger
-
+  providersMutex       sync.RWMutex
 	compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
 	metrics              *internal.LeveledMetrics
 	decryptor            cryptointernal.Decryptor
+	schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+	lock   sync.RWMutex
+	cache  map[string]Schema
+	client *client
+	topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+	return &schemaInfoCache{
+		cache:  make(map[string]Schema),
+		client: client,
+		topic:  topic,
+	}
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+	key := hex.EncodeToString(schemaVersion)
+	s.lock.RLock()
+	schema = s.cache[key]
+	s.lock.RUnlock()
+	if schema != nil {
+		return
+	}
+
+	pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
+	if err != nil {
+		return nil, err
+	}
+
+	var properties = make(map[string]string)
+	if pbSchema.Properties != nil {
+		for _, entry := range pbSchema.Properties {
+			properties[*entry.Key] = properties[*entry.Value]

Review comment:
       I found a function that does the same job, [ConvertToStringMap](https://github.com/apache/pulsar-client-go/blob/a119bab0f8598601c0eb7f0fcd97da7ab06700c7/pulsar/internal/commands.go#/L302), but it has no nil check. Maybe I should add a nil check to ConvertToStringMap and then call it here.
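
A minimal sketch of such a nil-safe conversion, not the actual `ConvertToStringMap` code. `KeyValue`, `toStringMap` and `strPtr` are hypothetical names; the only assumption taken from the review is that `Key` and `Value` are `*string`. Note that the quoted diff line reads its right-hand side from `properties` instead of from the entry, which would store empty strings; the sketch assigns `*kv.Value` directly.

```go
package main

import "fmt"

// KeyValue is a hypothetical stand-in for the generated protobuf type,
// whose Key and Value fields are optional and therefore pointers.
type KeyValue struct {
	Key   *string
	Value *string
}

// toStringMap copies the pairs into a map, skipping nil entries and
// entries whose Key or Value pointer is nil.
func toStringMap(pairs []*KeyValue) map[string]string {
	m := make(map[string]string, len(pairs))
	for _, kv := range pairs {
		if kv == nil || kv.Key == nil || kv.Value == nil {
			continue
		}
		// Assign the entry's own value, not a lookup in m.
		m[*kv.Key] = *kv.Value
	}
	return m
}

// strPtr is a small helper for building test data.
func strPtr(s string) *string { return &s }

func main() {
	pairs := []*KeyValue{
		{Key: strPtr("owner"), Value: strPtr("team-a")},
		{Key: strPtr("broken")}, // nil Value: skipped
		nil,                     // nil entry: skipped
	}
	fmt.Println(toStringMap(pairs)) // map[owner:team-a]
}
```

Skipping incomplete entries is one possible policy; returning an error for them would be an equally reasonable choice.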







[GitHub] [pulsar-client-go] oryx2 commented on pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
oryx2 commented on pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#issuecomment-1016281620


   @lhotari I have just resolved them, please check.





[GitHub] [pulsar-client-go] oryx2 commented on a change in pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
oryx2 commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r788311692



##########
File path: pulsar/producer_partition.go
##########
@@ -397,19 +460,48 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	msg := request.msg
 
 	payload := msg.Payload
+
 	var schemaPayload []byte
 	var err error
-	if p.options.Schema != nil {
-		schemaPayload, err = p.options.Schema.Encode(msg.Value)
+	if msg.Value != nil && msg.Payload != nil {
+		p.log.Error("Can not set Value and Payload both")
+		return
+	}
+
+	if p.options.DisableMultiSchema {
+		if msg.Schema != p.options.Schema {

Review comment:
       Yes. Maybe I can compare the values of the schemas instead, like this:
   `msg.Schema != nil && p.options.Schema != nil && msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash()`







[GitHub] [pulsar-client-go] lhotari commented on pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#issuecomment-1016235087


   @chiragbparikh There are merge conflicts in this PR. Please resolve them so that the PR can be reviewed.





[GitHub] [pulsar-client-go] chiragbparikh commented on pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
chiragbparikh commented on pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#issuecomment-1016129670


   Hello @cckellogg and @oryx2. This is a gentle reminder to please review and merge this PR. As I mentioned above, we are eagerly waiting for this feature.





[GitHub] [pulsar-client-go] lhotari commented on pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#issuecomment-1016485644


   @cckellogg PTAL





[GitHub] [pulsar-client-go] zzzming commented on a change in pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
zzzming commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r787775362



##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
 	dlq         *dlqRouter
 
 	log log.Logger
-
+  providersMutex       sync.RWMutex
 	compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
 	metrics              *internal.LeveledMetrics
 	decryptor            cryptointernal.Decryptor
+	schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+	lock   sync.RWMutex
+	cache  map[string]Schema
+	client *client
+	topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+	return &schemaInfoCache{
+		cache:  make(map[string]Schema),
+		client: client,
+		topic:  topic,
+	}
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+	key := hex.EncodeToString(schemaVersion)
+	s.lock.RLock()
+	schema = s.cache[key]
+	s.lock.RUnlock()
+	if schema != nil {
+		return

Review comment:
       Return the values explicitly here: `return schema, nil`

##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
 	dlq         *dlqRouter
 
 	log log.Logger
-
+  providersMutex       sync.RWMutex
 	compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
 	metrics              *internal.LeveledMetrics
 	decryptor            cryptointernal.Decryptor
+	schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+	lock   sync.RWMutex
+	cache  map[string]Schema
+	client *client
+	topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+	return &schemaInfoCache{
+		cache:  make(map[string]Schema),
+		client: client,
+		topic:  topic,
+	}
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+	key := hex.EncodeToString(schemaVersion)
+	s.lock.RLock()
+	schema = s.cache[key]
+	s.lock.RUnlock()
+	if schema != nil {
+		return
+	}
+
+	pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
+	if err != nil {
+		return nil, err
+	}
+
+	var properties = make(map[string]string)
+	if pbSchema.Properties != nil {
+		for _, entry := range pbSchema.Properties {
+			properties[*entry.Key] = properties[*entry.Value]

Review comment:
       Since Key and Value are both declared as *string, do you need to check that the pointers are not nil before assigning them to properties?

##########
File path: pulsar/producer_partition.go
##########
@@ -397,19 +460,48 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	msg := request.msg
 
 	payload := msg.Payload
+
 	var schemaPayload []byte
 	var err error
-	if p.options.Schema != nil {
-		schemaPayload, err = p.options.Schema.Encode(msg.Value)
+	if msg.Value != nil && msg.Payload != nil {
+		p.log.Error("Can not set Value and Payload both")
+		return
+	}
+
+	if p.options.DisableMultiSchema {
+		if msg.Schema != p.options.Schema {

Review comment:
       I am not sure whether we want to compare the pointer or the actual value. Schema is an interface, so the current check only makes sure that both sides point to the same instance. Do you want to compare the value of Schema?
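
To illustrate the difference, here is a small sketch with simplified, hypothetical stand-ins for `Schema` and `SchemaInfo` (the real types in the client may differ; if the real `SchemaInfo` contains a map, `==` on it would not compile and a hash or field-by-field comparison would be needed instead). Two interface values that hold pointers compare equal only when the pointers are identical, even if the schemas they describe are the same.

```go
package main

import "fmt"

// SchemaInfo is a simplified, hypothetical description of a schema.
type SchemaInfo struct {
	Name   string
	Schema string
	Type   int
}

// Schema is a simplified, hypothetical version of the client interface.
type Schema interface {
	GetSchemaInfo() *SchemaInfo
}

type stringSchema struct{ info SchemaInfo }

func (s *stringSchema) GetSchemaInfo() *SchemaInfo { return &s.info }

func main() {
	var a Schema = &stringSchema{info: SchemaInfo{Name: "String", Type: 1}}
	var b Schema = &stringSchema{info: SchemaInfo{Name: "String", Type: 1}}
	c := a

	// Interfaces holding pointers are equal only if the pointers are identical.
	fmt.Println(a == b) // false: two separate instances
	fmt.Println(a == c) // true: same instance

	// Comparing the described schema compares values instead.
	fmt.Println(*a.GetSchemaInfo() == *b.GetSchemaInfo()) // true
}
```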

##########
File path: pulsar/internal/lookup_service.go
##########
@@ -358,6 +378,9 @@ func (h *httpLookupService) GetTopicsOfNamespace(namespace string, mode GetTopic
 	return topics, nil
 }
 
+func (h *httpLookupService) GetSchema(topic string, schemaVersion []byte) (schema *pb.Schema, err error) {
+	return nil, errors.New("not support")

Review comment:
       Can we make this more descriptive? An error like this might help with debugging:
   errors.New("GetSchema is not supported by httpLookupService")
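
One possible way to go a step further, sketched under the assumption that the intended message is that the HTTP lookup service does not support `GetSchema`: declare the error once as a sentinel so callers can test for it. `ErrGetSchemaUnsupported`, `getSchema` and `isUnsupported` are hypothetical names, not part of the client.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrGetSchemaUnsupported is a hypothetical sentinel error; the name is illustrative.
var ErrGetSchemaUnsupported = errors.New("GetSchema is not supported by httpLookupService")

// getSchema stands in for the unimplemented lookup method.
func getSchema(topic string) error {
	// Wrapping with %w keeps the sentinel matchable through errors.Is.
	return fmt.Errorf("topic %q: %w", topic, ErrGetSchemaUnsupported)
}

// isUnsupported reports whether err is, or wraps, the sentinel.
func isUnsupported(err error) bool {
	return errors.Is(err, ErrGetSchemaUnsupported)
}

func main() {
	err := getSchema("persistent://public/default/t1")
	fmt.Println(err)
	fmt.Println(isUnsupported(err)) // true
}
```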
   

##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
 	dlq         *dlqRouter
 
 	log log.Logger
-
+  providersMutex       sync.RWMutex
 	compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
 	metrics              *internal.LeveledMetrics
 	decryptor            cryptointernal.Decryptor
+	schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+	lock   sync.RWMutex
+	cache  map[string]Schema
+	client *client
+	topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+	return &schemaInfoCache{
+		cache:  make(map[string]Schema),
+		client: client,
+		topic:  topic,
+	}
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+	key := hex.EncodeToString(schemaVersion)
+	s.lock.RLock()
+	schema = s.cache[key]
+	s.lock.RUnlock()
+	if schema != nil {
+		return
+	}
+
+	pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
+	if err != nil {
+		return nil, err
+	}
+
+	var properties = make(map[string]string)
+	if pbSchema.Properties != nil {
+		for _, entry := range pbSchema.Properties {
+			properties[*entry.Key] = properties[*entry.Value]
+		}
+	}
+	schema, err = NewSchema(SchemaType(*pbSchema.Type), pbSchema.SchemaData, properties)
+	if err != nil {
+		return nil, err
+	}
+	s.add(key, schema)
+	return schema, nil
+
+}
+func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	s.cache[schemaVersionHash] = schema

Review comment:
       Do we allow schema version overwrite?

##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
 	dlq         *dlqRouter
 
 	log log.Logger
-
+  providersMutex       sync.RWMutex

Review comment:
       Run `go fmt` to fix all formatting issues

##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
 	dlq         *dlqRouter
 
 	log log.Logger
-
+  providersMutex       sync.RWMutex
 	compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
 	metrics              *internal.LeveledMetrics
 	decryptor            cryptointernal.Decryptor
+	schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+	lock   sync.RWMutex
+	cache  map[string]Schema
+	client *client
+	topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+	return &schemaInfoCache{
+		cache:  make(map[string]Schema),
+		client: client,
+		topic:  topic,
+	}
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+	key := hex.EncodeToString(schemaVersion)
+	s.lock.RLock()
+	schema = s.cache[key]
+	s.lock.RUnlock()
+	if schema != nil {
+		return

Review comment:
       Or use the comma-ok form:
   	s.lock.RLock()
   	schema, ok := s.cache[key]
   	s.lock.RUnlock()
   	if ok {
   		return schema, nil
   	}
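
A short sketch of why the comma-ok form is more precise than checking the value against its zero value. The `cache` type is a hypothetical, simplified stand-in that stores strings rather than schemas: a lookup for a missing key and a lookup for a key that holds the zero value return the same value, and only `ok` tells them apart.

```go
package main

import (
	"fmt"
	"sync"
)

// cache is a hypothetical, simplified read-mostly cache.
type cache struct {
	mu sync.RWMutex
	m  map[string]string
}

// get returns the cached value and whether the key was present.
func (c *cache) get(key string) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	v, ok := c.m[key]
	return v, ok
}

func main() {
	c := &cache{m: map[string]string{"01": ""}}

	v, ok := c.get("01")
	fmt.Printf("%q %v\n", v, ok) // "" true: present, holds the zero value

	v, ok = c.get("02")
	fmt.Printf("%q %v\n", v, ok) // "" false: absent
}
```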







[GitHub] [pulsar-client-go] oryx2 commented on a change in pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
oryx2 commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r788329960



##########
File path: pulsar/consumer_partition.go
##########
@@ -143,10 +144,61 @@ type partitionConsumer struct {
 	dlq         *dlqRouter
 
 	log log.Logger
-
+  providersMutex       sync.RWMutex
 	compressionProviders sync.Map //map[pb.CompressionType]compression.Provider
 	metrics              *internal.LeveledMetrics
 	decryptor            cryptointernal.Decryptor
+	schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+	lock   sync.RWMutex
+	cache  map[string]Schema
+	client *client
+	topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+	return &schemaInfoCache{
+		cache:  make(map[string]Schema),
+		client: client,
+		topic:  topic,
+	}
+}
+
+func (s *schemaInfoCache) Get(schemaVersion []byte) (schema Schema, err error) {
+	key := hex.EncodeToString(schemaVersion)
+	s.lock.RLock()
+	schema = s.cache[key]
+	s.lock.RUnlock()
+	if schema != nil {
+		return
+	}
+
+	pbSchema, err := s.client.lookupService.GetSchema(s.topic, schemaVersion)
+	if err != nil {
+		return nil, err
+	}
+
+	var properties = make(map[string]string)
+	if pbSchema.Properties != nil {
+		for _, entry := range pbSchema.Properties {
+			properties[*entry.Key] = properties[*entry.Value]
+		}
+	}
+	schema, err = NewSchema(SchemaType(*pbSchema.Type), pbSchema.SchemaData, properties)
+	if err != nil {
+		return nil, err
+	}
+	s.add(key, schema)
+	return schema, nil
+
+}
+func (s *schemaInfoCache) add(schemaVersionHash string, schema Schema) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	s.cache[schemaVersionHash] = schema

Review comment:
       You are right, overwriting is not allowed. `add` is just a private helper for `schemaInfoCache.Get`, which does the overwrite check.
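
One caveat with leaving the check in `Get`: in the quoted diff the read lock is released before `add` takes the write lock, so two goroutines can both miss the cache and both call `add` for the same version. A minimal sketch of doing the check inside the write lock instead, so the first writer wins; `versionCache` and `addIfAbsent` are hypothetical names and the cache stores strings for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

// versionCache is a hypothetical, simplified cache keyed by schema version.
type versionCache struct {
	mu sync.Mutex
	m  map[string]string
}

// addIfAbsent stores v under key only if the key is not yet present and
// returns the value that ends up in the cache.
func (c *versionCache) addIfAbsent(key, v string) string {
	c.mu.Lock()
	defer c.mu.Unlock()
	if existing, ok := c.m[key]; ok {
		return existing
	}
	c.m[key] = v
	return v
}

func main() {
	c := &versionCache{m: make(map[string]string)}

	// Several goroutines race to cache the same schema version.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.addIfAbsent("01", fmt.Sprintf("schema-from-goroutine-%d", i))
		}(i)
	}
	wg.Wait()

	fmt.Println(len(c.m)) // 1: only the first writer's value was stored
}
```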







[GitHub] [pulsar-client-go] oryx2 commented on a change in pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
oryx2 commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r702288485



##########
File path: pulsar/impl_message.go
##########
@@ -262,9 +264,20 @@ func (msg *message) GetReplicatedFrom() string {
 }
 
 func (msg *message) GetSchemaValue(v interface{}) error {
+	if msg.schemaVersion != nil {
+		schema, err := msg.schemaInfoCache.Get(msg.schemaVersion)
+		if err != nil {
+			return err
+		}
+		return (*schema).Decode(msg.payLoad, v)

Review comment:
       Thanks for the review. The dereference was needed because the schema cache stored pointers to the Schema interface; I have changed it to store Schema instances instead.
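
For context, a small sketch of the difference, using a hypothetical one-method `Schema` interface rather than the client's real one. A `*Schema` is a pointer to an interface value and has to be dereferenced before a method call; storing the interface value itself removes that step.

```go
package main

import "fmt"

// Schema is a hypothetical, simplified interface.
type Schema interface {
	Decode(data []byte) string
}

type stringSchema struct{}

func (stringSchema) Decode(data []byte) string { return string(data) }

func main() {
	var s Schema = stringSchema{}

	// Pointer to an interface: must be dereferenced before calling a method.
	byPtr := map[string]*Schema{"v1": &s}
	fmt.Println((*byPtr["v1"]).Decode([]byte("hello"))) // hello

	// Interface value stored directly: the method call works as is.
	byVal := map[string]Schema{"v1": s}
	fmt.Println(byVal["v1"].Decode([]byte("hello"))) // hello
}
```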







[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #611: feat: support multiple schema version for producer and consumer

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #611:
URL: https://github.com/apache/pulsar-client-go/pull/611#discussion_r702008258



##########
File path: pulsar/consumer_partition.go
##########
@@ -141,6 +142,63 @@ type partitionConsumer struct {
 	providersMutex       sync.RWMutex
 	compressionProviders map[pb.CompressionType]compression.Provider
 	metrics              *internal.TopicMetrics
+	schemaInfoCache      *schemaInfoCache
+}
+
+type schemaInfoCache struct {
+	lock   sync.RWMutex
+	cache  map[string]*Schema
+	client *client
+	topic  string
+}
+
+func newSchemaInfoCache(client *client, topic string) *schemaInfoCache {
+	return &schemaInfoCache{
+		cache:  make(map[string]*Schema),
+		client: client,
+		topic:  topic,
+	}
+}
+
+func (s *schemaInfoCache) get(key string) (schema *Schema) {

Review comment:
       Let's remove the named return variables here and below and return the values explicitly from the functions. Named results make the code more difficult to follow, especially below, since some returns have values and others do not.
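
A sketch of the two styles side by side; `store`, `lookupNamed` and `lookupExplicit` are hypothetical and only illustrate the readability point. With named results, a bare `return` hides what is being returned; with unnamed results, every `return` states its values.

```go
package main

import "fmt"

// store is hypothetical sample data keyed by schema version.
var store = map[string]string{"01": "schema-v1"}

// lookupNamed uses named results; the bare return hides what is returned.
func lookupNamed(key string) (value string, err error) {
	value = store[key]
	if value != "" {
		return
	}
	return "", fmt.Errorf("no schema for version %q", key)
}

// lookupExplicit spells out the values at every return.
func lookupExplicit(key string) (string, error) {
	if value, ok := store[key]; ok {
		return value, nil
	}
	return "", fmt.Errorf("no schema for version %q", key)
}

func main() {
	fmt.Println(lookupNamed("01"))
	fmt.Println(lookupExplicit("01"))
	fmt.Println(lookupExplicit("02"))
}
```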

##########
File path: pulsar/producer_partition.go
##########
@@ -361,19 +407,48 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 	msg := request.msg
 
 	payload := msg.Payload
+
 	var schemaPayload []byte
 	var err error
-	if p.options.Schema != nil {
-		schemaPayload, err = p.options.Schema.Encode(msg.Value)
+	if msg.Value != nil && msg.Payload != nil {
+		p.log.Error("Can not set Value and Payload both")
+		return
+	}
+
+	if p.options.DisableMultiSchema {
+		if msg.Schema == nil && p.options.Schema == nil && !msg.Schema.GetSchemaInfo().isSame((p.options.Schema).GetSchemaInfo()) {

Review comment:
       Can this be simplified, or can a comment be added? I'm having a difficult time understanding it.
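
One way this could be simplified, sketched with hypothetical stand-in types rather than the PR's actual code: move the nil handling into a small helper so the call site reads as a single condition. As quoted, the condition only calls `msg.Schema.GetSchemaInfo()` when both schemas are nil, which looks unintended and would dereference a nil interface.

```go
package main

import "fmt"

// SchemaInfo and Schema are simplified, hypothetical stand-ins.
type SchemaInfo struct {
	Name string
	Type int
}

type Schema interface {
	GetSchemaInfo() SchemaInfo
}

type fixedSchema struct{ info SchemaInfo }

func (f fixedSchema) GetSchemaInfo() SchemaInfo { return f.info }

// sameSchema reports whether a and b describe the same schema.
// Two nil schemas count as the same; a nil and a non-nil schema do not.
func sameSchema(a, b Schema) bool {
	if a == nil || b == nil {
		return a == nil && b == nil
	}
	return a.GetSchemaInfo() == b.GetSchemaInfo()
}

func main() {
	producerSchema := fixedSchema{SchemaInfo{Name: "String", Type: 1}}
	sameMsgSchema := fixedSchema{SchemaInfo{Name: "String", Type: 1}}
	otherMsgSchema := fixedSchema{SchemaInfo{Name: "JSON", Type: 2}}

	fmt.Println(sameSchema(sameMsgSchema, producerSchema))  // true
	fmt.Println(sameSchema(otherMsgSchema, producerSchema)) // false
	fmt.Println(sameSchema(nil, producerSchema))            // false
	fmt.Println(sameSchema(nil, nil))                       // true
}
```

With such a helper, the guard for the single-schema mode could read `if !sameSchema(msg.Schema, p.options.Schema) { ... }`. Whether a nil message schema should be accepted in that mode is a design decision the sketch does not settle.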

##########
File path: pulsar/impl_message.go
##########
@@ -262,9 +264,20 @@ func (msg *message) GetReplicatedFrom() string {
 }
 
 func (msg *message) GetSchemaValue(v interface{}) error {
+	if msg.schemaVersion != nil {
+		schema, err := msg.schemaInfoCache.Get(msg.schemaVersion)
+		if err != nil {
+			return err
+		}
+		return (*schema).Decode(msg.payLoad, v)

Review comment:
       Why is casting needed here?



