You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/23 18:18:20 UTC
[pulsar-client-go] branch master updated: schema creation and validation functions without panicking (#794)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new b392b9c schema creation and validation functions without panicking (#794)
b392b9c is described below
commit b392b9cb4689052de2378c6aee1ba229f56bfb5c
Author: ming <it...@gmail.com>
AuthorDate: Thu Jun 23 14:18:15 2022 -0400
schema creation and validation functions without panicking (#794)
---
pulsar/schema.go | 45 +++++++++++++++++++++++++++++++++++++++------
pulsar/schema_test.go | 12 +++++++++---
2 files changed, 48 insertions(+), 9 deletions(-)
diff --git a/pulsar/schema.go b/pulsar/schema.go
index 8021ffe..2f18fad 100644
--- a/pulsar/schema.go
+++ b/pulsar/schema.go
@@ -91,18 +91,29 @@ type JSONSchema struct {
SchemaInfo
}
+// NewJSONSchema creates a new JSONSchema
+// Note: if creation of the codec fails, this function calls log.Fatalf, which exits the process (it is not a recoverable panic)
func NewJSONSchema(jsonAvroSchemaDef string, properties map[string]string) *JSONSchema {
+ js, err := NewJSONSchemaWithValidation(jsonAvroSchemaDef, properties)
+ if err != nil {
+ log.Fatalf("JSONSchema init codec error:%v", err)
+ }
+ return js
+}
+
+// NewJSONSchemaWithValidation creates a new JSONSchema, returning an error if creation of the codec fails
+func NewJSONSchemaWithValidation(jsonAvroSchemaDef string, properties map[string]string) (*JSONSchema, error) {
js := new(JSONSchema)
avroCodec, err := initAvroCodec(jsonAvroSchemaDef)
if err != nil {
- log.Fatalf("init codec error:%v", err)
+ return nil, err
}
schemaDef := NewSchemaDefinition(avroCodec)
js.SchemaInfo.Schema = schemaDef.Codec.Schema()
js.SchemaInfo.Type = JSON
js.SchemaInfo.Properties = properties
js.SchemaInfo.Name = "JSON"
- return js
+ return js, nil
}
func (js *JSONSchema) Encode(data interface{}) ([]byte, error) {
@@ -126,11 +137,22 @@ type ProtoSchema struct {
SchemaInfo
}
+// NewProtoSchema creates a new ProtoSchema
+// Note: if creation of the codec fails, this function calls log.Fatalf, which exits the process (it is not a recoverable panic)
func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *ProtoSchema {
+ ps, err := NewProtoSchemaWithValidation(protoAvroSchemaDef, properties)
+ if err != nil {
+ log.Fatalf("ProtoSchema init codec error:%v", err)
+ }
+ return ps
+}
+
+// NewProtoSchemaWithValidation creates a new ProtoSchema, returning an error if creation of the codec fails
+func NewProtoSchemaWithValidation(protoAvroSchemaDef string, properties map[string]string) (*ProtoSchema, error) {
ps := new(ProtoSchema)
avroCodec, err := initAvroCodec(protoAvroSchemaDef)
if err != nil {
- log.Fatalf("init codec error:%v", err)
+ return nil, err
}
schemaDef := NewSchemaDefinition(avroCodec)
ps.AvroCodec.Codec = schemaDef.Codec
@@ -138,7 +160,7 @@ func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *Pr
ps.SchemaInfo.Type = PROTOBUF
ps.SchemaInfo.Properties = properties
ps.SchemaInfo.Name = "Proto"
- return ps
+ return ps, nil
}
func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error) {
@@ -162,11 +184,22 @@ type AvroSchema struct {
SchemaInfo
}
+// NewAvroSchema creates a new AvroSchema
+// Note: if creation of the codec fails, this function calls log.Fatalf, which exits the process (it is not a recoverable panic)
func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSchema {
+ ps, err := NewAvroSchemaWithValidation(avroSchemaDef, properties)
+ if err != nil {
+ log.Fatalf("AvroSchema init codec error:%v", err)
+ }
+ return ps
+}
+
+// NewAvroSchemaWithValidation creates a new AvroSchema, returning an error if creation of the codec fails
+func NewAvroSchemaWithValidation(avroSchemaDef string, properties map[string]string) (*AvroSchema, error) {
as := new(AvroSchema)
avroCodec, err := initAvroCodec(avroSchemaDef)
if err != nil {
- log.Fatalf("init codec error:%v", err)
+ return nil, err
}
schemaDef := NewSchemaDefinition(avroCodec)
as.AvroCodec.Codec = schemaDef.Codec
@@ -174,7 +207,7 @@ func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSche
as.SchemaInfo.Type = AVRO
as.SchemaInfo.Name = "Avro"
as.SchemaInfo.Properties = properties
- return as
+ return as, nil
}
func (as *AvroSchema) Encode(data interface{}) ([]byte, error) {
diff --git a/pulsar/schema_test.go b/pulsar/schema_test.go
index ce19bac..1aa35f0 100644
--- a/pulsar/schema_test.go
+++ b/pulsar/schema_test.go
@@ -82,7 +82,9 @@ func TestJsonSchema(t *testing.T) {
//create consumer
var s testJSON
- consumerJS := NewJSONSchema(exampleSchemaDef, nil)
+ consumerJS, err := NewJSONSchemaWithValidation(exampleSchemaDef, nil)
+ assert.Nil(t, err)
+ consumerJS = NewJSONSchema(exampleSchemaDef, nil) // test this legacy function
consumer, err := client.Subscribe(ConsumerOptions{
Topic: "jsonTopic",
SubscriptionName: "sub-1",
@@ -105,7 +107,9 @@ func TestProtoSchema(t *testing.T) {
defer client.Close()
// create producer
- psProducer := NewProtoSchema(protoSchemaDef, nil)
+ psProducer, err := NewProtoSchemaWithValidation(protoSchemaDef, nil)
+ assert.Nil(t, err)
+ psProducer = NewProtoSchema(protoSchemaDef, nil)
producer, err := client.CreateProducer(ProducerOptions{
Topic: "proto",
Schema: psProducer,
@@ -146,7 +150,9 @@ func TestAvroSchema(t *testing.T) {
defer client.Close()
// create producer
- asProducer := NewAvroSchema(exampleSchemaDef, nil)
+ asProducer, err := NewAvroSchemaWithValidation(exampleSchemaDef, nil)
+ assert.Nil(t, err)
+ asProducer = NewAvroSchema(exampleSchemaDef, nil)
producer, err := client.CreateProducer(ProducerOptions{
Topic: "avro-topic",
Schema: asProducer,