You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/23 18:18:20 UTC

[pulsar-client-go] branch master updated: schema creation and validation functions without panicing (#794)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b392b9c  schema creation and validation functions without panicing (#794)
b392b9c is described below

commit b392b9cb4689052de2378c6aee1ba229f56bfb5c
Author: ming <it...@gmail.com>
AuthorDate: Thu Jun 23 14:18:15 2022 -0400

    schema creation and validation functions without panicing (#794)
---
 pulsar/schema.go      | 45 +++++++++++++++++++++++++++++++++++++++------
 pulsar/schema_test.go | 12 +++++++++---
 2 files changed, 48 insertions(+), 9 deletions(-)

diff --git a/pulsar/schema.go b/pulsar/schema.go
index 8021ffe..2f18fad 100644
--- a/pulsar/schema.go
+++ b/pulsar/schema.go
@@ -91,18 +91,29 @@ type JSONSchema struct {
 	SchemaInfo
 }
 
+// NewJSONSchema creates a new JSONSchema
+// Note: the function calls log.Fatalf (which exits the process rather than panicking) if creation of codec fails
 func NewJSONSchema(jsonAvroSchemaDef string, properties map[string]string) *JSONSchema {
+	js, err := NewJSONSchemaWithValidation(jsonAvroSchemaDef, properties)
+	if err != nil {
+		log.Fatalf("JSONSchema init codec error:%v", err)
+	}
+	return js
+}
+
+// NewJSONSchemaWithValidation creates a new JSONSchema, or returns an error if creation of codec fails
+func NewJSONSchemaWithValidation(jsonAvroSchemaDef string, properties map[string]string) (*JSONSchema, error) {
 	js := new(JSONSchema)
 	avroCodec, err := initAvroCodec(jsonAvroSchemaDef)
 	if err != nil {
-		log.Fatalf("init codec error:%v", err)
+		return nil, err
 	}
 	schemaDef := NewSchemaDefinition(avroCodec)
 	js.SchemaInfo.Schema = schemaDef.Codec.Schema()
 	js.SchemaInfo.Type = JSON
 	js.SchemaInfo.Properties = properties
 	js.SchemaInfo.Name = "JSON"
-	return js
+	return js, nil
 }
 
 func (js *JSONSchema) Encode(data interface{}) ([]byte, error) {
@@ -126,11 +137,22 @@ type ProtoSchema struct {
 	SchemaInfo
 }
 
+// NewProtoSchema creates a new ProtoSchema
+// Note: the function calls log.Fatalf (which exits the process rather than panicking) if creation of codec fails
 func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *ProtoSchema {
+	ps, err := NewProtoSchemaWithValidation(protoAvroSchemaDef, properties)
+	if err != nil {
+		log.Fatalf("ProtoSchema init codec error:%v", err)
+	}
+	return ps
+}
+
+// NewProtoSchemaWithValidation creates a new ProtoSchema, or returns an error if creation of codec fails
+func NewProtoSchemaWithValidation(protoAvroSchemaDef string, properties map[string]string) (*ProtoSchema, error) {
 	ps := new(ProtoSchema)
 	avroCodec, err := initAvroCodec(protoAvroSchemaDef)
 	if err != nil {
-		log.Fatalf("init codec error:%v", err)
+		return nil, err
 	}
 	schemaDef := NewSchemaDefinition(avroCodec)
 	ps.AvroCodec.Codec = schemaDef.Codec
@@ -138,7 +160,7 @@ func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *Pr
 	ps.SchemaInfo.Type = PROTOBUF
 	ps.SchemaInfo.Properties = properties
 	ps.SchemaInfo.Name = "Proto"
-	return ps
+	return ps, nil
 }
 
 func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error) {
@@ -162,11 +184,22 @@ type AvroSchema struct {
 	SchemaInfo
 }
 
+// NewAvroSchema creates a new AvroSchema
+// Note: the function calls log.Fatalf (which exits the process rather than panicking) if creation of codec fails
 func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSchema {
+	ps, err := NewAvroSchemaWithValidation(avroSchemaDef, properties)
+	if err != nil {
+		log.Fatalf("AvroSchema init codec error:%v", err)
+	}
+	return ps
+}
+
+// NewAvroSchemaWithValidation creates a new AvroSchema, or returns an error if creation of codec fails
+func NewAvroSchemaWithValidation(avroSchemaDef string, properties map[string]string) (*AvroSchema, error) {
 	as := new(AvroSchema)
 	avroCodec, err := initAvroCodec(avroSchemaDef)
 	if err != nil {
-		log.Fatalf("init codec error:%v", err)
+		return nil, err
 	}
 	schemaDef := NewSchemaDefinition(avroCodec)
 	as.AvroCodec.Codec = schemaDef.Codec
@@ -174,7 +207,7 @@ func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSche
 	as.SchemaInfo.Type = AVRO
 	as.SchemaInfo.Name = "Avro"
 	as.SchemaInfo.Properties = properties
-	return as
+	return as, nil
 }
 
 func (as *AvroSchema) Encode(data interface{}) ([]byte, error) {
diff --git a/pulsar/schema_test.go b/pulsar/schema_test.go
index ce19bac..1aa35f0 100644
--- a/pulsar/schema_test.go
+++ b/pulsar/schema_test.go
@@ -82,7 +82,9 @@ func TestJsonSchema(t *testing.T) {
 	//create consumer
 	var s testJSON
 
-	consumerJS := NewJSONSchema(exampleSchemaDef, nil)
+	consumerJS, err := NewJSONSchemaWithValidation(exampleSchemaDef, nil)
+	assert.Nil(t, err)
+	consumerJS = NewJSONSchema(exampleSchemaDef, nil) // test this legacy function
 	consumer, err := client.Subscribe(ConsumerOptions{
 		Topic:                       "jsonTopic",
 		SubscriptionName:            "sub-1",
@@ -105,7 +107,9 @@ func TestProtoSchema(t *testing.T) {
 	defer client.Close()
 
 	// create producer
-	psProducer := NewProtoSchema(protoSchemaDef, nil)
+	psProducer, err := NewProtoSchemaWithValidation(protoSchemaDef, nil)
+	assert.Nil(t, err)
+	psProducer = NewProtoSchema(protoSchemaDef, nil)
 	producer, err := client.CreateProducer(ProducerOptions{
 		Topic:  "proto",
 		Schema: psProducer,
@@ -146,7 +150,9 @@ func TestAvroSchema(t *testing.T) {
 	defer client.Close()
 
 	// create producer
-	asProducer := NewAvroSchema(exampleSchemaDef, nil)
+	asProducer, err := NewAvroSchemaWithValidation(exampleSchemaDef, nil)
+	assert.Nil(t, err)
+	asProducer = NewAvroSchema(exampleSchemaDef, nil)
 	producer, err := client.CreateProducer(ProducerOptions{
 		Topic:  "avro-topic",
 		Schema: asProducer,