You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/04/23 04:59:39 UTC

[pulsar] branch master updated: [go schema] support go schema for pulsar-client-go (#3904)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d5036ea  [go schema] support go schema for pulsar-client-go (#3904)
d5036ea is described below

commit d5036eaefcba42e6c9d159ffb14bee597d13380f
Author: 冉小龙 <rx...@qq.com>
AuthorDate: Tue Apr 23 12:59:33 2019 +0800

    [go schema] support go schema for pulsar-client-go (#3904)
    
    
    Signed-off-by: xiaolong.ran <ranxiaolong716@gmail.com>
    
    Master Issue: #3855
    
    Motivation
    Support Go schema for pulsar-client-go.
---
 pulsar-client-cpp/include/pulsar/Schema.h          |  52 +--
 .../include/pulsar/c/consumer_configuration.h      |   5 +
 .../include/pulsar/c/producer_configuration.h      |  23 +
 pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc |   8 +
 pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc |   7 +
 pulsar-client-go/go.mod                            |   7 +
 pulsar-client-go/go.sum                            |  15 +
 pulsar-client-go/pulsar/c_client.go                |  74 ++-
 pulsar-client-go/pulsar/c_consumer.go              |  60 ++-
 pulsar-client-go/pulsar/c_message.go               |  11 +-
 pulsar-client-go/pulsar/c_producer.go              |  77 +++-
 pulsar-client-go/pulsar/c_reader.go                |  13 +-
 pulsar-client-go/pulsar/client.go                  |   8 +-
 pulsar-client-go/pulsar/consumer.go                |   4 +
 pulsar-client-go/pulsar/consumer_test.go           |   7 +-
 pulsar-client-go/pulsar/message.go                 |   6 +
 pulsar-client-go/pulsar/pb/build.sh                |  23 +
 pulsar-client-go/pulsar/pb/hello.pb.go             | 100 ++++
 pulsar-client-go/pulsar/pb/hello.proto             |  25 +
 pulsar-client-go/pulsar/primitiveSerDe.go          | 318 +++++++++++++
 pulsar-client-go/pulsar/primitiveSerDe_test.go     | 145 ++++++
 pulsar-client-go/pulsar/producer.go                |   2 +
 pulsar-client-go/pulsar/producer_test.go           |   3 +-
 pulsar-client-go/pulsar/reader.go                  |   2 +
 pulsar-client-go/pulsar/reader_test.go             |   2 +-
 pulsar-client-go/pulsar/schema.go                  | 504 +++++++++++++++++++++
 pulsar-client-go/pulsar/schemaDef_test.go          |  58 +++
 pulsar-client-go/pulsar/schema_test.go             | 436 ++++++++++++++++++
 .../pulsar/{util_test.go => testhelps.go}          |   0
 29 files changed, 1935 insertions(+), 60 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Schema.h b/pulsar-client-cpp/include/pulsar/Schema.h
index 257ee3a..fe09fb9 100644
--- a/pulsar-client-cpp/include/pulsar/Schema.h
+++ b/pulsar-client-cpp/include/pulsar/Schema.h
@@ -40,69 +40,69 @@ enum SchemaType
     STRING = 1,
 
     /**
-     * A 8-byte integer.
+     * JSON object encoding and validation
      */
-    INT8 = 2,
+    JSON = 2,
 
     /**
-     * A 16-byte integer.
+     * Protobuf message encoding and decoding
      */
-    INT16 = 3,
+    PROTOBUF = 3,
 
     /**
-     * A 32-byte integer.
+     * Serialize and deserialize via Avro
      */
-    INT32 = 4,
+    AVRO = 4,
 
     /**
-     * A 64-byte integer.
+     * A 8-byte integer.
      */
-    INT64 = 5,
+    INT8 = 6,
 
     /**
-     * A float number.
+     * A 16-byte integer.
      */
-    FLOAT = 6,
+    INT16 = 7,
 
     /**
-     * A double number
+     * A 32-byte integer.
      */
-    DOUBLE = 7,
+    INT32 = 8,
 
     /**
-     * A bytes array.
+     * A 64-byte integer.
      */
-    BYTES = 8,
+    INT64 = 9,
 
     /**
-     * JSON object encoding and validation
+     * A float number.
      */
-    JSON = 9,
+    FLOAT = 10,
 
     /**
-     * Protobuf message encoding and decoding
+     * A double number
      */
-    PROTOBUF = 10,
+    DOUBLE = 11,
 
     /**
-     * Serialize and deserialize via Avro
+     * A Schema that contains Key Schema and Value Schema.
      */
-    AVRO = 11,
+    KEY_VALUE = 15,
 
     /**
-     * Auto Consume Type.
+     * A bytes array.
      */
-    AUTO_CONSUME = 13,
+    BYTES = -1,
 
     /**
-     * Auto Publish Type.
+     * Auto Consume Type.
      */
-    AUTO_PUBLISH = 14,
+    AUTO_CONSUME = -3,
 
     /**
-     * A Schema that contains Key Schema and Value Schema.
+     * Auto Publish Type.
      */
-    KEY_VALUE = 15,
+    AUTO_PUBLISH = -4,
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index 810d062..2a04d1e 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -19,6 +19,7 @@
 #pragma once
 
 #include "consumer.h"
+#include "producer_configuration.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -82,6 +83,10 @@ void pulsar_consumer_configuration_set_consumer_type(pulsar_consumer_configurati
 pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
     pulsar_consumer_configuration_t *consumer_configuration);
 
+void pulsar_consumer_configuration_set_schema_info(pulsar_consumer_configuration_t *consumer_configuration,
+                                                   pulsar_schema_type schemaType, const char *name,
+                                                   const char *schema, pulsar_string_map_t *properties);
+
 /**
  * A message listener enables your application to configure how to process
  * and acknowledge messages delivered. A listener will be called in order
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index 670bf50..84b324b 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -43,6 +43,25 @@ typedef enum {
     pulsar_CompressionZLib = 2
 } pulsar_compression_type;
 
+typedef enum {
+    pulsar_None = 0,
+    pulsar_String = 1,
+    pulsar_Json = 2,
+    pulsar_Protobuf = 3,
+    pulsar_Avro = 4,
+    pulsar_Boolean = 5,
+    pulsar_Int8 = 6,
+    pulsar_Int16 = 7,
+    pulsar_Int32 = 8,
+    pulsar_Int64 = 9,
+    pulsar_Float32 = 10,
+    pulsar_Float64 = 11,
+    pulsar_KeyValue = 15,
+    pulsar_Bytes = -1,
+    pulsar_AutoConsume = -3,
+    pulsar_AutoPublish = -4,
+} pulsar_schema_type;
+
 typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;
 
 pulsar_producer_configuration_t *pulsar_producer_configuration_create();
@@ -69,6 +88,10 @@ void pulsar_producer_configuration_set_compression_type(pulsar_producer_configur
 pulsar_compression_type pulsar_producer_configuration_get_compression_type(
     pulsar_producer_configuration_t *conf);
 
+void pulsar_producer_configuration_set_schema_info(pulsar_producer_configuration_t *conf,
+                                                   pulsar_schema_type schemaType, const char *name,
+                                                   const char *schema, pulsar_string_map_t *properties);
+
 void pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t *conf,
                                                             int maxPendingMessages);
 
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index 49cf2be..9b23729 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -40,6 +40,13 @@ pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
     return (pulsar_consumer_type)consumer_configuration->consumerConfiguration.getConsumerType();
 }
 
+void pulsar_consumer_configuration_set_schema_info(pulsar_consumer_configuration_t *consumer_configuration,
+                                                   pulsar_schema_type schemaType, const char *name,
+                                                   const char *schema, pulsar_string_map_t *properties) {
+    auto schemaInfo = pulsar::SchemaInfo((pulsar::SchemaType)schemaType, name, schema, properties->map);
+    consumer_configuration->consumerConfiguration.setSchema(schemaInfo);
+}
+
 static void message_listener_callback(pulsar::Consumer consumer, const pulsar::Message &msg,
                                       pulsar_message_listener listener, void *ctx) {
     pulsar_consumer_t c_consumer;
@@ -105,6 +112,7 @@ void pulsar_configure_set_negative_ack_redelivery_delay_ms(
     pulsar_consumer_configuration_t *consumer_configuration, long redeliveryDelayMillis) {
     consumer_configuration->consumerConfiguration.setNegativeAckRedeliveryDelayMs(redeliveryDelayMillis);
 }
+
 long pulsar_configure_get_negative_ack_redelivery_delay_ms(
     pulsar_consumer_configuration_t *consumer_configuration) {
     return consumer_configuration->consumerConfiguration.getNegativeAckRedeliveryDelayMs();
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index a4a24c1..6e2b7fc 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -66,6 +66,13 @@ pulsar_compression_type pulsar_producer_configuration_get_compression_type(
     return (pulsar_compression_type)conf->conf.getCompressionType();
 }
 
+void pulsar_producer_configuration_set_schema_info(pulsar_producer_configuration_t *conf,
+                                                   pulsar_schema_type schemaType, const char *name,
+                                                   const char *schema, pulsar_string_map_t *properties) {
+    auto schemaInfo = pulsar::SchemaInfo((pulsar::SchemaType)schemaType, name, schema, properties->map);
+    conf->conf.setSchema(schemaInfo);
+}
+
 void pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t *conf,
                                                             int maxPendingMessages) {
     conf->conf.setMaxPendingMessages(maxPendingMessages);
diff --git a/pulsar-client-go/go.mod b/pulsar-client-go/go.mod
index 1d826eb..ea75273 100644
--- a/pulsar-client-go/go.mod
+++ b/pulsar-client-go/go.mod
@@ -2,8 +2,15 @@ module github.com/apache/pulsar/pulsar-client-go
 
 require (
 	github.com/BurntSushi/toml v0.3.1 // indirect
+	github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6
+	github.com/davecgh/go-spew v1.1.1
+	github.com/gogo/protobuf v1.2.1
+	github.com/golang/protobuf v1.3.1
+	github.com/golang/snappy v0.0.1 // indirect
+	github.com/linkedin/goavro v2.1.0+incompatible
 	github.com/sirupsen/logrus v1.3.0
 	github.com/stretchr/testify v1.3.0
+	gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0
 	gopkg.in/yaml.v2 v2.2.2 // indirect
 )
diff --git a/pulsar-client-go/go.sum b/pulsar-client-go/go.sum
index 5f0cd6b..b6177d8 100644
--- a/pulsar-client-go/go.sum
+++ b/pulsar-client-go/go.sum
@@ -1,10 +1,22 @@
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6 h1:xadBCbc8D9mmkaNfCsEBHbIoCjbayJXJNsY1JjPjNio=
+github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6/go.mod h1:qpebaTNSsyUn5rPSJMsfqEtDw71TTggXM6stUDI16HA=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
+github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
+github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
+github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/linkedin/goavro v2.1.0+incompatible h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY=
+github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
@@ -18,8 +30,11 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/linkedin/goavro.v1 v1.0.5 h1:BJa69CDh0awSsLUmZ9+BowBdokpduDZSM9Zk8oKHfN4=
+gopkg.in/linkedin/goavro.v1 v1.0.5/go.mod h1:Aw5GdAbizjOEl0kAMHV9iHmA8reZzW/OKuJAl4Hb9F0=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
 gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
diff --git a/pulsar-client-go/pulsar/c_client.go b/pulsar-client-go/pulsar/c_client.go
index 3ade48c..33f523d 100644
--- a/pulsar-client-go/pulsar/c_client.go
+++ b/pulsar-client-go/pulsar/c_client.go
@@ -196,7 +196,7 @@ func (client *client) CreateProducer(options ProducerOptions) (Producer, error)
 		error
 	})
 
-	client.CreateProducerAsync(options, func(producer Producer, err error) {
+	client.CreateProducerAsync(options, nil, func(producer Producer, err error) {
 		c <- struct {
 			Producer
 			error
@@ -208,8 +208,28 @@ func (client *client) CreateProducer(options ProducerOptions) (Producer, error)
 	return res.Producer, res.error
 }
 
-func (client *client) CreateProducerAsync(options ProducerOptions, callback func(producer Producer, err error)) {
-	createProducerAsync(client, options, callback)
+func (client *client) CreateProducerWithSchema(options ProducerOptions, schema Schema) (Producer, error) {
+	// Create is implemented on async create with a channel to wait for
+	// completion without blocking the real thread
+	c := make(chan struct {
+		Producer
+		error
+	})
+
+	client.CreateProducerAsync(options, schema, func(producer Producer, err error) {
+		c <- struct {
+			Producer
+			error
+		}{producer, err}
+		close(c)
+	})
+
+	res := <-c
+	return res.Producer, res.error
+}
+
+func (client *client) CreateProducerAsync(options ProducerOptions, schema Schema, callback func(producer Producer, err error)) {
+	createProducerAsync(client, schema, options, callback)
 }
 
 func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
@@ -218,7 +238,25 @@ func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
 		error
 	})
 
-	client.SubscribeAsync(options, func(consumer Consumer, err error) {
+	client.SubscribeAsync(options, nil, func(consumer Consumer, err error) {
+		c <- struct {
+			Consumer
+			error
+		}{consumer, err}
+		close(c)
+	})
+
+	res := <-c
+	return res.Consumer, res.error
+}
+
+func (client *client) SubscribeWithSchema(options ConsumerOptions, schema Schema) (Consumer, error) {
+	c := make(chan struct {
+		Consumer
+		error
+	})
+
+	client.SubscribeAsync(options, schema, func(consumer Consumer, err error) {
 		c <- struct {
 			Consumer
 			error
@@ -230,8 +268,8 @@ func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
 	return res.Consumer, res.error
 }
 
-func (client *client) SubscribeAsync(options ConsumerOptions, callback func(Consumer, error)) {
-	subscribeAsync(client, options, callback)
+func (client *client) SubscribeAsync(options ConsumerOptions, schema Schema, callback func(Consumer, error)) {
+	subscribeAsync(client, options, schema, callback)
 }
 
 func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
@@ -240,7 +278,25 @@ func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
 		error
 	})
 
-	client.CreateReaderAsync(options, func(reader Reader, err error) {
+	client.CreateReaderAsync(options, nil, func(reader Reader, err error) {
+		c <- struct {
+			Reader
+			error
+		}{reader, err}
+		close(c)
+	})
+
+	res := <-c
+	return res.Reader, res.error
+}
+
+func (client *client) CreateReaderWithSchema(options ReaderOptions, schema Schema) (Reader, error) {
+	c := make(chan struct {
+		Reader
+		error
+	})
+
+	client.CreateReaderAsync(options, schema, func(reader Reader, err error) {
 		c <- struct {
 			Reader
 			error
@@ -271,8 +327,8 @@ func pulsarGetTopicPartitionsCallbackProxy(res C.pulsar_result, cPartitions *C.p
 	}
 }
 
-func (client *client) CreateReaderAsync(options ReaderOptions, callback func(Reader, error)) {
-	createReaderAsync(client, options, callback)
+func (client *client) CreateReaderAsync(options ReaderOptions, schema Schema, callback func(Reader, error)) {
+	createReaderAsync(client, schema, options, callback)
 }
 
 func (client *client) TopicPartitions(topic string) ([]string, error) {
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 97fa843..ac6d9ed 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -32,6 +32,7 @@ import (
 )
 
 type consumer struct {
+	schema         Schema
 	client         *client
 	ptr            *C.pulsar_consumer_t
 	defaultChannel chan ConsumerMessage
@@ -53,18 +54,20 @@ func pulsarSubscribeCallbackProxy(res C.pulsar_result, ptr *C.pulsar_consumer_t,
 		cc.callback(nil, newError(res, "Failed to subscribe to topic"))
 	} else {
 		cc.consumer.ptr = ptr
+		cc.consumer.schema = cc.schema
 		runtime.SetFinalizer(cc.consumer, consumerFinalizer)
 		cc.callback(cc.consumer, nil)
 	}
 }
 
 type subscribeContext struct {
+	schema   Schema
 	conf     *C.pulsar_consumer_configuration_t
 	consumer *consumer
 	callback func(Consumer, error)
 }
 
-func subscribeAsync(client *client, options ConsumerOptions, callback func(Consumer, error)) {
+func subscribeAsync(client *client, options ConsumerOptions, schema Schema, callback func(Consumer, error)) {
 	if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
 		go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required"))
 		return
@@ -109,6 +112,48 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 		C.pulsar_consumer_set_subscription_initial_position(conf, C.initial_position(options.SubscriptionInitPos))
 	}
 
+	if schema != nil && schema.GetSchemaInfo() != nil {
+		if schema.GetSchemaInfo().Type != NONE {
+			cName := C.CString(schema.GetSchemaInfo().Name)
+			cSchema := C.CString(schema.GetSchemaInfo().Schema)
+			properties := C.pulsar_string_map_create()
+			defer C.free(unsafe.Pointer(cName))
+			defer C.free(unsafe.Pointer(cSchema))
+			defer C.pulsar_string_map_free(properties)
+
+			for key, value := range schema.GetSchemaInfo().Properties {
+				cKey := C.CString(key)
+				cValue := C.CString(value)
+
+				C.pulsar_string_map_put(properties, cKey, cValue)
+
+				C.free(unsafe.Pointer(cKey))
+				C.free(unsafe.Pointer(cValue))
+			}
+			C.pulsar_consumer_configuration_set_schema_info(conf, C.pulsar_schema_type(schema.GetSchemaInfo().Type),
+				cName, cSchema, properties)
+		} else {
+			cName := C.CString("BYTES")
+			cSchema := C.CString("")
+			properties := C.pulsar_string_map_create()
+			defer C.free(unsafe.Pointer(cName))
+			defer C.free(unsafe.Pointer(cSchema))
+			defer C.pulsar_string_map_free(properties)
+
+			for key, value := range schema.GetSchemaInfo().Properties {
+				cKey := C.CString(key)
+				cValue := C.CString(value)
+
+				C.pulsar_string_map_put(properties, cKey, cValue)
+
+				C.free(unsafe.Pointer(cKey))
+				C.free(unsafe.Pointer(cValue))
+			}
+			C.pulsar_consumer_configuration_set_schema_info(conf, C.pulsar_schema_type(BYTES),
+				cName, cSchema, properties)
+		}
+	}
+
 	// ReceiverQueueSize==0 means to use the default queue size
 	// -1 means to disable the consumer prefetching
 	if options.ReceiverQueueSize > 0 {
@@ -147,7 +192,7 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
 	subName := C.CString(options.SubscriptionName)
 	defer C.free(unsafe.Pointer(subName))
 
-	callbackPtr := savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback})
+	callbackPtr := savePointer(&subscribeContext{schema: schema, conf: conf, consumer: consumer, callback: callback})
 
 	if options.Topic != "" {
 		topic := C.CString(options.Topic)
@@ -193,12 +238,15 @@ func pulsarMessageListenerProxy(cConsumer *C.pulsar_consumer_t, message *C.pulsa
 			// There was an error when sending channel (eg: already closed)
 		}
 	}()
-
-	cc.channel <- ConsumerMessage{cc.consumer, newMessageWrapper(message)}
+	cc.channel <- ConsumerMessage{cc.consumer, newMessageWrapper(cc.consumer.Schema(), message)}
 }
 
 //// Consumer
 
+func (c *consumer) Schema() Schema {
+	return c.schema
+}
+
 func (c *consumer) Topic() string {
 	return C.GoString(C.pulsar_consumer_get_topic(c.ptr))
 }
@@ -210,7 +258,9 @@ func (c *consumer) Subscription() string {
 func (c *consumer) Unsubscribe() error {
 	channel := make(chan error)
 	c.UnsubscribeAsync(func(err error) {
-		channel <- err; close(channel) })
+		channel <- err
+		close(channel)
+	})
 	return <-channel
 }
 
diff --git a/pulsar-client-go/pulsar/c_message.go b/pulsar-client-go/pulsar/c_message.go
index 0dfbfa6..c45027a 100644
--- a/pulsar-client-go/pulsar/c_message.go
+++ b/pulsar-client-go/pulsar/c_message.go
@@ -32,7 +32,8 @@ import (
 )
 
 type message struct {
-	ptr *C.pulsar_message_t
+	ptr    *C.pulsar_message_t
+	schema Schema
 }
 
 type messageID struct {
@@ -97,8 +98,8 @@ func buildMessage(message ProducerMessage) *C.pulsar_message_t {
 
 ////////////// Message
 
-func newMessageWrapper(ptr *C.pulsar_message_t) Message {
-	msg := &message{ptr: ptr}
+func newMessageWrapper(schema Schema, ptr *C.pulsar_message_t) Message {
+	msg := &message{schema: schema, ptr: ptr}
 	runtime.SetFinalizer(msg, messageFinalizer)
 	return msg
 }
@@ -107,6 +108,10 @@ func messageFinalizer(msg *message) {
 	C.pulsar_message_free(msg.ptr)
 }
 
+func (m *message) GetValue(v interface{}) error {
+	return m.schema.Decode(m.Payload(), v)
+}
+
 func (m *message) Properties() map[string]string {
 	cProperties := C.pulsar_message_get_properties(m.ptr)
 	defer C.pulsar_string_map_free(cProperties)
diff --git a/pulsar-client-go/pulsar/c_producer.go b/pulsar-client-go/pulsar/c_producer.go
index 80fe017..528df30 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -25,6 +25,7 @@ package pulsar
 import "C"
 import (
 	"context"
+	"errors"
 	"runtime"
 	"time"
 	"unsafe"
@@ -32,6 +33,7 @@ import (
 
 type createProducerCtx struct {
 	client   *client
+	schema   Schema
 	callback func(producer Producer, err error)
 	conf     *C.pulsar_producer_configuration_t
 }
@@ -45,13 +47,13 @@ func pulsarCreateProducerCallbackProxy(res C.pulsar_result, ptr *C.pulsar_produc
 	if res != C.pulsar_result_Ok {
 		producerCtx.callback(nil, newError(res, "Failed to create Producer"))
 	} else {
-		p := &producer{client: producerCtx.client, ptr: ptr}
+		p := &producer{client: producerCtx.client, schema: producerCtx.schema, ptr: ptr}
 		runtime.SetFinalizer(p, producerFinalizer)
 		producerCtx.callback(p, nil)
 	}
 }
 
-func createProducerAsync(client *client, options ProducerOptions, callback func(producer Producer, err error)) {
+func createProducerAsync(client *client, schema Schema, options ProducerOptions, callback func(producer Producer, err error)) {
 	if options.Topic == "" {
 		go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required when creating producer"))
 		return
@@ -108,6 +110,48 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
 		C.pulsar_producer_configuration_set_compression_type(conf, C.pulsar_compression_type(options.CompressionType))
 	}
 
+	if schema != nil && schema.GetSchemaInfo() != nil {
+		if schema.GetSchemaInfo().Type != NONE {
+			cName := C.CString(schema.GetSchemaInfo().Name)
+			cSchema := C.CString(schema.GetSchemaInfo().Schema)
+			properties := C.pulsar_string_map_create()
+			defer C.free(unsafe.Pointer(cName))
+			defer C.free(unsafe.Pointer(cSchema))
+			defer C.pulsar_string_map_free(properties)
+
+			for key, value := range schema.GetSchemaInfo().Properties {
+				cKey := C.CString(key)
+				cValue := C.CString(value)
+
+				C.pulsar_string_map_put(properties, cKey, cValue)
+
+				C.free(unsafe.Pointer(cKey))
+				C.free(unsafe.Pointer(cValue))
+			}
+			C.pulsar_producer_configuration_set_schema_info(conf, C.pulsar_schema_type(schema.GetSchemaInfo().Type),
+				cName, cSchema, properties)
+		} else {
+			cName := C.CString("BYTES")
+			cSchema := C.CString("")
+			properties := C.pulsar_string_map_create()
+			defer C.free(unsafe.Pointer(cName))
+			defer C.free(unsafe.Pointer(cSchema))
+			defer C.pulsar_string_map_free(properties)
+
+			for key, value := range schema.GetSchemaInfo().Properties {
+				cKey := C.CString(key)
+				cValue := C.CString(value)
+
+				C.pulsar_string_map_put(properties, cKey, cValue)
+
+				C.free(unsafe.Pointer(cKey))
+				C.free(unsafe.Pointer(cValue))
+			}
+			C.pulsar_producer_configuration_set_schema_info(conf, C.pulsar_schema_type(BYTES),
+				cName, cSchema, properties)
+		}
+	}
+
 	if options.MessageRouter != nil {
 		C._pulsar_producer_configuration_set_message_router(conf, savePointer(&options.MessageRouter))
 	}
@@ -141,7 +185,7 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
 	defer C.free(unsafe.Pointer(topicName))
 
 	C._pulsar_client_create_producer_async(client.ptr, topicName, conf,
-		savePointer(createProducerCtx{client, callback, conf}))
+		savePointer(createProducerCtx{client, schema, callback, conf}))
 }
 
 type topicMetadata struct {
@@ -155,7 +199,7 @@ func (tm *topicMetadata) NumPartitions() int {
 //export pulsarRouterCallbackProxy
 func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, metadata *C.pulsar_topic_metadata_t, ctx unsafe.Pointer) C.int {
 	router := restorePointerNoDelete(ctx).(*func(msg Message, metadata TopicMetadata) int)
-	partitionIdx := (*router)(&message{msg}, &topicMetadata{int(C.pulsar_topic_metadata_get_num_partitions(metadata))})
+	partitionIdx := (*router)(&message{ptr: msg}, &topicMetadata{int(C.pulsar_topic_metadata_get_num_partitions(metadata))})
 	return C.int(partitionIdx)
 }
 
@@ -164,6 +208,7 @@ func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, metadata *C.pulsar_topic
 type producer struct {
 	client *client
 	ptr    *C.pulsar_producer_t
+	schema Schema
 }
 
 func producerFinalizer(p *producer) {
@@ -178,6 +223,10 @@ func (p *producer) Name() string {
 	return C.GoString(C.pulsar_producer_get_producer_name(p.ptr))
 }
 
+func (p *producer) Schema() Schema {
+	return p.schema
+}
+
 func (p *producer) LastSequenceID() int64 {
 	return int64(C.pulsar_producer_get_last_sequence_id(p.ptr))
 }
@@ -212,10 +261,27 @@ func pulsarProducerSendCallbackProxy(res C.pulsar_result, message *C.pulsar_mess
 }
 
 func (p *producer) SendAsync(ctx context.Context, msg ProducerMessage, callback func(ProducerMessage, error)) {
+	if p.schema != nil {
+		if msg.Value == nil {
+			callback(msg, errors.New("message value is nil, please check"))
+			return
+		}
+		payLoad, err := p.schema.Encode(msg.Value)
+		if err != nil {
+			callback(msg, errors.New("serialize message value error, please check"))
+			return
+		}
+		msg.Payload = payLoad
+	} else {
+		if msg.Value != nil {
+			callback(msg, errors.New("message value is set but no schema is provided, please check"))
+			return
+		}
+	}
 	cMsg := buildMessage(msg)
 	defer C.pulsar_message_free(cMsg)
 
-	C._pulsar_producer_send_async(p.ptr, cMsg, savePointer(sendCallback{msg, callback}))
+	C._pulsar_producer_send_async(p.ptr, cMsg, savePointer(sendCallback{message: msg, callback: callback}))
 }
 
 func (p *producer) Close() error {
@@ -252,7 +318,6 @@ func (p *producer) FlushAsync(callback func(error)) {
 	C._pulsar_producer_flush_async(p.ptr, savePointer(callback))
 }
 
-
 //export pulsarProducerFlushCallbackProxy
 func pulsarProducerFlushCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) {
 	callback := restorePointer(ctx).(func(error))
diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 7336c1a..0abf4f0 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -31,6 +31,7 @@ import (
 )
 
 type reader struct {
+	schema         Schema
 	client         *client
 	ptr            *C.pulsar_reader_t
 	defaultChannel chan ReaderMessage
@@ -52,18 +53,20 @@ func pulsarCreateReaderCallbackProxy(res C.pulsar_result, ptr *C.pulsar_reader_t
 		cc.callback(nil, newError(res, "Failed to create Reader"))
 	} else {
 		cc.reader.ptr = ptr
+		cc.reader.schema = cc.schema
 		runtime.SetFinalizer(cc.reader, readerFinalizer)
 		cc.callback(cc.reader, nil)
 	}
 }
 
 type readerAndCallback struct {
+	schema   Schema
 	reader   *reader
 	conf     *C.pulsar_reader_configuration_t
 	callback func(Reader, error)
 }
 
-func createReaderAsync(client *client, options ReaderOptions, callback func(Reader, error)) {
+func createReaderAsync(client *client, schema Schema, options ReaderOptions, callback func(Reader, error)) {
 	if options.Topic == "" {
 		go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required"))
 		return
@@ -113,7 +116,7 @@ func createReaderAsync(client *client, options ReaderOptions, callback func(Read
 	defer C.free(unsafe.Pointer(topic))
 
 	C._pulsar_client_create_reader_async(client.ptr, topic, options.StartMessageID.(*messageID).ptr,
-		conf, savePointer(&readerAndCallback{reader, conf, callback}))
+		conf, savePointer(&readerAndCallback{schema: schema, reader: reader, conf: conf, callback: callback}))
 }
 
 type readerCallback struct {
@@ -132,13 +135,17 @@ func pulsarReaderListenerProxy(cReader *C.pulsar_reader_t, message *C.pulsar_mes
 		}
 	}()
 
-	rc.channel <- ReaderMessage{rc.reader, newMessageWrapper(message)}
+	rc.channel <- ReaderMessage{rc.reader, newMessageWrapper(rc.reader.Schema(), message)}
 }
 
 func (r *reader) Topic() string {
 	return C.GoString(C.pulsar_reader_get_topic(r.ptr))
 }
 
+func (r *reader) Schema() Schema {
+	return r.schema
+}
+
 func (r *reader) Next(ctx context.Context) (Message, error) {
 	select {
 	case <-ctx.Done():
diff --git a/pulsar-client-go/pulsar/client.go b/pulsar-client-go/pulsar/client.go
index f62a403..5d41229 100644
--- a/pulsar-client-go/pulsar/client.go
+++ b/pulsar-client-go/pulsar/client.go
@@ -30,7 +30,7 @@ func NewClient(options ClientOptions) (Client, error) {
 }
 
 // Opaque interface that represents the authentication credentials
-type Authentication interface {}
+type Authentication interface{}
 
 // Create new Authentication provider with specified auth token
 func NewAuthenticationToken(token string) Authentication {
@@ -103,16 +103,22 @@ type Client interface {
 	// This method will block until the producer is created successfully
 	CreateProducer(ProducerOptions) (Producer, error)
 
+	CreateProducerWithSchema(ProducerOptions, Schema) (Producer, error)
+
 	// Create a `Consumer` by subscribing to a topic.
 	//
 	// If the subscription does not exist, a new subscription will be created and all messages published after the
 	// creation will be retained until acknowledged, even if the consumer is not connected
 	Subscribe(ConsumerOptions) (Consumer, error)
 
+	SubscribeWithSchema(ConsumerOptions, Schema) (Consumer, error)
+
 	// Create a Reader instance.
 	// This method will block until the reader is created successfully.
 	CreateReader(ReaderOptions) (Reader, error)
 
+	CreateReaderWithSchema(ReaderOptions, Schema) (Reader, error)
+
 	// Fetch the list of partitions for a given topic
 	//
 	// If the topic is partitioned, this will return a list of partition names.
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index ae6b498..e2503e0 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -124,6 +124,8 @@ type ConsumerOptions struct {
 	//  failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
 	//  shared subscription, will lead to the subscription call throwing a PulsarClientException.
 	ReadCompacted bool
+
+	Schema
 }
 
 // An interface that abstracts behavior of Pulsar's consumer
@@ -198,4 +200,6 @@ type Consumer interface {
 	// the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
 	// breaks, the messages are redelivered after reconnect.
 	RedeliverUnackedMessages()
+
+	Schema() Schema
 }
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index b28341f..b34c5c9 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -83,7 +83,7 @@ func TestConsumer(t *testing.T) {
 	assert.Nil(t, err)
 	defer consumer.Close()
 
-	assert.Equal(t, consumer.Topic(), "persistent://public/default/" + topic)
+	assert.Equal(t, consumer.Topic(), "persistent://public/default/"+topic)
 	assert.Equal(t, consumer.Subscription(), "my-sub")
 
 	ctx := context.Background()
@@ -102,7 +102,7 @@ func TestConsumer(t *testing.T) {
 		assert.NotNil(t, msg)
 
 		assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
-		assert.Equal(t, msg.Topic(), "persistent://public/default/" + topic)
+		assert.Equal(t, msg.Topic(), "persistent://public/default/"+topic)
 		fmt.Println("Send time: ", sendTime)
 		fmt.Println("Publish time: ", msg.PublishTime())
 		fmt.Println("Receive time: ", recvTime)
@@ -405,7 +405,7 @@ func TestConsumerRegex(t *testing.T) {
 	}
 
 	for i := 0; i < 20; i++ {
-		ctx, _ = context.WithTimeout(context.Background(), 1 * time.Second)
+		ctx, _ = context.WithTimeout(context.Background(), 1*time.Second)
 		msg, err := consumer.Receive(ctx)
 		assert.Nil(t, err)
 		assert.NotNil(t, msg)
@@ -585,6 +585,5 @@ func TestConsumerNegativeAcks(t *testing.T) {
 		consumer.Ack(msg)
 	}
 
-
 	consumer.Unsubscribe()
 }
diff --git a/pulsar-client-go/pulsar/message.go b/pulsar-client-go/pulsar/message.go
index d20ed9a..bd2aa38 100644
--- a/pulsar-client-go/pulsar/message.go
+++ b/pulsar-client-go/pulsar/message.go
@@ -25,6 +25,9 @@ type ProducerMessage struct {
 	// Payload for the message
 	Payload []byte
 
+	//Value and Payload are mutually exclusive; `Value interface{}` is used for schema messages.
+	Value interface{}
+
 	// Sets the key of the message for routing policy
 	Key string
 
@@ -66,6 +69,9 @@ type Message interface {
 
 	// Get the key of the message, if any
 	Key() string
+
+	//Get the de-serialized value of the message, according to the configured schema
+	GetValue(v interface{}) error
 }
 
 // Identifier for a particular message
diff --git a/pulsar-client-go/pulsar/pb/build.sh b/pulsar-client-go/pulsar/pb/build.sh
new file mode 100755
index 0000000..3987795
--- /dev/null
+++ b/pulsar-client-go/pulsar/pb/build.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+pkg=pb
+protoc --go_out=import_path=${pkg}:. hello.proto
+
diff --git a/pulsar-client-go/pulsar/pb/hello.pb.go b/pulsar-client-go/pulsar/pb/hello.pb.go
new file mode 100644
index 0000000..7cdf086
--- /dev/null
+++ b/pulsar-client-go/pulsar/pb/hello.pb.go
@@ -0,0 +1,100 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: hello.proto
+
+package pb
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type Test struct {
+	Num                  int32    `protobuf:"varint,1,opt,name=num,proto3" json:"num,omitempty"`
+	Msf                  string   `protobuf:"bytes,2,opt,name=msf,proto3" json:"msf,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *Test) Reset()         { *m = Test{} }
+func (m *Test) String() string { return proto.CompactTextString(m) }
+func (*Test) ProtoMessage()    {}
+func (*Test) Descriptor() ([]byte, []int) {
+	return fileDescriptor_hello_38c7a10202078446, []int{0}
+}
+func (m *Test) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_Test.Unmarshal(m, b)
+}
+func (m *Test) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_Test.Marshal(b, m, deterministic)
+}
+func (dst *Test) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_Test.Merge(dst, src)
+}
+func (m *Test) XXX_Size() int {
+	return xxx_messageInfo_Test.Size(m)
+}
+func (m *Test) XXX_DiscardUnknown() {
+	xxx_messageInfo_Test.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Test proto.InternalMessageInfo
+
+func (m *Test) GetNum() int32 {
+	if m != nil {
+		return m.Num
+	}
+	return 0
+}
+
+func (m *Test) GetMsf() string {
+	if m != nil {
+		return m.Msf
+	}
+	return ""
+}
+
+func init() {
+	proto.RegisterType((*Test)(nil), "prototest.Test")
+}
+
+func init() { proto.RegisterFile("hello.proto", fileDescriptor_hello_38c7a10202078446) }
+
+var fileDescriptor_hello_38c7a10202078446 = []byte{
+	// 87 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0x48, 0xcd, 0xc9,
+	0xc9, 0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x04, 0x53, 0x25, 0xa9, 0xc5, 0x25, 0x4a,
+	0x5a, 0x5c, 0x2c, 0x21, 0xa9, 0xc5, 0x25, 0x42, 0x02, 0x5c, 0xcc, 0x79, 0xa5, 0xb9, 0x12, 0x8c,
+	0x0a, 0x8c, 0x1a, 0xac, 0x41, 0x20, 0x26, 0x48, 0x24, 0xb7, 0x38, 0x4d, 0x82, 0x49, 0x81, 0x51,
+	0x83, 0x33, 0x08, 0xc4, 0x4c, 0x62, 0x03, 0x6b, 0x33, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0xc5,
+	0x3d, 0x96, 0x7b, 0x4c, 0x00, 0x00, 0x00,
+}
diff --git a/pulsar-client-go/pulsar/pb/hello.proto b/pulsar-client-go/pulsar/pb/hello.proto
new file mode 100644
index 0000000..547e273
--- /dev/null
+++ b/pulsar-client-go/pulsar/pb/hello.proto
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto3";
+package prototest;
+
+message Test {
+    int32 num = 1;
+    string msf = 2;
+}
diff --git a/pulsar-client-go/pulsar/primitiveSerDe.go b/pulsar-client-go/pulsar/primitiveSerDe.go
new file mode 100644
index 0000000..767d26d
--- /dev/null
+++ b/pulsar-client-go/pulsar/primitiveSerDe.go
@@ -0,0 +1,318 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+	"encoding/binary"
+	"fmt"
+	"io"
+	"math"
+)
+
+const (
+	IoMaxSize     = 1024
+	maxBorrowSize = 10
+)
+
+var (
+	bigEndian = binary.BigEndian
+)
+
+type BinaryFreeList chan []byte
+
+var BinarySerializer BinaryFreeList = make(chan []byte, IoMaxSize)
+
+func (b BinaryFreeList) Borrow() (buf []byte) {
+	select {
+	case buf = <-b:
+	default:
+		buf = make([]byte, maxBorrowSize)
+
+	}
+	return buf[:maxBorrowSize]
+}
+
+func (b BinaryFreeList) Return(buf []byte) {
+	select {
+	case b <- buf:
+	default:
+	}
+}
+
+func (b BinaryFreeList) Uint8(r io.Reader) (uint8, error) {
+	buf := b.Borrow()[:1]
+	if _, err := io.ReadFull(r, buf); err != nil {
+		b.Return(buf)
+		return 0, err
+	}
+	rv := buf[0]
+	b.Return(buf)
+	return rv, nil
+}
+
+func (b BinaryFreeList) Uint16(r io.Reader, byteOrder binary.ByteOrder) (uint16, error) {
+	buf := b.Borrow()[:2]
+	if _, err := io.ReadFull(r, buf); err != nil {
+		b.Return(buf)
+		return 0, err
+	}
+	rv := byteOrder.Uint16(buf)
+	b.Return(buf)
+	return rv, nil
+}
+
+func (b BinaryFreeList) Uint32(r io.Reader, byteOrder binary.ByteOrder) (uint32, error) {
+	buf := b.Borrow()[:4]
+	if _, err := io.ReadFull(r, buf); err != nil {
+		b.Return(buf)
+		return 0, err
+	}
+	rv := byteOrder.Uint32(buf)
+	b.Return(buf)
+	return rv, nil
+}
+
+func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64, error) {
+	buf := b.Borrow()[:8]
+	if _, err := io.ReadFull(r, buf); err != nil {
+		b.Return(buf)
+		return 0, err
+	}
+	rv := byteOrder.Uint64(buf)
+	b.Return(buf)
+	return rv, nil
+}
+
+func (b BinaryFreeList) Float64(buf []byte) (float64, error) {
+	if len(buf) < 8 {
+		return 0, fmt.Errorf("cannot decode binary double: %s", io.ErrShortBuffer)
+	}
+	return math.Float64frombits(binary.BigEndian.Uint64(buf[:8])), nil
+}
+
+func (b BinaryFreeList) Float32(buf []byte) (float32, error) {
+	if len(buf) < 4 {
+		return 0, fmt.Errorf("cannot decode binary float: %s", io.ErrShortBuffer)
+	}
+	return math.Float32frombits(binary.BigEndian.Uint32(buf[:4])), nil
+}
+
+func (b BinaryFreeList) PutUint8(w io.Writer, val uint8) error {
+	buf := b.Borrow()[:1]
+	buf[0] = val
+	_, err := w.Write(buf)
+	b.Return(buf)
+	return err
+}
+
+func (b BinaryFreeList) PutUint16(w io.Writer, byteOrder binary.ByteOrder, val uint16) error {
+	buf := b.Borrow()[:2]
+	byteOrder.PutUint16(buf, val)
+	_, err := w.Write(buf)
+	b.Return(buf)
+	return err
+}
+
+func (b BinaryFreeList) PutUint32(w io.Writer, byteOrder binary.ByteOrder, val uint32) error {
+
+	buf := b.Borrow()[:4]
+	byteOrder.PutUint32(buf, val)
+	_, err := w.Write(buf)
+	b.Return(buf)
+	return err
+}
+
+func (b BinaryFreeList) PutUint64(w io.Writer, byteOrder binary.ByteOrder, val uint64) error {
+	buf := b.Borrow()[:8]
+	byteOrder.PutUint64(buf, val)
+	_, err := w.Write(buf)
+	b.Return(buf)
+	return err
+}
+
+func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error) {
+	var value float64
+	switch v := datum.(type) {
+	case float64:
+		value = v
+	case float32:
+		value = float64(v)
+	case int:
+		if value = float64(v); int(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int would lose precision: %d", v)
+		}
+	case int64:
+		if value = float64(v); int64(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int64 would lose precision: %d", v)
+		}
+	case int32:
+		if value = float64(v); int32(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int32 would lose precision: %d", v)
+		}
+	default:
+		return nil, fmt.Errorf("serialize failed: expected: Go numeric; received: %T", datum)
+	}
+	var buf []byte
+	buf = append(buf, 0, 0, 0, 0, 0, 0, 0, 0)
+	binary.BigEndian.PutUint64(buf[len(buf)-8:], math.Float64bits(value))
+	return buf, nil
+}
+
+func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error) {
+	var value float32
+	switch v := datum.(type) {
+	case float32:
+		value = v
+	case float64:
+		value = float32(v)
+	case int:
+		if value = float32(v); int(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int would lose precision: %d", v)
+		}
+	case int64:
+		if value = float32(v); int64(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int64 would lose precision: %d", v)
+		}
+	case int32:
+		if value = float32(v); int32(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int32 would lose precision: %d", v)
+		}
+	default:
+		return nil, fmt.Errorf("serialize failed: expected: Go numeric; received: %T", datum)
+	}
+	var buf []byte
+	buf = append(buf, 0, 0, 0, 0)
+	binary.BigEndian.PutUint32(buf[len(buf)-4:], uint32(math.Float32bits(value)))
+	return buf, nil
+}
+
+func ReadElements(r io.Reader, elements ...interface{}) error {
+	for _, element := range elements {
+		err := readElement(r, element)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func WriteElements(w io.Writer, elements ...interface{}) error {
+	for _, element := range elements {
+		err := writeElement(w, element)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func readElement(r io.Reader, element interface{}) error {
+	switch e := element.(type) {
+	case *int8:
+		rv, err := BinarySerializer.Uint8(r)
+		if err != nil {
+			return err
+		}
+		*e = int8(rv)
+		return nil
+
+	case *int16:
+		rv, err := BinarySerializer.Uint16(r, bigEndian)
+		if err != nil {
+			return err
+		}
+		*e = int16(rv)
+		return nil
+
+	case *int32:
+		rv, err := BinarySerializer.Uint32(r, bigEndian)
+		if err != nil {
+			return err
+		}
+		*e = int32(rv)
+		return nil
+
+	case *int64:
+		rv, err := BinarySerializer.Uint64(r, bigEndian)
+		if err != nil {
+			return err
+		}
+		*e = int64(rv)
+		return nil
+
+	case *bool:
+		rv, err := BinarySerializer.Uint8(r)
+		if err != nil {
+			return err
+		}
+		if rv == 0x00 {
+			*e = false
+		} else {
+			*e = true
+		}
+		return nil
+	}
+	return binary.Read(r, bigEndian, element)
+}
+
+func writeElement(w io.Writer, element interface{}) error {
+	switch e := element.(type) {
+	case int8:
+		err := BinarySerializer.PutUint8(w, uint8(e))
+		if err != nil {
+			return err
+		}
+		return nil
+
+	case int16:
+		err := BinarySerializer.PutUint16(w, bigEndian, uint16(e))
+		if err != nil {
+			return err
+		}
+		return nil
+
+	case int32:
+		err := BinarySerializer.PutUint32(w, bigEndian, uint32(e))
+		if err != nil {
+			return err
+		}
+		return nil
+
+	case int64:
+		err := BinarySerializer.PutUint64(w, bigEndian, uint64(e))
+		if err != nil {
+			return err
+		}
+		return nil
+
+	case bool:
+		var err error
+		if e {
+			err = BinarySerializer.PutUint8(w, 0x01)
+		} else {
+			err = BinarySerializer.PutUint8(w, 0x00)
+		}
+		if err != nil {
+			return err
+		}
+		return nil
+	}
+	return binary.Write(w, bigEndian, element)
+}
diff --git a/pulsar-client-go/pulsar/primitiveSerDe_test.go b/pulsar-client-go/pulsar/primitiveSerDe_test.go
new file mode 100644
index 0000000..f2f0335
--- /dev/null
+++ b/pulsar-client-go/pulsar/primitiveSerDe_test.go
@@ -0,0 +1,145 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+	"bytes"
+	"io"
+	"reflect"
+	"testing"
+
+	"github.com/davecgh/go-spew/spew"
+)
+
+func TestWriteElements(t *testing.T) {
+	tests := []struct {
+		in  interface{}
+		buf []byte
+	}{
+		{int8(1), []byte{0x01}},
+		{uint8(2), []byte{0x02}},
+		{int16(4), []byte{0x04, 0x00}},
+		{uint16(16), []byte{0x10, 0x00}},
+		{int32(1), []byte{0x01, 0x00, 0x00, 0x00}},
+		{uint32(256), []byte{0x00, 0x01, 0x00, 0x00}},
+		{
+			int64(65536),
+			[]byte{0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00},
+		},
+		{
+			uint64(4294967296),
+			[]byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00},
+		},
+		{
+			true,
+			[]byte{0x01},
+		},
+		{
+			false,
+			[]byte{0x00},
+		},
+	}
+
+	t.Logf("Running %d tests", len(tests))
+	for i, test := range tests {
+		value := test
+
+		var buf bytes.Buffer
+		err := WriteElements(&buf, value.in)
+		if err != nil {
+			t.Errorf("writeElement #%d error %v", i, err)
+			continue
+		}
+		if !bytes.Equal(buf.Bytes(), test.buf) {
+			t.Error(test.in)
+			t.Errorf("writeElement #%d\n got: %s want: %s", i,
+				spew.Sdump(buf.Bytes()), spew.Sdump(test.buf))
+			continue
+		}
+
+		// Read from wire format.
+		rbuf := bytes.NewReader(test.buf)
+		val := test.in
+		if reflect.ValueOf(test.in).Kind() != reflect.Ptr {
+			val = reflect.New(reflect.TypeOf(test.in)).Interface()
+		}
+		err = ReadElements(rbuf, val)
+		if err != nil {
+			t.Errorf("readElement #%d error %v", i, err)
+			continue
+		}
+		ival := val
+		if reflect.ValueOf(test.in).Kind() != reflect.Ptr {
+			ival = reflect.Indirect(reflect.ValueOf(val)).Interface()
+		}
+		if !reflect.DeepEqual(ival, test.in) {
+			t.Errorf("readElement #%d\n got: %s want: %s", i,
+				spew.Sdump(ival), spew.Sdump(test.in))
+			continue
+		}
+	}
+}
+
+func TestElementErrors(t *testing.T) {
+	tests := []struct {
+		in       interface{}
+		max      int
+		writeErr error
+		readErr  error
+	}{
+		{int8(1), 0, nil, io.EOF},
+		{uint8(2), 0, nil, io.EOF},
+		{int16(4), 0, nil, io.EOF},
+		{uint16(16), 0, nil, io.EOF},
+		{int32(1), 0, nil, io.EOF},
+		{uint32(256), 0, nil, io.EOF},
+		{
+			int64(65536),
+			0, nil, io.EOF,
+		},
+		{
+			uint64(4294967296),
+			0, nil, io.EOF,
+		},
+		{
+			true,
+			0, nil, io.EOF,
+		},
+		{
+			false,
+			0, nil, io.EOF,
+		},
+	}
+
+	t.Logf("Running %d tests", len(tests))
+	for i, test := range tests {
+		var r bytes.Reader
+		val := test.in
+		if reflect.ValueOf(test.in).Kind() != reflect.Ptr {
+			val = reflect.New(reflect.TypeOf(test.in)).Interface()
+		}
+		err := ReadElements(&r, val)
+		if err != test.readErr {
+			t.Errorf("readElement #%d wrong error got: %v, want: %v",
+				i, err, test.readErr)
+			continue
+		}
+	}
+}
diff --git a/pulsar-client-go/pulsar/producer.go b/pulsar-client-go/pulsar/producer.go
index 17aea45..5e199f7 100644
--- a/pulsar-client-go/pulsar/producer.go
+++ b/pulsar-client-go/pulsar/producer.go
@@ -184,4 +184,6 @@ type Producer interface {
 	// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
 	// of errors, pending writes will not be retried.
 	Close() error
+
+	Schema() Schema
 }
diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go
index 633714b..df033a4 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -118,8 +118,7 @@ func TestProducerNoTopic(t *testing.T) {
 
 	defer client.Close()
 
-	producer, err := client.CreateProducer(ProducerOptions{
-	})
+	producer, err := client.CreateProducer(ProducerOptions{})
 
 	// Expect error in creating producer
 	assert.Nil(t, producer)
diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go
index 5592630..51ade6b 100644
--- a/pulsar-client-go/pulsar/reader.go
+++ b/pulsar-client-go/pulsar/reader.go
@@ -81,4 +81,6 @@ type Reader interface {
 
 	// Close the reader and stop the broker to push more messages
 	Close() error
+
+	Schema() Schema
 }
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index e6cb9f2..01247ce 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -75,7 +75,7 @@ func TestReader(t *testing.T) {
 	assert.Nil(t, err)
 	defer reader.Close()
 
-	assert.Equal(t, reader.Topic(), "persistent://public/default/" + topic )
+	assert.Equal(t, reader.Topic(), "persistent://public/default/"+topic)
 
 	ctx := context.Background()
 
diff --git a/pulsar-client-go/pulsar/schema.go b/pulsar-client-go/pulsar/schema.go
new file mode 100644
index 0000000..4a6f7a1
--- /dev/null
+++ b/pulsar-client-go/pulsar/schema.go
@@ -0,0 +1,504 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+	"bytes"
+	"encoding/json"
+	"errors"
+	"reflect"
+	"unsafe"
+
+	log "github.com/apache/pulsar/pulsar-client-go/logutil"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/linkedin/goavro"
+)
+
+type SchemaType int
+
+const (
+	NONE         SchemaType = iota //No schema defined
+	STRING                         //Simple String encoding with UTF-8
+	JSON                           //JSON object encoding and validation
+	PROTOBUF                       //Protobuf message encoding and decoding
+	AVRO                           //Serialize and deserialize via Avro
+	BOOLEAN                        //
+	INT8                           //An 8-bit integer.
+	INT16                          //A 16-bit integer.
+	INT32                          //A 32-bit integer.
+	INT64                          //A 64-bit integer.
+	FLOAT                          //A float number.
+	DOUBLE                         //A double number
+	_                              //
+	_                              //
+	_                              //
+	KEY_VALUE                      //A Schema that contains Key Schema and Value Schema.
+	BYTES        = -1              //A bytes array.
+	AUTO         = -2              //
+	AUTO_CONSUME = -3              //Auto Consume Type.
+	AUTO_PUBLISH = -4              // Auto Publish Type.
+)
+
+// Encapsulates data around the schema definition
+type SchemaInfo struct {
+	Name   string
+	Schema string
+	Type   SchemaType
+	Properties map[string]string
+}
+
+type Schema interface {
+	Encode(v interface{}) ([]byte, error)
+	Decode(data []byte, v interface{}) error
+	Validate(message []byte) error
+	GetSchemaInfo() *SchemaInfo
+}
+
+type AvroCodec struct {
+	Codec *goavro.Codec
+}
+
+func NewSchemaDefinition(schema *goavro.Codec) *AvroCodec {
+	schemaDef := &AvroCodec{
+		Codec: schema,
+	}
+	return schemaDef
+}
+
+// initAvroCodec returns a Codec used to translate between a byte slice of either
+// binary or textual Avro data and native Go data.
+func initAvroCodec(codec string) (*goavro.Codec, error) {
+	return goavro.NewCodec(codec)
+}
+
+type JsonSchema struct {
+	AvroCodec
+	SchemaInfo
+}
+
+func NewJsonSchema(jsonAvroSchemaDef string, properties map[string]string) *JsonSchema {
+	js := new(JsonSchema)
+	avroCodec, err := initAvroCodec(jsonAvroSchemaDef)
+	if err != nil {
+		log.Fatalf("init codec error:%v", err)
+	}
+	schemaDef := NewSchemaDefinition(avroCodec)
+	js.SchemaInfo.Schema = schemaDef.Codec.Schema()
+	js.SchemaInfo.Type = JSON
+	js.SchemaInfo.Properties = properties
+	js.SchemaInfo.Name = "Json"
+	return js
+}
+
+func (js *JsonSchema) Encode(data interface{}) ([]byte, error) {
+	return json.Marshal(data)
+}
+
+func (js *JsonSchema) Decode(data []byte, v interface{}) error {
+	return json.Unmarshal(data, v)
+}
+
+func (js *JsonSchema) Validate(message []byte) error {
+	return js.Decode(message, nil)
+}
+
+func (js *JsonSchema) GetSchemaInfo() *SchemaInfo {
+	return &js.SchemaInfo
+}
+
+type ProtoSchema struct {
+	AvroCodec
+	SchemaInfo
+}
+
+func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *ProtoSchema {
+	ps := new(ProtoSchema)
+	avroCodec, err := initAvroCodec(protoAvroSchemaDef)
+	if err != nil {
+		log.Fatalf("init codec error:%v", err)
+	}
+	schemaDef := NewSchemaDefinition(avroCodec)
+	ps.AvroCodec.Codec = schemaDef.Codec
+	ps.SchemaInfo.Schema = schemaDef.Codec.Schema()
+	ps.SchemaInfo.Type = PROTOBUF
+	ps.SchemaInfo.Properties = properties
+	ps.SchemaInfo.Name = "Proto"
+	return ps
+}
+
+func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error) {
+	return proto.Marshal(data.(proto.Message))
+}
+
+func (ps *ProtoSchema) Decode(data []byte, v interface{}) error {
+	return proto.Unmarshal(data, v.(proto.Message))
+}
+
+func (ps *ProtoSchema) Validate(message []byte) error {
+	return ps.Decode(message, nil)
+}
+
+func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo {
+	return &ps.SchemaInfo
+}
+
+type AvroSchema struct {
+	AvroCodec
+	SchemaInfo
+}
+
+func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSchema {
+	as := new(AvroSchema)
+	avroCodec, err := initAvroCodec(avroSchemaDef)
+	if err != nil {
+		log.Fatalf("init codec error:%v", err)
+	}
+	schemaDef := NewSchemaDefinition(avroCodec)
+	as.AvroCodec.Codec = schemaDef.Codec
+	as.SchemaInfo.Schema = schemaDef.Codec.Schema()
+	as.SchemaInfo.Type = AVRO
+	as.SchemaInfo.Name = "Avro"
+	as.SchemaInfo.Properties = properties
+	return as
+}
+
+func (as *AvroSchema) Encode(data interface{}) ([]byte, error) {
+	textual, err := json.Marshal(data)
+	if err != nil {
+		log.Errorf("serialize data error:%s", err.Error())
+		return nil, err
+	}
+	native, _, err := as.Codec.NativeFromTextual(textual)
+	if err != nil {
+		log.Errorf("convert native Go form to binary Avro data error:%s", err.Error())
+		return nil, err
+	}
+	return as.Codec.BinaryFromNative(nil, native)
+}
+
+func (as *AvroSchema) Decode(data []byte, v interface{}) error {
+	native, _, err := as.Codec.NativeFromBinary(data)
+	if err != nil {
+		log.Errorf("convert binary Avro data back to native Go form error:%s", err.Error())
+		return err
+	}
+	textual, err := as.Codec.TextualFromNative(nil, native)
+	if err != nil {
+		log.Errorf("convert native Go form to textual Avro data error:%s", err.Error())
+		return err
+	}
+	err = json.Unmarshal(textual, v)
+	if err != nil {
+		log.Errorf("unSerialize textual error:%s", err.Error())
+		return err
+	}
+	return nil
+}
+
+func (as *AvroSchema) Validate(message []byte) error {
+	return as.Decode(message, nil)
+}
+
+func (as *AvroSchema) GetSchemaInfo() *SchemaInfo {
+	return &as.SchemaInfo
+}
+
+type StringSchema struct {
+	SchemaInfo
+}
+
+func NewStringSchema(properties map[string]string) *StringSchema {
+	strSchema := new(StringSchema)
+	strSchema.SchemaInfo.Properties = properties
+	strSchema.SchemaInfo.Name = "String"
+	strSchema.SchemaInfo.Type = STRING
+	strSchema.SchemaInfo.Schema = ""
+	return strSchema
+}
+
+func (ss *StringSchema) Encode(v interface{}) ([]byte, error) {
+	return []byte(v.(string)), nil
+}
+
+func (ss *StringSchema) Decode(data []byte, v interface{}) error {
+	bh := (*reflect.SliceHeader)(unsafe.Pointer(&data))
+	sh := reflect.StringHeader{
+		Data: bh.Data,
+		Len:  bh.Len,
+	}
+	shPtr := (*string)(unsafe.Pointer(&sh))
+	reflect.ValueOf(v).Elem().Set(reflect.ValueOf(shPtr))
+	return nil
+}
+
+func (ss *StringSchema) Validate(message []byte) error {
+	return ss.Decode(message, nil)
+}
+
+func (ss *StringSchema) GetSchemaInfo() *SchemaInfo {
+	return &ss.SchemaInfo
+}
+
+type BytesSchema struct {
+	SchemaInfo
+}
+
+func NewBytesSchema(properties map[string]string) *BytesSchema {
+	bytesSchema := new(BytesSchema)
+	bytesSchema.SchemaInfo.Properties = properties
+	bytesSchema.SchemaInfo.Name = "Bytes"
+	bytesSchema.SchemaInfo.Type = BYTES
+	bytesSchema.SchemaInfo.Schema = ""
+	return bytesSchema
+}
+
+func (bs *BytesSchema) Encode(data interface{}) ([]byte, error) {
+	return data.([]byte), nil
+}
+
+func (bs *BytesSchema) Decode(data []byte, v interface{}) error {
+	reflect.ValueOf(v).Elem().Set(reflect.ValueOf(data))
+	return nil
+}
+
+func (bs *BytesSchema) Validate(message []byte) error {
+	return bs.Decode(message, nil)
+}
+
+// GetSchemaInfo returns a pointer to the embedded SchemaInfo, so changes made
+// through the pointer are visible on the schema itself.
+func (bs *BytesSchema) GetSchemaInfo() *SchemaInfo {
+	return &bs.SchemaInfo
+}
+
+// Int8Schema is the schema for int8 payloads. It embeds the SchemaInfo that
+// describes it.
+type Int8Schema struct {
+	SchemaInfo
+}
+
+// NewInt8Schema builds an Int8Schema named "INT8" with type INT8, an empty
+// schema definition and the given properties map (stored as passed, not
+// copied).
+func NewInt8Schema(properties map[string]string) *Int8Schema {
+	schema := &Int8Schema{}
+	info := &schema.SchemaInfo
+	info.Name = "INT8"
+	info.Type = INT8
+	info.Schema = ""
+	info.Properties = properties
+	return schema
+}
+
+// Encode serializes value, which must be an int8, by writing it to a buffer
+// with WriteElements.
+//
+// A value of any other type yields an error instead of the panic an
+// unchecked type assertion would raise. No bytes are returned alongside an
+// error.
+func (is8 *Int8Schema) Encode(value interface{}) ([]byte, error) {
+	n, ok := value.(int8)
+	if !ok {
+		return nil, errors.New("Int8Schema can only encode int8 values")
+	}
+	var buf bytes.Buffer
+	if err := WriteElements(&buf, n); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+// Decode reads the value held in data into v by handing a reader over data
+// to ReadElements and returning its result.
+// NOTE(review): presumably v must be a *int8 — confirm against ReadElements.
+func (is8 *Int8Schema) Decode(data []byte, v interface{}) error {
+	return ReadElements(bytes.NewReader(data), v)
+}
+
+// Validate returns an error unless message is exactly 1 byte long.
+func (is8 *Int8Schema) Validate(message []byte) error {
+	if len(message) == 1 {
+		return nil
+	}
+	return errors.New("size of data received by Int8Schema is not 1")
+}
+
+// GetSchemaInfo returns a pointer to the embedded SchemaInfo, so changes made
+// through the pointer are visible on the schema itself.
+func (is8 *Int8Schema) GetSchemaInfo() *SchemaInfo {
+	return &is8.SchemaInfo
+}
+
+// Int16Schema is the schema for int16 payloads. It embeds the SchemaInfo that
+// describes it.
+type Int16Schema struct {
+	SchemaInfo
+}
+
+// NewInt16Schema builds an Int16Schema named "INT16" with type INT16, an
+// empty schema definition and the given properties map (stored as passed,
+// not copied).
+func NewInt16Schema(properties map[string]string) *Int16Schema {
+	schema := &Int16Schema{}
+	info := &schema.SchemaInfo
+	info.Schema = ""
+	info.Type = INT16
+	info.Name = "INT16"
+	info.Properties = properties
+	return schema
+}
+
+// Encode serializes value, which must be an int16, by writing it to a buffer
+// with WriteElements.
+//
+// A value of any other type yields an error instead of the panic an
+// unchecked type assertion would raise. No bytes are returned alongside an
+// error.
+func (is16 *Int16Schema) Encode(value interface{}) ([]byte, error) {
+	n, ok := value.(int16)
+	if !ok {
+		return nil, errors.New("Int16Schema can only encode int16 values")
+	}
+	var buf bytes.Buffer
+	if err := WriteElements(&buf, n); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+// Decode reads the value held in data into v by handing a reader over data
+// to ReadElements and returning its result.
+// NOTE(review): presumably v must be a *int16 — confirm against ReadElements.
+func (is16 *Int16Schema) Decode(data []byte, v interface{}) error {
+	return ReadElements(bytes.NewReader(data), v)
+}
+
+// Validate returns an error unless message is exactly 2 bytes long.
+func (is16 *Int16Schema) Validate(message []byte) error {
+	if len(message) == 2 {
+		return nil
+	}
+	return errors.New("size of data received by Int16Schema is not 2")
+}
+
+// GetSchemaInfo returns a pointer to the embedded SchemaInfo, so changes made
+// through the pointer are visible on the schema itself.
+func (is16 *Int16Schema) GetSchemaInfo() *SchemaInfo {
+	return &is16.SchemaInfo
+}
+
+// Int32Schema is the schema for int32 payloads. It embeds the SchemaInfo that
+// describes it.
+type Int32Schema struct {
+	SchemaInfo
+}
+
+// NewInt32Schema builds an Int32Schema named "INT32" with type INT32, an
+// empty schema definition and the given properties map (stored as passed,
+// not copied).
+func NewInt32Schema(properties map[string]string) *Int32Schema {
+	schema := &Int32Schema{}
+	info := &schema.SchemaInfo
+	info.Name = "INT32"
+	info.Type = INT32
+	info.Schema = ""
+	info.Properties = properties
+	return schema
+}
+
+// Encode serializes value, which must be an int32, by writing it to a buffer
+// with WriteElements.
+//
+// A value of any other type yields an error instead of the panic an
+// unchecked type assertion would raise. No bytes are returned alongside an
+// error.
+func (is32 *Int32Schema) Encode(value interface{}) ([]byte, error) {
+	n, ok := value.(int32)
+	if !ok {
+		return nil, errors.New("Int32Schema can only encode int32 values")
+	}
+	var buf bytes.Buffer
+	if err := WriteElements(&buf, n); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+// Decode reads the value held in data into v by handing a reader over data
+// to ReadElements and returning its result.
+// NOTE(review): presumably v must be a *int32 — confirm against ReadElements.
+func (is32 *Int32Schema) Decode(data []byte, v interface{}) error {
+	return ReadElements(bytes.NewReader(data), v)
+}
+
+// Validate returns an error unless message is exactly 4 bytes long.
+func (is32 *Int32Schema) Validate(message []byte) error {
+	if len(message) == 4 {
+		return nil
+	}
+	return errors.New("size of data received by Int32Schema is not 4")
+}
+
+// GetSchemaInfo returns a pointer to the embedded SchemaInfo, so changes made
+// through the pointer are visible on the schema itself.
+func (is32 *Int32Schema) GetSchemaInfo() *SchemaInfo {
+	return &is32.SchemaInfo
+}
+
+// Int64Schema is the schema for int64 payloads. It embeds the SchemaInfo that
+// describes it.
+type Int64Schema struct {
+	SchemaInfo
+}
+
+// NewInt64Schema builds an Int64Schema named "INT64" with type INT64, an
+// empty schema definition and the given properties map (stored as passed,
+// not copied).
+func NewInt64Schema(properties map[string]string) *Int64Schema {
+	schema := &Int64Schema{}
+	info := &schema.SchemaInfo
+	info.Schema = ""
+	info.Type = INT64
+	info.Name = "INT64"
+	info.Properties = properties
+	return schema
+}
+
+// Encode serializes value, which must be an int64, by writing it to a buffer
+// with WriteElements.
+//
+// A value of any other type yields an error instead of the panic an
+// unchecked type assertion would raise. No bytes are returned alongside an
+// error.
+func (is64 *Int64Schema) Encode(value interface{}) ([]byte, error) {
+	n, ok := value.(int64)
+	if !ok {
+		return nil, errors.New("Int64Schema can only encode int64 values")
+	}
+	var buf bytes.Buffer
+	if err := WriteElements(&buf, n); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+// Decode reads the value held in data into v by handing a reader over data
+// to ReadElements and returning its result.
+// NOTE(review): presumably v must be a *int64 — confirm against ReadElements.
+func (is64 *Int64Schema) Decode(data []byte, v interface{}) error {
+	return ReadElements(bytes.NewReader(data), v)
+}
+
+// Validate returns an error unless message is exactly 8 bytes long.
+func (is64 *Int64Schema) Validate(message []byte) error {
+	if len(message) == 8 {
+		return nil
+	}
+	return errors.New("size of data received by Int64Schema is not 8")
+}
+
+// GetSchemaInfo returns a pointer to the embedded SchemaInfo, so changes made
+// through the pointer are visible on the schema itself.
+func (is64 *Int64Schema) GetSchemaInfo() *SchemaInfo {
+	return &is64.SchemaInfo
+}
+
+// FloatSchema is the schema for single-precision floating point payloads. It
+// embeds the SchemaInfo that describes it.
+type FloatSchema struct {
+	SchemaInfo
+}
+
+// NewFloatSchema builds a FloatSchema named "FLOAT" with type FLOAT, an empty
+// schema definition and the given properties map (stored as passed, not
+// copied).
+func NewFloatSchema(properties map[string]string) *FloatSchema {
+	schema := &FloatSchema{}
+	info := &schema.SchemaInfo
+	info.Name = "FLOAT"
+	info.Type = FLOAT
+	info.Schema = ""
+	info.Properties = properties
+	return schema
+}
+
+// Encode serializes value by delegating to BinarySerializer.PutFloat and
+// returns that call's bytes and error unchanged.
+// NOTE(review): presumably value must be a float32 — confirm against PutFloat.
+func (fs *FloatSchema) Encode(value interface{}) ([]byte, error) {
+	return BinarySerializer.PutFloat(value)
+}
+
+// Decode parses data with BinarySerializer.Float32 and stores the result in
+// the variable v points to.
+//
+// v must be a non-nil pointer to something the parsed value can be assigned
+// to (presumably *float32 — confirm against BinarySerializer.Float32). A
+// parse failure is logged and returned. A nil, non-pointer or incompatible
+// destination yields an error instead of a reflect panic.
+func (fs *FloatSchema) Decode(data []byte, v interface{}) error {
+	floatValue, err := BinarySerializer.Float32(data)
+	if err != nil {
+		log.Errorf("unSerialize float error:%s", err.Error())
+		return err
+	}
+
+	dst := reflect.ValueOf(v)
+	if dst.Kind() != reflect.Ptr || dst.IsNil() {
+		return errors.New("FloatSchema.Decode requires a non-nil pointer destination")
+	}
+	src := reflect.ValueOf(floatValue)
+	if !src.Type().AssignableTo(dst.Elem().Type()) {
+		return errors.New("FloatSchema.Decode destination cannot hold the decoded value")
+	}
+	dst.Elem().Set(src)
+	return nil
+}
+
+// Validate returns an error unless message is exactly 4 bytes long.
+func (fs *FloatSchema) Validate(message []byte) error {
+	if len(message) == 4 {
+		return nil
+	}
+	return errors.New("size of data received by FloatSchema is not 4")
+}
+
+// GetSchemaInfo returns a pointer to the embedded SchemaInfo, so changes made
+// through the pointer are visible on the schema itself.
+func (fs *FloatSchema) GetSchemaInfo() *SchemaInfo {
+	return &fs.SchemaInfo
+}
+
+// DoubleSchema is the schema for double-precision floating point payloads.
+// It embeds the SchemaInfo that describes it.
+type DoubleSchema struct {
+	SchemaInfo
+}
+
+// NewDoubleSchema builds a DoubleSchema named "DOUBLE" with type DOUBLE, an
+// empty schema definition and the given properties map (stored as passed,
+// not copied).
+func NewDoubleSchema(properties map[string]string) *DoubleSchema {
+	schema := &DoubleSchema{}
+	info := &schema.SchemaInfo
+	info.Name = "DOUBLE"
+	info.Type = DOUBLE
+	info.Schema = ""
+	info.Properties = properties
+	return schema
+}
+
+// Encode serializes value by delegating to BinarySerializer.PutDouble and
+// returns that call's bytes and error unchanged.
+// NOTE(review): presumably value must be a float64 — confirm against PutDouble.
+func (ds *DoubleSchema) Encode(value interface{}) ([]byte, error) {
+	return BinarySerializer.PutDouble(value)
+}
+
+// Decode parses data with BinarySerializer.Float64 and stores the result in
+// the variable v points to.
+//
+// v must be a non-nil pointer to something the parsed value can be assigned
+// to (presumably *float64 — confirm against BinarySerializer.Float64). A
+// parse failure is logged and returned. A nil, non-pointer or incompatible
+// destination yields an error instead of a reflect panic.
+func (ds *DoubleSchema) Decode(data []byte, v interface{}) error {
+	doubleValue, err := BinarySerializer.Float64(data)
+	if err != nil {
+		log.Errorf("unSerialize double value error:%s", err.Error())
+		return err
+	}
+
+	dst := reflect.ValueOf(v)
+	if dst.Kind() != reflect.Ptr || dst.IsNil() {
+		return errors.New("DoubleSchema.Decode requires a non-nil pointer destination")
+	}
+	src := reflect.ValueOf(doubleValue)
+	if !src.Type().AssignableTo(dst.Elem().Type()) {
+		return errors.New("DoubleSchema.Decode destination cannot hold the decoded value")
+	}
+	dst.Elem().Set(src)
+	return nil
+}
+
+// Validate returns an error unless message is exactly 8 bytes long.
+func (ds *DoubleSchema) Validate(message []byte) error {
+	if len(message) == 8 {
+		return nil
+	}
+	return errors.New("size of data received by DoubleSchema is not 8")
+}
+
+// GetSchemaInfo returns a pointer to the embedded SchemaInfo, so changes made
+// through the pointer are visible on the schema itself.
+func (ds *DoubleSchema) GetSchemaInfo() *SchemaInfo {
+	return &ds.SchemaInfo
+}
diff --git a/pulsar-client-go/pulsar/schemaDef_test.go b/pulsar-client-go/pulsar/schemaDef_test.go
new file mode 100644
index 0000000..5fb6803
--- /dev/null
+++ b/pulsar-client-go/pulsar/schemaDef_test.go
@@ -0,0 +1,58 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// TestSchemaDef feeds initAvroCodec a series of record definitions that are
+// expected to be rejected and asserts that each one produces an error.
+func TestSchemaDef(t *testing.T) {
+	rejected := []string{
+		// field type "int64"
+		"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+			"\"fields\":[{\"name\":\"ID\",\"type\":\"int64\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
+		// field type "bool"
+		"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+			"\"fields\":[{\"name\":\"ID\",\"type\":\"bool\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
+		// field type "float32"
+		"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+			"\"fields\":[{\"name\":\"ID\",\"type\":\"float32\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
+		// field type "float64"
+		"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+			"\"fields\":[{\"name\":\"ID\",\"type\":\"float64\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
+		// field type "byte"
+		"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+			"\"fields\":[{\"name\":\"ID\",\"type\":\"byte\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
+		// "$" in the namespace plus a broken union/default for Name
+		"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"operation.createJsonConsumer$\"," +
+			"\"fields\":[{\"name\":\"ID\",\"type\":\"byte\"},{\"name\":\"Name\",\"type\":\":[\"null\",\"string\"],\"default\":null\"}]}",
+	}
+
+	for _, def := range rejected {
+		_, err := initAvroCodec(def)
+		assert.NotNil(t, err)
+	}
+}
diff --git a/pulsar-client-go/pulsar/schema_test.go b/pulsar-client-go/pulsar/schema_test.go
new file mode 100644
index 0000000..3c83188
--- /dev/null
+++ b/pulsar-client-go/pulsar/schema_test.go
@@ -0,0 +1,436 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+	"context"
+	"testing"
+
+	log "github.com/apache/pulsar/pulsar-client-go/logutil"
+	"github.com/apache/pulsar/pulsar-client-go/pulsar/pb"
+	"github.com/stretchr/testify/assert"
+)
+
+// testJson is the payload type used by TestJsonSchema; its fields are
+// serialized under the JSON keys "id" and "name".
+type testJson struct {
+	ID   int    `json:"id"`
+	Name string `json:"name"`
+}
+
+// testAvro is the payload type used by TestAvroSchema; its field names match
+// the "ID" and "Name" fields declared in exampleSchemaDef.
+type testAvro struct {
+	ID   int
+	Name string
+}
+
+// Record schema definitions shared by the tests in this file.
+var (
+	// exampleSchemaDef declares record test.Example with an int field "ID"
+	// and a string field "Name".
+	exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+		"\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
+	// protoSchemaDef declares record test.Example with an int field "num"
+	// and a string field "msf".
+	protoSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+		"\"fields\":[{\"name\":\"num\",\"type\":\"int\"},{\"name\":\"msf\",\"type\":\"string\"}]}"
+)
+
+func createClient() Client {
+	// create client
+	lookupUrl := "pulsar://localhost:6650"
+	client, err := NewClient(ClientOptions{
+		URL: lookupUrl,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	return client
+}
+
+// TestJsonSchema publishes a testJson value to "jsonTopic" through two typed
+// producers (one without and one with schema properties) and then reads a
+// message back through a schema-aware consumer.
+//
+// Every setup error stops the test with t.Fatalf, which still runs the
+// deferred Close calls, so a failed step never leads to a nil producer,
+// consumer or message being used.
+func TestJsonSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	jsonSchema := NewJsonSchema(exampleSchemaDef, nil)
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "jsonTopic",
+	}, jsonSchema)
+	if err != nil {
+		t.Fatalf("create producer: %v", err)
+	}
+	defer producer.Close()
+
+	err = producer.Send(context.Background(), ProducerMessage{
+		Value: &testJson{
+			ID:   100,
+			Name: "pulsar",
+		},
+	})
+	if err != nil {
+		t.Fatalf("send: %v", err)
+	}
+
+	properties := make(map[string]string)
+	properties["pulsar"] = "hello"
+	jsonSchemaWithProperties := NewJsonSchema(exampleSchemaDef, properties)
+	producer1, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "jsonTopic",
+	}, jsonSchemaWithProperties)
+	if err != nil {
+		t.Fatalf("create producer with properties: %v", err)
+	}
+	defer producer1.Close()
+
+	err = producer1.Send(context.Background(), ProducerMessage{
+		Value: &testJson{
+			ID:   100,
+			Name: "pulsar",
+		},
+	})
+	if err != nil {
+		t.Fatalf("send with properties: %v", err)
+	}
+
+	// create consumer
+	var s testJson
+
+	consumerJS := NewJsonSchema(exampleSchemaDef, nil)
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "jsonTopic",
+		SubscriptionName: "sub-2",
+	}, consumerJS)
+	if err != nil {
+		t.Fatalf("subscribe: %v", err)
+	}
+	defer consumer.Close()
+
+	msg, err := consumer.Receive(context.Background())
+	if err != nil {
+		t.Fatalf("receive: %v", err)
+	}
+	if err := msg.GetValue(&s); err != nil {
+		t.Fatalf("decode: %v", err)
+	}
+	assert.Equal(t, 100, s.ID)
+	assert.Equal(t, "pulsar", s.Name)
+}
+
+// TestProtoSchema round-trips a pb.Test message through the "proto" topic
+// using a proto schema on both the producer and the consumer.
+//
+// Every setup error stops the test with t.Fatalf, which still runs the
+// deferred Close calls. The producer is now closed as well.
+func TestProtoSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	// create producer
+	psProducer := NewProtoSchema(protoSchemaDef, nil)
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "proto",
+	}, psProducer)
+	if err != nil {
+		t.Fatalf("create producer: %v", err)
+	}
+	defer producer.Close()
+
+	if err := producer.Send(context.Background(), ProducerMessage{
+		Value: &pb.Test{
+			Num: 100,
+			Msf: "pulsar",
+		},
+	}); err != nil {
+		t.Fatalf("send: %v", err)
+	}
+
+	// create consumer
+	unobj := pb.Test{}
+	psConsumer := NewProtoSchema(protoSchemaDef, nil)
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "proto",
+		SubscriptionName: "sub-1",
+	}, psConsumer)
+	if err != nil {
+		t.Fatalf("subscribe: %v", err)
+	}
+	defer consumer.Close()
+
+	msg, err := consumer.Receive(context.Background())
+	if err != nil {
+		t.Fatalf("receive: %v", err)
+	}
+	if err := msg.GetValue(&unobj); err != nil {
+		t.Fatalf("decode: %v", err)
+	}
+	assert.Equal(t, int32(100), unobj.Num)
+	assert.Equal(t, "pulsar", unobj.Msf)
+}
+
+// TestAvroSchema round-trips a testAvro value through "avro-topic" using an
+// Avro schema on both the producer and the consumer.
+//
+// Every setup error stops the test with t.Fatalf, which still runs the
+// deferred Close calls. The producer is now closed as well.
+func TestAvroSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	// create producer
+	asProducer := NewAvroSchema(exampleSchemaDef, nil)
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "avro-topic",
+	}, asProducer)
+	if err != nil {
+		t.Fatalf("create producer: %v", err)
+	}
+	defer producer.Close()
+
+	if err := producer.Send(context.Background(), ProducerMessage{
+		Value: testAvro{
+			ID:   100,
+			Name: "pulsar",
+		},
+	}); err != nil {
+		t.Fatalf("send: %v", err)
+	}
+
+	// create consumer
+	unobj := testAvro{}
+
+	asConsumer := NewAvroSchema(exampleSchemaDef, nil)
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "avro-topic",
+		SubscriptionName: "sub-1",
+	}, asConsumer)
+	if err != nil {
+		t.Fatalf("subscribe: %v", err)
+	}
+	defer consumer.Close()
+
+	msg, err := consumer.Receive(context.Background())
+	if err != nil {
+		t.Fatalf("receive: %v", err)
+	}
+	if err := msg.GetValue(&unobj); err != nil {
+		t.Fatalf("decode: %v", err)
+	}
+	assert.Equal(t, 100, unobj.ID)
+	assert.Equal(t, "pulsar", unobj.Name)
+}
+
+// TestStringSchema round-trips the string "hello pulsar" through "strTopic"
+// using StringSchema on both the producer and the consumer.
+//
+// The decode error is now checked and the result pointer is verified before
+// it is dereferenced. Every setup error stops the test with t.Fatalf, which
+// still runs the deferred Close calls.
+func TestStringSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	ssProducer := NewStringSchema(nil)
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "strTopic",
+	}, ssProducer)
+	if err != nil {
+		t.Fatalf("create producer: %v", err)
+	}
+	defer producer.Close()
+
+	if err := producer.Send(context.Background(), ProducerMessage{
+		Value: "hello pulsar",
+	}); err != nil {
+		t.Fatalf("send: %v", err)
+	}
+
+	var res *string
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "strTopic",
+		SubscriptionName: "sub-2",
+	}, NewStringSchema(nil))
+	if err != nil {
+		t.Fatalf("subscribe: %v", err)
+	}
+	defer consumer.Close()
+
+	msg, err := consumer.Receive(context.Background())
+	if err != nil {
+		t.Fatalf("receive: %v", err)
+	}
+	if err := msg.GetValue(&res); err != nil {
+		t.Fatalf("decode: %v", err)
+	}
+	if res == nil {
+		t.Fatal("GetValue left the destination pointer nil")
+	}
+	assert.Equal(t, "hello pulsar", *res)
+}
+
+// TestBytesSchema round-trips a four-byte payload through "bytesTopic" using
+// BytesSchema on both the producer and the consumer.
+//
+// The decode error is now checked. Every setup error stops the test with
+// t.Fatalf, which still runs the deferred Close calls.
+func TestBytesSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	payload := []byte{121, 110, 121, 110}
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "bytesTopic",
+	}, NewBytesSchema(nil))
+	if err != nil {
+		t.Fatalf("create producer: %v", err)
+	}
+	defer producer.Close()
+
+	ctx := context.Background()
+	if err := producer.Send(ctx, ProducerMessage{
+		Value: payload,
+	}); err != nil {
+		t.Fatalf("send: %v", err)
+	}
+
+	var res []byte
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "bytesTopic",
+		SubscriptionName: "sub-2",
+	}, NewBytesSchema(nil))
+	if err != nil {
+		t.Fatalf("subscribe: %v", err)
+	}
+	defer consumer.Close()
+
+	msg, err := consumer.Receive(context.Background())
+	if err != nil {
+		t.Fatalf("receive: %v", err)
+	}
+	if err := msg.GetValue(&res); err != nil {
+		t.Fatalf("decode: %v", err)
+	}
+	assert.Equal(t, payload, res)
+}
+
+// TestInt8Schema round-trips int8(1) through "int8Topic1" using Int8Schema on
+// both the producer and the consumer.
+//
+// Every setup error stops the test with t.Fatalf, which still runs the
+// deferred Close calls, so a failed step never leads to a nil producer,
+// consumer or message being used.
+func TestInt8Schema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "int8Topic1",
+	}, NewInt8Schema(nil))
+	if err != nil {
+		t.Fatalf("create producer: %v", err)
+	}
+	defer producer.Close()
+
+	ctx := context.Background()
+	if err := producer.Send(ctx, ProducerMessage{
+		Value: int8(1),
+	}); err != nil {
+		t.Fatalf("send: %v", err)
+	}
+
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "int8Topic1",
+		SubscriptionName: "sub-2",
+	}, NewInt8Schema(nil))
+	if err != nil {
+		t.Fatalf("subscribe: %v", err)
+	}
+	defer consumer.Close()
+
+	var res int8
+	msg, err := consumer.Receive(ctx)
+	if err != nil {
+		t.Fatalf("receive: %v", err)
+	}
+	if err := msg.GetValue(&res); err != nil {
+		t.Fatalf("decode: %v", err)
+	}
+	assert.Equal(t, int8(1), res)
+}
+
+// TestInt16Schema round-trips int16(1) through "int16Topic" using Int16Schema
+// on both the producer and the consumer.
+//
+// Every setup error stops the test with t.Fatalf, which still runs the
+// deferred Close calls, so a failed step never leads to a nil producer,
+// consumer or message being used.
+func TestInt16Schema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "int16Topic",
+	}, NewInt16Schema(nil))
+	if err != nil {
+		t.Fatalf("create producer: %v", err)
+	}
+	defer producer.Close()
+
+	ctx := context.Background()
+	if err := producer.Send(ctx, ProducerMessage{
+		Value: int16(1),
+	}); err != nil {
+		t.Fatalf("send: %v", err)
+	}
+
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "int16Topic",
+		SubscriptionName: "sub-2",
+	}, NewInt16Schema(nil))
+	if err != nil {
+		t.Fatalf("subscribe: %v", err)
+	}
+	defer consumer.Close()
+
+	var res int16
+	msg, err := consumer.Receive(ctx)
+	if err != nil {
+		t.Fatalf("receive: %v", err)
+	}
+	if err := msg.GetValue(&res); err != nil {
+		t.Fatalf("decode: %v", err)
+	}
+	assert.Equal(t, int16(1), res)
+}
+
+// TestInt32Schema round-trips int32(1) through "int32Topic1" using
+// Int32Schema on both the producer and the consumer.
+//
+// Every setup error stops the test with t.Fatalf, which still runs the
+// deferred Close calls, so a failed step never leads to a nil producer,
+// consumer or message being used.
+func TestInt32Schema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "int32Topic1",
+	}, NewInt32Schema(nil))
+	if err != nil {
+		t.Fatalf("create producer: %v", err)
+	}
+	defer producer.Close()
+
+	ctx := context.Background()
+	if err := producer.Send(ctx, ProducerMessage{
+		Value: int32(1),
+	}); err != nil {
+		t.Fatalf("send: %v", err)
+	}
+
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "int32Topic1",
+		SubscriptionName: "sub-2",
+	}, NewInt32Schema(nil))
+	if err != nil {
+		t.Fatalf("subscribe: %v", err)
+	}
+	defer consumer.Close()
+
+	var res int32
+	msg, err := consumer.Receive(ctx)
+	if err != nil {
+		t.Fatalf("receive: %v", err)
+	}
+	if err := msg.GetValue(&res); err != nil {
+		t.Fatalf("decode: %v", err)
+	}
+	assert.Equal(t, int32(1), res)
+}
+
+// TestInt64Schema round-trips int64(1) through "int64Topic" using Int64Schema
+// on both the producer and the consumer.
+//
+// Every setup error stops the test with t.Fatalf, which still runs the
+// deferred Close calls, so a failed step never leads to a nil producer,
+// consumer or message being used.
+func TestInt64Schema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "int64Topic",
+	}, NewInt64Schema(nil))
+	if err != nil {
+		t.Fatalf("create producer: %v", err)
+	}
+	defer producer.Close()
+
+	ctx := context.Background()
+	if err := producer.Send(ctx, ProducerMessage{
+		Value: int64(1),
+	}); err != nil {
+		t.Fatalf("send: %v", err)
+	}
+
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "int64Topic",
+		SubscriptionName: "sub-2",
+	}, NewInt64Schema(nil))
+	if err != nil {
+		t.Fatalf("subscribe: %v", err)
+	}
+	defer consumer.Close()
+
+	var res int64
+	msg, err := consumer.Receive(ctx)
+	if err != nil {
+		t.Fatalf("receive: %v", err)
+	}
+	if err := msg.GetValue(&res); err != nil {
+		t.Fatalf("decode: %v", err)
+	}
+	assert.Equal(t, int64(1), res)
+}
+
+// TestFloatSchema round-trips float32(1) through "floatTopic" using
+// FloatSchema on both the producer and the consumer.
+//
+// Every setup error stops the test with t.Fatalf, which still runs the
+// deferred Close calls, so a failed step never leads to a nil producer,
+// consumer or message being used.
+func TestFloatSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "floatTopic",
+	}, NewFloatSchema(nil))
+	if err != nil {
+		t.Fatalf("create producer: %v", err)
+	}
+	defer producer.Close()
+
+	if err := producer.Send(context.Background(), ProducerMessage{
+		Value: float32(1),
+	}); err != nil {
+		t.Fatalf("send: %v", err)
+	}
+
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "floatTopic",
+		SubscriptionName: "sub-2",
+	}, NewFloatSchema(nil))
+	if err != nil {
+		t.Fatalf("subscribe: %v", err)
+	}
+	defer consumer.Close()
+
+	var res float32
+	msg, err := consumer.Receive(context.Background())
+	if err != nil {
+		t.Fatalf("receive: %v", err)
+	}
+	if err := msg.GetValue(&res); err != nil {
+		t.Fatalf("decode: %v", err)
+	}
+	assert.Equal(t, float32(1), res)
+}
+
+// TestDoubleSchema round-trips float64(1) through "doubleTopic" using
+// DoubleSchema on both the producer and the consumer.
+//
+// Every setup error stops the test with t.Fatalf, which still runs the
+// deferred Close calls, so a failed step never leads to a nil producer,
+// consumer or message being used.
+func TestDoubleSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "doubleTopic",
+	}, NewDoubleSchema(nil))
+	if err != nil {
+		t.Fatalf("create producer: %v", err)
+	}
+	defer producer.Close()
+
+	ctx := context.Background()
+	if err := producer.Send(ctx, ProducerMessage{
+		Value: float64(1),
+	}); err != nil {
+		t.Fatalf("send: %v", err)
+	}
+
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "doubleTopic",
+		SubscriptionName: "sub-2",
+	}, NewDoubleSchema(nil))
+	if err != nil {
+		t.Fatalf("subscribe: %v", err)
+	}
+	defer consumer.Close()
+
+	var res float64
+	msg, err := consumer.Receive(ctx)
+	if err != nil {
+		t.Fatalf("receive: %v", err)
+	}
+	if err := msg.GetValue(&res); err != nil {
+		t.Fatalf("decode: %v", err)
+	}
+	assert.Equal(t, float64(1), res)
+}
diff --git a/pulsar-client-go/pulsar/util_test.go b/pulsar-client-go/pulsar/testhelps.go
similarity index 100%
rename from pulsar-client-go/pulsar/util_test.go
rename to pulsar-client-go/pulsar/testhelps.go