You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/04/23 04:59:39 UTC
[pulsar] branch master updated: [go schema] support go schema for
pulsar-client-go (#3904)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d5036ea [go schema] support go schema for pulsar-client-go (#3904)
d5036ea is described below
commit d5036eaefcba42e6c9d159ffb14bee597d13380f
Author: 冉小龙 <rx...@qq.com>
AuthorDate: Tue Apr 23 12:59:33 2019 +0800
[go schema] support go schema for pulsar-client-go (#3904)
Signed-off-by: xiaolong.ran ranxiaolong716@gmail.com
Master Issue: #3855
Motivation
support go schema for pulsar-client-go
---
pulsar-client-cpp/include/pulsar/Schema.h | 52 +--
.../include/pulsar/c/consumer_configuration.h | 5 +
.../include/pulsar/c/producer_configuration.h | 23 +
pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc | 8 +
pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc | 7 +
pulsar-client-go/go.mod | 7 +
pulsar-client-go/go.sum | 15 +
pulsar-client-go/pulsar/c_client.go | 74 ++-
pulsar-client-go/pulsar/c_consumer.go | 60 ++-
pulsar-client-go/pulsar/c_message.go | 11 +-
pulsar-client-go/pulsar/c_producer.go | 77 +++-
pulsar-client-go/pulsar/c_reader.go | 13 +-
pulsar-client-go/pulsar/client.go | 8 +-
pulsar-client-go/pulsar/consumer.go | 4 +
pulsar-client-go/pulsar/consumer_test.go | 7 +-
pulsar-client-go/pulsar/message.go | 6 +
pulsar-client-go/pulsar/pb/build.sh | 23 +
pulsar-client-go/pulsar/pb/hello.pb.go | 100 ++++
pulsar-client-go/pulsar/pb/hello.proto | 25 +
pulsar-client-go/pulsar/primitiveSerDe.go | 318 +++++++++++++
pulsar-client-go/pulsar/primitiveSerDe_test.go | 145 ++++++
pulsar-client-go/pulsar/producer.go | 2 +
pulsar-client-go/pulsar/producer_test.go | 3 +-
pulsar-client-go/pulsar/reader.go | 2 +
pulsar-client-go/pulsar/reader_test.go | 2 +-
pulsar-client-go/pulsar/schema.go | 504 +++++++++++++++++++++
pulsar-client-go/pulsar/schemaDef_test.go | 58 +++
pulsar-client-go/pulsar/schema_test.go | 436 ++++++++++++++++++
.../pulsar/{util_test.go => testhelps.go} | 0
29 files changed, 1935 insertions(+), 60 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/Schema.h b/pulsar-client-cpp/include/pulsar/Schema.h
index 257ee3a..fe09fb9 100644
--- a/pulsar-client-cpp/include/pulsar/Schema.h
+++ b/pulsar-client-cpp/include/pulsar/Schema.h
@@ -40,69 +40,69 @@ enum SchemaType
STRING = 1,
/**
- * A 8-byte integer.
+ * JSON object encoding and validation
*/
- INT8 = 2,
+ JSON = 2,
/**
- * A 16-byte integer.
+ * Protobuf message encoding and decoding
*/
- INT16 = 3,
+ PROTOBUF = 3,
/**
- * A 32-byte integer.
+ * Serialize and deserialize via Avro
*/
- INT32 = 4,
+ AVRO = 4,
/**
- * A 64-byte integer.
+ * An 8-bit integer.
*/
- INT64 = 5,
+ INT8 = 6,
/**
- * A float number.
+ * A 16-bit integer.
*/
- FLOAT = 6,
+ INT16 = 7,
/**
- * A double number
+ * A 32-bit integer.
*/
- DOUBLE = 7,
+ INT32 = 8,
/**
- * A bytes array.
+ * A 64-bit integer.
*/
- BYTES = 8,
+ INT64 = 9,
/**
- * JSON object encoding and validation
+ * A float number.
*/
- JSON = 9,
+ FLOAT = 10,
/**
- * Protobuf message encoding and decoding
+ * A double number
*/
- PROTOBUF = 10,
+ DOUBLE = 11,
/**
- * Serialize and deserialize via Avro
+ * A Schema that contains Key Schema and Value Schema.
*/
- AVRO = 11,
+ KEY_VALUE = 15,
/**
- * Auto Consume Type.
+ * A bytes array.
*/
- AUTO_CONSUME = 13,
+ BYTES = -1,
/**
- * Auto Publish Type.
+ * Auto Consume Type.
*/
- AUTO_PUBLISH = 14,
+ AUTO_CONSUME = -3,
/**
- * A Schema that contains Key Schema and Value Schema.
+ * Auto Publish Type.
*/
- KEY_VALUE = 15,
+ AUTO_PUBLISH = -4,
};
// Return string representation of result code
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index 810d062..2a04d1e 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -19,6 +19,7 @@
#pragma once
#include "consumer.h"
+#include "producer_configuration.h"
#ifdef __cplusplus
extern "C" {
@@ -82,6 +83,10 @@ void pulsar_consumer_configuration_set_consumer_type(pulsar_consumer_configurati
pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
pulsar_consumer_configuration_t *consumer_configuration);
+void pulsar_consumer_configuration_set_schema_info(pulsar_consumer_configuration_t *consumer_configuration,
+ pulsar_schema_type schemaType, const char *name,
+ const char *schema, pulsar_string_map_t *properties);
+
/**
* A message listener enables your application to configure how to process
* and acknowledge messages delivered. A listener will be called in order
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index 670bf50..84b324b 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -43,6 +43,25 @@ typedef enum {
pulsar_CompressionZLib = 2
} pulsar_compression_type;
+typedef enum {
+ pulsar_None = 0,
+ pulsar_String = 1,
+ pulsar_Json = 2,
+ pulsar_Protobuf = 3,
+ pulsar_Avro = 4,
+ pulsar_Boolean = 5,
+ pulsar_Int8 = 6,
+ pulsar_Int16 = 7,
+ pulsar_Int32 = 8,
+ pulsar_Int64 = 9,
+ pulsar_Float32 = 10,
+ pulsar_Float64 = 11,
+ pulsar_KeyValue = 15,
+ pulsar_Bytes = -1,
+ pulsar_AutoConsume = -3,
+ pulsar_AutoPublish = -4,
+} pulsar_schema_type;
+
typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;
pulsar_producer_configuration_t *pulsar_producer_configuration_create();
@@ -69,6 +88,10 @@ void pulsar_producer_configuration_set_compression_type(pulsar_producer_configur
pulsar_compression_type pulsar_producer_configuration_get_compression_type(
pulsar_producer_configuration_t *conf);
+void pulsar_producer_configuration_set_schema_info(pulsar_producer_configuration_t *conf,
+ pulsar_schema_type schemaType, const char *name,
+ const char *schema, pulsar_string_map_t *properties);
+
void pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t *conf,
int maxPendingMessages);
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index 49cf2be..9b23729 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -40,6 +40,13 @@ pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
return (pulsar_consumer_type)consumer_configuration->consumerConfiguration.getConsumerType();
}
+void pulsar_consumer_configuration_set_schema_info(pulsar_consumer_configuration_t *consumer_configuration,
+ pulsar_schema_type schemaType, const char *name,
+ const char *schema, pulsar_string_map_t *properties) {
+ auto schemaInfo = pulsar::SchemaInfo((pulsar::SchemaType)schemaType, name, schema, properties->map);
+ consumer_configuration->consumerConfiguration.setSchema(schemaInfo);
+}
+
static void message_listener_callback(pulsar::Consumer consumer, const pulsar::Message &msg,
pulsar_message_listener listener, void *ctx) {
pulsar_consumer_t c_consumer;
@@ -105,6 +112,7 @@ void pulsar_configure_set_negative_ack_redelivery_delay_ms(
pulsar_consumer_configuration_t *consumer_configuration, long redeliveryDelayMillis) {
consumer_configuration->consumerConfiguration.setNegativeAckRedeliveryDelayMs(redeliveryDelayMillis);
}
+
long pulsar_configure_get_negative_ack_redelivery_delay_ms(
pulsar_consumer_configuration_t *consumer_configuration) {
return consumer_configuration->consumerConfiguration.getNegativeAckRedeliveryDelayMs();
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index a4a24c1..6e2b7fc 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -66,6 +66,13 @@ pulsar_compression_type pulsar_producer_configuration_get_compression_type(
return (pulsar_compression_type)conf->conf.getCompressionType();
}
+void pulsar_producer_configuration_set_schema_info(pulsar_producer_configuration_t *conf,
+ pulsar_schema_type schemaType, const char *name,
+ const char *schema, pulsar_string_map_t *properties) {
+ auto schemaInfo = pulsar::SchemaInfo((pulsar::SchemaType)schemaType, name, schema, properties->map);
+ conf->conf.setSchema(schemaInfo);
+}
+
void pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t *conf,
int maxPendingMessages) {
conf->conf.setMaxPendingMessages(maxPendingMessages);
diff --git a/pulsar-client-go/go.mod b/pulsar-client-go/go.mod
index 1d826eb..ea75273 100644
--- a/pulsar-client-go/go.mod
+++ b/pulsar-client-go/go.mod
@@ -2,8 +2,15 @@ module github.com/apache/pulsar/pulsar-client-go
require (
github.com/BurntSushi/toml v0.3.1 // indirect
+ github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6
+ github.com/davecgh/go-spew v1.1.1
+ github.com/gogo/protobuf v1.2.1
+ github.com/golang/protobuf v1.3.1
+ github.com/golang/snappy v0.0.1 // indirect
+ github.com/linkedin/goavro v2.1.0+incompatible
github.com/sirupsen/logrus v1.3.0
github.com/stretchr/testify v1.3.0
+ gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.2.2 // indirect
)
diff --git a/pulsar-client-go/go.sum b/pulsar-client-go/go.sum
index 5f0cd6b..b6177d8 100644
--- a/pulsar-client-go/go.sum
+++ b/pulsar-client-go/go.sum
@@ -1,10 +1,22 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6 h1:xadBCbc8D9mmkaNfCsEBHbIoCjbayJXJNsY1JjPjNio=
+github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6/go.mod h1:qpebaTNSsyUn5rPSJMsfqEtDw71TTggXM6stUDI16HA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
+github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
+github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
+github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
+github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/linkedin/goavro v2.1.0+incompatible h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY=
+github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
@@ -18,8 +30,11 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/linkedin/goavro.v1 v1.0.5 h1:BJa69CDh0awSsLUmZ9+BowBdokpduDZSM9Zk8oKHfN4=
+gopkg.in/linkedin/goavro.v1 v1.0.5/go.mod h1:Aw5GdAbizjOEl0kAMHV9iHmA8reZzW/OKuJAl4Hb9F0=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
diff --git a/pulsar-client-go/pulsar/c_client.go b/pulsar-client-go/pulsar/c_client.go
index 3ade48c..33f523d 100644
--- a/pulsar-client-go/pulsar/c_client.go
+++ b/pulsar-client-go/pulsar/c_client.go
@@ -196,7 +196,7 @@ func (client *client) CreateProducer(options ProducerOptions) (Producer, error)
error
})
- client.CreateProducerAsync(options, func(producer Producer, err error) {
+ client.CreateProducerAsync(options, nil, func(producer Producer, err error) {
c <- struct {
Producer
error
@@ -208,8 +208,28 @@ func (client *client) CreateProducer(options ProducerOptions) (Producer, error)
return res.Producer, res.error
}
-func (client *client) CreateProducerAsync(options ProducerOptions, callback func(producer Producer, err error)) {
- createProducerAsync(client, options, callback)
+func (client *client) CreateProducerWithSchema(options ProducerOptions, schema Schema) (Producer, error) {
+ // CreateProducerWithSchema is implemented on top of CreateProducerAsync: a
+ // channel is used to wait for completion without blocking the real thread
+ c := make(chan struct {
+ Producer
+ error
+ })
+
+ client.CreateProducerAsync(options, schema, func(producer Producer, err error) {
+ c <- struct {
+ Producer
+ error
+ }{producer, err}
+ close(c)
+ })
+
+ res := <-c
+ return res.Producer, res.error
+}
+
+func (client *client) CreateProducerAsync(options ProducerOptions, schema Schema, callback func(producer Producer, err error)) {
+ createProducerAsync(client, schema, options, callback)
}
func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
@@ -218,7 +238,25 @@ func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
error
})
- client.SubscribeAsync(options, func(consumer Consumer, err error) {
+ client.SubscribeAsync(options, nil, func(consumer Consumer, err error) {
+ c <- struct {
+ Consumer
+ error
+ }{consumer, err}
+ close(c)
+ })
+
+ res := <-c
+ return res.Consumer, res.error
+}
+
+func (client *client) SubscribeWithSchema(options ConsumerOptions, schema Schema) (Consumer, error) {
+ c := make(chan struct {
+ Consumer
+ error
+ })
+
+ client.SubscribeAsync(options, schema, func(consumer Consumer, err error) {
c <- struct {
Consumer
error
@@ -230,8 +268,8 @@ func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
return res.Consumer, res.error
}
-func (client *client) SubscribeAsync(options ConsumerOptions, callback func(Consumer, error)) {
- subscribeAsync(client, options, callback)
+func (client *client) SubscribeAsync(options ConsumerOptions, schema Schema, callback func(Consumer, error)) {
+ subscribeAsync(client, options, schema, callback)
}
func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
@@ -240,7 +278,25 @@ func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
error
})
- client.CreateReaderAsync(options, func(reader Reader, err error) {
+ client.CreateReaderAsync(options, nil, func(reader Reader, err error) {
+ c <- struct {
+ Reader
+ error
+ }{reader, err}
+ close(c)
+ })
+
+ res := <-c
+ return res.Reader, res.error
+}
+
+func (client *client) CreateReaderWithSchema(options ReaderOptions, schema Schema) (Reader, error) {
+ c := make(chan struct {
+ Reader
+ error
+ })
+
+ client.CreateReaderAsync(options, schema, func(reader Reader, err error) {
c <- struct {
Reader
error
@@ -271,8 +327,8 @@ func pulsarGetTopicPartitionsCallbackProxy(res C.pulsar_result, cPartitions *C.p
}
}
-func (client *client) CreateReaderAsync(options ReaderOptions, callback func(Reader, error)) {
- createReaderAsync(client, options, callback)
+func (client *client) CreateReaderAsync(options ReaderOptions, schema Schema, callback func(Reader, error)) {
+ createReaderAsync(client, schema, options, callback)
}
func (client *client) TopicPartitions(topic string) ([]string, error) {
diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go
index 97fa843..ac6d9ed 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -32,6 +32,7 @@ import (
)
type consumer struct {
+ schema Schema
client *client
ptr *C.pulsar_consumer_t
defaultChannel chan ConsumerMessage
@@ -53,18 +54,20 @@ func pulsarSubscribeCallbackProxy(res C.pulsar_result, ptr *C.pulsar_consumer_t,
cc.callback(nil, newError(res, "Failed to subscribe to topic"))
} else {
cc.consumer.ptr = ptr
+ cc.consumer.schema = cc.schema
runtime.SetFinalizer(cc.consumer, consumerFinalizer)
cc.callback(cc.consumer, nil)
}
}
type subscribeContext struct {
+ schema Schema
conf *C.pulsar_consumer_configuration_t
consumer *consumer
callback func(Consumer, error)
}
-func subscribeAsync(client *client, options ConsumerOptions, callback func(Consumer, error)) {
+func subscribeAsync(client *client, options ConsumerOptions, schema Schema, callback func(Consumer, error)) {
if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required"))
return
@@ -109,6 +112,48 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
C.pulsar_consumer_set_subscription_initial_position(conf, C.initial_position(options.SubscriptionInitPos))
}
+ if schema != nil && schema.GetSchemaInfo() != nil {
+ if schema.GetSchemaInfo().Type != NONE {
+ cName := C.CString(schema.GetSchemaInfo().Name)
+ cSchema := C.CString(schema.GetSchemaInfo().Schema)
+ properties := C.pulsar_string_map_create()
+ defer C.free(unsafe.Pointer(cName))
+ defer C.free(unsafe.Pointer(cSchema))
+ defer C.pulsar_string_map_free(properties)
+
+ for key, value := range schema.GetSchemaInfo().Properties {
+ cKey := C.CString(key)
+ cValue := C.CString(value)
+
+ C.pulsar_string_map_put(properties, cKey, cValue)
+
+ C.free(unsafe.Pointer(cKey))
+ C.free(unsafe.Pointer(cValue))
+ }
+ C.pulsar_consumer_configuration_set_schema_info(conf, C.pulsar_schema_type(schema.GetSchemaInfo().Type),
+ cName, cSchema, properties)
+ } else {
+ cName := C.CString("BYTES")
+ cSchema := C.CString("")
+ properties := C.pulsar_string_map_create()
+ defer C.free(unsafe.Pointer(cName))
+ defer C.free(unsafe.Pointer(cSchema))
+ defer C.pulsar_string_map_free(properties)
+
+ for key, value := range schema.GetSchemaInfo().Properties {
+ cKey := C.CString(key)
+ cValue := C.CString(value)
+
+ C.pulsar_string_map_put(properties, cKey, cValue)
+
+ C.free(unsafe.Pointer(cKey))
+ C.free(unsafe.Pointer(cValue))
+ }
+ C.pulsar_consumer_configuration_set_schema_info(conf, C.pulsar_schema_type(BYTES),
+ cName, cSchema, properties)
+ }
+ }
+
// ReceiverQueueSize==0 means to use the default queue size
// -1 means to disable the consumer prefetching
if options.ReceiverQueueSize > 0 {
@@ -147,7 +192,7 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
subName := C.CString(options.SubscriptionName)
defer C.free(unsafe.Pointer(subName))
- callbackPtr := savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback})
+ callbackPtr := savePointer(&subscribeContext{schema: schema, conf: conf, consumer: consumer, callback: callback})
if options.Topic != "" {
topic := C.CString(options.Topic)
@@ -193,12 +238,15 @@ func pulsarMessageListenerProxy(cConsumer *C.pulsar_consumer_t, message *C.pulsa
// There was an error when sending channel (eg: already closed)
}
}()
-
- cc.channel <- ConsumerMessage{cc.consumer, newMessageWrapper(message)}
+ cc.channel <- ConsumerMessage{cc.consumer, newMessageWrapper(cc.consumer.Schema(), message)}
}
//// Consumer
+func (c *consumer) Schema() Schema {
+ return c.schema
+}
+
func (c *consumer) Topic() string {
return C.GoString(C.pulsar_consumer_get_topic(c.ptr))
}
@@ -210,7 +258,9 @@ func (c *consumer) Subscription() string {
func (c *consumer) Unsubscribe() error {
channel := make(chan error)
c.UnsubscribeAsync(func(err error) {
- channel <- err; close(channel) })
+ channel <- err
+ close(channel)
+ })
return <-channel
}
diff --git a/pulsar-client-go/pulsar/c_message.go b/pulsar-client-go/pulsar/c_message.go
index 0dfbfa6..c45027a 100644
--- a/pulsar-client-go/pulsar/c_message.go
+++ b/pulsar-client-go/pulsar/c_message.go
@@ -32,7 +32,8 @@ import (
)
type message struct {
- ptr *C.pulsar_message_t
+ ptr *C.pulsar_message_t
+ schema Schema
}
type messageID struct {
@@ -97,8 +98,8 @@ func buildMessage(message ProducerMessage) *C.pulsar_message_t {
////////////// Message
-func newMessageWrapper(ptr *C.pulsar_message_t) Message {
- msg := &message{ptr: ptr}
+func newMessageWrapper(schema Schema, ptr *C.pulsar_message_t) Message {
+ msg := &message{schema: schema, ptr: ptr}
runtime.SetFinalizer(msg, messageFinalizer)
return msg
}
@@ -107,6 +108,10 @@ func messageFinalizer(msg *message) {
C.pulsar_message_free(msg.ptr)
}
+func (m *message) GetValue(v interface{}) error {
+ return m.schema.Decode(m.Payload(), v)
+}
+
func (m *message) Properties() map[string]string {
cProperties := C.pulsar_message_get_properties(m.ptr)
defer C.pulsar_string_map_free(cProperties)
diff --git a/pulsar-client-go/pulsar/c_producer.go b/pulsar-client-go/pulsar/c_producer.go
index 80fe017..528df30 100644
--- a/pulsar-client-go/pulsar/c_producer.go
+++ b/pulsar-client-go/pulsar/c_producer.go
@@ -25,6 +25,7 @@ package pulsar
import "C"
import (
"context"
+ "errors"
"runtime"
"time"
"unsafe"
@@ -32,6 +33,7 @@ import (
type createProducerCtx struct {
client *client
+ schema Schema
callback func(producer Producer, err error)
conf *C.pulsar_producer_configuration_t
}
@@ -45,13 +47,13 @@ func pulsarCreateProducerCallbackProxy(res C.pulsar_result, ptr *C.pulsar_produc
if res != C.pulsar_result_Ok {
producerCtx.callback(nil, newError(res, "Failed to create Producer"))
} else {
- p := &producer{client: producerCtx.client, ptr: ptr}
+ p := &producer{client: producerCtx.client, schema: producerCtx.schema, ptr: ptr}
runtime.SetFinalizer(p, producerFinalizer)
producerCtx.callback(p, nil)
}
}
-func createProducerAsync(client *client, options ProducerOptions, callback func(producer Producer, err error)) {
+func createProducerAsync(client *client, schema Schema, options ProducerOptions, callback func(producer Producer, err error)) {
if options.Topic == "" {
go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required when creating producer"))
return
@@ -108,6 +110,48 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
C.pulsar_producer_configuration_set_compression_type(conf, C.pulsar_compression_type(options.CompressionType))
}
+ if schema != nil && schema.GetSchemaInfo() != nil {
+ if schema.GetSchemaInfo().Type != NONE {
+ cName := C.CString(schema.GetSchemaInfo().Name)
+ cSchema := C.CString(schema.GetSchemaInfo().Schema)
+ properties := C.pulsar_string_map_create()
+ defer C.free(unsafe.Pointer(cName))
+ defer C.free(unsafe.Pointer(cSchema))
+ defer C.pulsar_string_map_free(properties)
+
+ for key, value := range schema.GetSchemaInfo().Properties {
+ cKey := C.CString(key)
+ cValue := C.CString(value)
+
+ C.pulsar_string_map_put(properties, cKey, cValue)
+
+ C.free(unsafe.Pointer(cKey))
+ C.free(unsafe.Pointer(cValue))
+ }
+ C.pulsar_producer_configuration_set_schema_info(conf, C.pulsar_schema_type(schema.GetSchemaInfo().Type),
+ cName, cSchema, properties)
+ } else {
+ cName := C.CString("BYTES")
+ cSchema := C.CString("")
+ properties := C.pulsar_string_map_create()
+ defer C.free(unsafe.Pointer(cName))
+ defer C.free(unsafe.Pointer(cSchema))
+ defer C.pulsar_string_map_free(properties)
+
+ for key, value := range schema.GetSchemaInfo().Properties {
+ cKey := C.CString(key)
+ cValue := C.CString(value)
+
+ C.pulsar_string_map_put(properties, cKey, cValue)
+
+ C.free(unsafe.Pointer(cKey))
+ C.free(unsafe.Pointer(cValue))
+ }
+ C.pulsar_producer_configuration_set_schema_info(conf, C.pulsar_schema_type(BYTES),
+ cName, cSchema, properties)
+ }
+ }
+
if options.MessageRouter != nil {
C._pulsar_producer_configuration_set_message_router(conf, savePointer(&options.MessageRouter))
}
@@ -141,7 +185,7 @@ func createProducerAsync(client *client, options ProducerOptions, callback func(
defer C.free(unsafe.Pointer(topicName))
C._pulsar_client_create_producer_async(client.ptr, topicName, conf,
- savePointer(createProducerCtx{client, callback, conf}))
+ savePointer(createProducerCtx{client, schema, callback, conf}))
}
type topicMetadata struct {
@@ -155,7 +199,7 @@ func (tm *topicMetadata) NumPartitions() int {
//export pulsarRouterCallbackProxy
func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, metadata *C.pulsar_topic_metadata_t, ctx unsafe.Pointer) C.int {
router := restorePointerNoDelete(ctx).(*func(msg Message, metadata TopicMetadata) int)
- partitionIdx := (*router)(&message{msg}, &topicMetadata{int(C.pulsar_topic_metadata_get_num_partitions(metadata))})
+ partitionIdx := (*router)(&message{ptr: msg}, &topicMetadata{int(C.pulsar_topic_metadata_get_num_partitions(metadata))})
return C.int(partitionIdx)
}
@@ -164,6 +208,7 @@ func pulsarRouterCallbackProxy(msg *C.pulsar_message_t, metadata *C.pulsar_topic
type producer struct {
client *client
ptr *C.pulsar_producer_t
+ schema Schema
}
func producerFinalizer(p *producer) {
@@ -178,6 +223,10 @@ func (p *producer) Name() string {
return C.GoString(C.pulsar_producer_get_producer_name(p.ptr))
}
+func (p *producer) Schema() Schema {
+ return p.schema
+}
+
func (p *producer) LastSequenceID() int64 {
return int64(C.pulsar_producer_get_last_sequence_id(p.ptr))
}
@@ -212,10 +261,27 @@ func pulsarProducerSendCallbackProxy(res C.pulsar_result, message *C.pulsar_mess
}
func (p *producer) SendAsync(ctx context.Context, msg ProducerMessage, callback func(ProducerMessage, error)) {
+ if p.schema != nil {
+ if msg.Value == nil {
+ callback(msg, errors.New("message value is nil, please check"))
+ return
+ }
+ payLoad, err := p.schema.Encode(msg.Value)
+ if err != nil {
+ callback(msg, errors.New("serialize message value error, please check"))
+ return
+ }
+ msg.Payload = payLoad
+ } else {
+ if msg.Value != nil {
+ callback(msg, errors.New("message value is set but no schema is provided, please check"))
+ return
+ }
+ }
cMsg := buildMessage(msg)
defer C.pulsar_message_free(cMsg)
- C._pulsar_producer_send_async(p.ptr, cMsg, savePointer(sendCallback{msg, callback}))
+ C._pulsar_producer_send_async(p.ptr, cMsg, savePointer(sendCallback{message: msg, callback: callback}))
}
func (p *producer) Close() error {
@@ -252,7 +318,6 @@ func (p *producer) FlushAsync(callback func(error)) {
C._pulsar_producer_flush_async(p.ptr, savePointer(callback))
}
-
//export pulsarProducerFlushCallbackProxy
func pulsarProducerFlushCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) {
callback := restorePointer(ctx).(func(error))
diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 7336c1a..0abf4f0 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -31,6 +31,7 @@ import (
)
type reader struct {
+ schema Schema
client *client
ptr *C.pulsar_reader_t
defaultChannel chan ReaderMessage
@@ -52,18 +53,20 @@ func pulsarCreateReaderCallbackProxy(res C.pulsar_result, ptr *C.pulsar_reader_t
cc.callback(nil, newError(res, "Failed to create Reader"))
} else {
cc.reader.ptr = ptr
+ cc.reader.schema = cc.schema
runtime.SetFinalizer(cc.reader, readerFinalizer)
cc.callback(cc.reader, nil)
}
}
type readerAndCallback struct {
+ schema Schema
reader *reader
conf *C.pulsar_reader_configuration_t
callback func(Reader, error)
}
-func createReaderAsync(client *client, options ReaderOptions, callback func(Reader, error)) {
+func createReaderAsync(client *client, schema Schema, options ReaderOptions, callback func(Reader, error)) {
if options.Topic == "" {
go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required"))
return
@@ -113,7 +116,7 @@ func createReaderAsync(client *client, options ReaderOptions, callback func(Read
defer C.free(unsafe.Pointer(topic))
C._pulsar_client_create_reader_async(client.ptr, topic, options.StartMessageID.(*messageID).ptr,
- conf, savePointer(&readerAndCallback{reader, conf, callback}))
+ conf, savePointer(&readerAndCallback{schema: schema, reader: reader, conf: conf, callback: callback}))
}
type readerCallback struct {
@@ -132,13 +135,17 @@ func pulsarReaderListenerProxy(cReader *C.pulsar_reader_t, message *C.pulsar_mes
}
}()
- rc.channel <- ReaderMessage{rc.reader, newMessageWrapper(message)}
+ rc.channel <- ReaderMessage{rc.reader, newMessageWrapper(rc.reader.Schema(), message)}
}
func (r *reader) Topic() string {
return C.GoString(C.pulsar_reader_get_topic(r.ptr))
}
+func (r *reader) Schema() Schema {
+ return r.schema
+}
+
func (r *reader) Next(ctx context.Context) (Message, error) {
select {
case <-ctx.Done():
diff --git a/pulsar-client-go/pulsar/client.go b/pulsar-client-go/pulsar/client.go
index f62a403..5d41229 100644
--- a/pulsar-client-go/pulsar/client.go
+++ b/pulsar-client-go/pulsar/client.go
@@ -30,7 +30,7 @@ func NewClient(options ClientOptions) (Client, error) {
}
// Opaque interface that represents the authentication credentials
-type Authentication interface {}
+type Authentication interface{}
// Create new Authentication provider with specified auth token
func NewAuthenticationToken(token string) Authentication {
@@ -103,16 +103,22 @@ type Client interface {
// This method will block until the producer is created successfully
CreateProducer(ProducerOptions) (Producer, error)
+ CreateProducerWithSchema(ProducerOptions, Schema) (Producer, error)
+
// Create a `Consumer` by subscribing to a topic.
//
// If the subscription does not exist, a new subscription will be created and all messages published after the
// creation will be retained until acknowledged, even if the consumer is not connected
Subscribe(ConsumerOptions) (Consumer, error)
+ SubscribeWithSchema(ConsumerOptions, Schema) (Consumer, error)
+
// Create a Reader instance.
// This method will block until the reader is created successfully.
CreateReader(ReaderOptions) (Reader, error)
+ CreateReaderWithSchema(ReaderOptions, Schema) (Reader, error)
+
// Fetch the list of partitions for a given topic
//
// If the topic is partitioned, this will return a list of partition names.
diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go
index ae6b498..e2503e0 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -124,6 +124,8 @@ type ConsumerOptions struct {
// failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
// shared subscription, will lead to the subscription call throwing a PulsarClientException.
ReadCompacted bool
+
+ Schema
}
// An interface that abstracts behavior of Pulsar's consumer
@@ -198,4 +200,6 @@ type Consumer interface {
// the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
// breaks, the messages are redelivered after reconnect.
RedeliverUnackedMessages()
+
+ Schema() Schema
}
diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go
index b28341f..b34c5c9 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -83,7 +83,7 @@ func TestConsumer(t *testing.T) {
assert.Nil(t, err)
defer consumer.Close()
- assert.Equal(t, consumer.Topic(), "persistent://public/default/" + topic)
+ assert.Equal(t, consumer.Topic(), "persistent://public/default/"+topic)
assert.Equal(t, consumer.Subscription(), "my-sub")
ctx := context.Background()
@@ -102,7 +102,7 @@ func TestConsumer(t *testing.T) {
assert.NotNil(t, msg)
assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
- assert.Equal(t, msg.Topic(), "persistent://public/default/" + topic)
+ assert.Equal(t, msg.Topic(), "persistent://public/default/"+topic)
fmt.Println("Send time: ", sendTime)
fmt.Println("Publish time: ", msg.PublishTime())
fmt.Println("Receive time: ", recvTime)
@@ -405,7 +405,7 @@ func TestConsumerRegex(t *testing.T) {
}
for i := 0; i < 20; i++ {
- ctx, _ = context.WithTimeout(context.Background(), 1 * time.Second)
+ ctx, _ = context.WithTimeout(context.Background(), 1*time.Second)
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.NotNil(t, msg)
@@ -585,6 +585,5 @@ func TestConsumerNegativeAcks(t *testing.T) {
consumer.Ack(msg)
}
-
consumer.Unsubscribe()
}
diff --git a/pulsar-client-go/pulsar/message.go b/pulsar-client-go/pulsar/message.go
index d20ed9a..bd2aa38 100644
--- a/pulsar-client-go/pulsar/message.go
+++ b/pulsar-client-go/pulsar/message.go
@@ -25,6 +25,9 @@ type ProducerMessage struct {
// Payload for the message
Payload []byte
+ // Value and Payload are mutually exclusive; `Value interface{}` is used for schema messages.
+ Value interface{}
+
// Sets the key of the message for routing policy
Key string
@@ -66,6 +69,9 @@ type Message interface {
// Get the key of the message, if any
Key() string
+
+ // Get the de-serialized value of the message, according to the configured schema.
+ GetValue(v interface{}) error
}
// Identifier for a particular message
diff --git a/pulsar-client-go/pulsar/pb/build.sh b/pulsar-client-go/pulsar/pb/build.sh
new file mode 100755
index 0000000..3987795
--- /dev/null
+++ b/pulsar-client-go/pulsar/pb/build.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Value handed to the Go protoc plugin through its import_path parameter below.
+pkg=pb
+# Run protoc on hello.proto with the Go plugin, writing its output to the
+# current directory (the part after the colon in --go_out).
+protoc --go_out=import_path=${pkg}:. hello.proto
+
diff --git a/pulsar-client-go/pulsar/pb/hello.pb.go b/pulsar-client-go/pulsar/pb/hello.pb.go
new file mode 100644
index 0000000..7cdf086
--- /dev/null
+++ b/pulsar-client-go/pulsar/pb/hello.pb.go
@@ -0,0 +1,100 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: hello.proto
+
+package pb
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+// Test is the Go type protoc-gen-go produced for the Test message declared in
+// hello.proto (registered as "prototest.Test"). Num and Msf map to proto
+// fields 1 and 2; the XXX_ fields are bookkeeping emitted by the generator.
+type Test struct {
+ Num int32 `protobuf:"varint,1,opt,name=num,proto3" json:"num,omitempty"`
+ Msf string `protobuf:"bytes,2,opt,name=msf,proto3" json:"msf,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+// Reset overwrites the receiver with a zero-valued Test.
+func (m *Test) Reset() { *m = Test{} }
+func (m *Test) String() string { return proto.CompactTextString(m) }
+func (*Test) ProtoMessage() {}
+func (*Test) Descriptor() ([]byte, []int) {
+ return fileDescriptor_hello_38c7a10202078446, []int{0}
+}
+func (m *Test) XXX_Unmarshal(b []byte) error {
+ return xxx_messageInfo_Test.Unmarshal(m, b)
+}
+func (m *Test) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ return xxx_messageInfo_Test.Marshal(b, m, deterministic)
+}
+func (dst *Test) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Test.Merge(dst, src)
+}
+func (m *Test) XXX_Size() int {
+ return xxx_messageInfo_Test.Size(m)
+}
+func (m *Test) XXX_DiscardUnknown() {
+ xxx_messageInfo_Test.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Test proto.InternalMessageInfo
+
+// GetNum returns m.Num, or 0 when the receiver is nil.
+func (m *Test) GetNum() int32 {
+ if m != nil {
+ return m.Num
+ }
+ return 0
+}
+
+// GetMsf returns m.Msf, or the empty string when the receiver is nil.
+func (m *Test) GetMsf() string {
+ if m != nil {
+ return m.Msf
+ }
+ return ""
+}
+
+// Registers the Test type with the proto package under the name "prototest.Test".
+func init() {
+ proto.RegisterType((*Test)(nil), "prototest.Test")
+}
+
+// Registers the gzipped file descriptor declared below under the file name "hello.proto".
+func init() { proto.RegisterFile("hello.proto", fileDescriptor_hello_38c7a10202078446) }
+
+var fileDescriptor_hello_38c7a10202078446 = []byte{
+ // 87 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0x48, 0xcd, 0xc9,
+ 0xc9, 0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x04, 0x53, 0x25, 0xa9, 0xc5, 0x25, 0x4a,
+ 0x5a, 0x5c, 0x2c, 0x21, 0xa9, 0xc5, 0x25, 0x42, 0x02, 0x5c, 0xcc, 0x79, 0xa5, 0xb9, 0x12, 0x8c,
+ 0x0a, 0x8c, 0x1a, 0xac, 0x41, 0x20, 0x26, 0x48, 0x24, 0xb7, 0x38, 0x4d, 0x82, 0x49, 0x81, 0x51,
+ 0x83, 0x33, 0x08, 0xc4, 0x4c, 0x62, 0x03, 0x6b, 0x33, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0xc5,
+ 0x3d, 0x96, 0x7b, 0x4c, 0x00, 0x00, 0x00,
+}
diff --git a/pulsar-client-go/pulsar/pb/hello.proto b/pulsar-client-go/pulsar/pb/hello.proto
new file mode 100644
index 0000000..547e273
--- /dev/null
+++ b/pulsar-client-go/pulsar/pb/hello.proto
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto3";
+package prototest;
+
+// Test is a two-field message; the accompanying build.sh compiles this file to Go.
+message Test {
+ // Integer payload, field number 1.
+ int32 num = 1;
+ // String payload, field number 2.
+ // NOTE(review): "msf" looks like a typo for "msg"; renaming it would also
+ // change the generated Go accessor, so confirm before touching it.
+ string msf = 2;
+}
diff --git a/pulsar-client-go/pulsar/primitiveSerDe.go b/pulsar-client-go/pulsar/primitiveSerDe.go
new file mode 100644
index 0000000..767d26d
--- /dev/null
+++ b/pulsar-client-go/pulsar/primitiveSerDe.go
@@ -0,0 +1,318 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+ "math"
+)
+
+const (
+	// IoMaxSize is the capacity of the BinarySerializer free list, i.e. the
+	// largest number of idle scratch buffers it retains.
+	IoMaxSize = 1024
+	// maxBorrowSize is the length, in bytes, of every buffer returned by Borrow.
+	maxBorrowSize = 10
+)
+
+var (
+	// bigEndian is the byte order readElement and writeElement pass to the serializer.
+	bigEndian = binary.BigEndian
+)
+
+// BinaryFreeList is a channel-backed free list of small scratch buffers used
+// while reading and writing fixed-width values.
+type BinaryFreeList chan []byte
+
+// BinarySerializer is the package-level free list; it keeps at most IoMaxSize
+// idle buffers.
+var BinarySerializer BinaryFreeList = make(chan []byte, IoMaxSize)
+
+// Borrow returns a buffer of length maxBorrowSize, taken from the free list
+// when one is idle and freshly allocated otherwise. The contents are not
+// zeroed when the buffer is recycled.
+func (b BinaryFreeList) Borrow() (buf []byte) {
+	select {
+	case buf = <-b:
+	default:
+		buf = make([]byte, maxBorrowSize)
+	}
+	return buf[:maxBorrowSize]
+}
+
+// Return hands buf back to the free list. The buffer is dropped when the
+// list is full, or when its capacity is below maxBorrowSize: re-slicing such
+// a buffer in Borrow would panic.
+func (b BinaryFreeList) Return(buf []byte) {
+	if cap(buf) < maxBorrowSize {
+		return
+	}
+	select {
+	case b <- buf:
+	default:
+	}
+}
+
+// Uint8 reads exactly one byte from r and returns it.
+func (b BinaryFreeList) Uint8(r io.Reader) (uint8, error) {
+	buf := b.Borrow()[:1]
+	defer b.Return(buf)
+	if _, err := io.ReadFull(r, buf); err != nil {
+		return 0, err
+	}
+	return buf[0], nil
+}
+
+// Uint16 reads exactly two bytes from r and decodes them with byteOrder.
+func (b BinaryFreeList) Uint16(r io.Reader, byteOrder binary.ByteOrder) (uint16, error) {
+	buf := b.Borrow()[:2]
+	defer b.Return(buf)
+	if _, err := io.ReadFull(r, buf); err != nil {
+		return 0, err
+	}
+	return byteOrder.Uint16(buf), nil
+}
+
+// Uint32 reads exactly four bytes from r and decodes them with byteOrder.
+func (b BinaryFreeList) Uint32(r io.Reader, byteOrder binary.ByteOrder) (uint32, error) {
+	buf := b.Borrow()[:4]
+	defer b.Return(buf)
+	if _, err := io.ReadFull(r, buf); err != nil {
+		return 0, err
+	}
+	return byteOrder.Uint32(buf), nil
+}
+
+// Uint64 reads exactly eight bytes from r and decodes them with byteOrder.
+func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64, error) {
+	buf := b.Borrow()[:8]
+	defer b.Return(buf)
+	if _, err := io.ReadFull(r, buf); err != nil {
+		return 0, err
+	}
+	return byteOrder.Uint64(buf), nil
+}
+
+// Float64 decodes the first eight bytes of buf as a big-endian IEEE 754
+// double. It fails when buf holds fewer than eight bytes.
+func (b BinaryFreeList) Float64(buf []byte) (float64, error) {
+	if len(buf) < 8 {
+		return 0, fmt.Errorf("cannot decode binary double: %s", io.ErrShortBuffer)
+	}
+	return math.Float64frombits(binary.BigEndian.Uint64(buf[:8])), nil
+}
+
+// Float32 decodes the first four bytes of buf as a big-endian IEEE 754
+// float. It fails when buf holds fewer than four bytes.
+func (b BinaryFreeList) Float32(buf []byte) (float32, error) {
+	if len(buf) < 4 {
+		return 0, fmt.Errorf("cannot decode binary float: %s", io.ErrShortBuffer)
+	}
+	return math.Float32frombits(binary.BigEndian.Uint32(buf[:4])), nil
+}
+
+// PutUint8 writes val to w as a single byte.
+func (b BinaryFreeList) PutUint8(w io.Writer, val uint8) error {
+	buf := b.Borrow()[:1]
+	defer b.Return(buf)
+	buf[0] = val
+	_, err := w.Write(buf)
+	return err
+}
+
+// PutUint16 encodes val with byteOrder and writes the two bytes to w.
+func (b BinaryFreeList) PutUint16(w io.Writer, byteOrder binary.ByteOrder, val uint16) error {
+	buf := b.Borrow()[:2]
+	defer b.Return(buf)
+	byteOrder.PutUint16(buf, val)
+	_, err := w.Write(buf)
+	return err
+}
+
+// PutUint32 encodes val with byteOrder and writes the four bytes to w.
+func (b BinaryFreeList) PutUint32(w io.Writer, byteOrder binary.ByteOrder, val uint32) error {
+	buf := b.Borrow()[:4]
+	defer b.Return(buf)
+	byteOrder.PutUint32(buf, val)
+	_, err := w.Write(buf)
+	return err
+}
+
+// PutUint64 encodes val with byteOrder and writes the eight bytes to w.
+func (b BinaryFreeList) PutUint64(w io.Writer, byteOrder binary.ByteOrder, val uint64) error {
+	buf := b.Borrow()[:8]
+	defer b.Return(buf)
+	byteOrder.PutUint64(buf, val)
+	_, err := w.Write(buf)
+	return err
+}
+
+// PutDouble encodes datum as an eight-byte big-endian IEEE 754 double.
+// datum may be a float64, float32, int, int64 or int32; integers that do not
+// survive the round trip through float64 are rejected, as is any other type.
+func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error) {
+	var value float64
+	switch v := datum.(type) {
+	case float64:
+		value = v
+	case float32:
+		value = float64(v)
+	case int:
+		if value = float64(v); int(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int would lose precision: %d", v)
+		}
+	case int64:
+		if value = float64(v); int64(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int64 would lose precision: %d", v)
+		}
+	case int32:
+		if value = float64(v); int32(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int32 would lose precision: %d", v)
+		}
+	default:
+		return nil, fmt.Errorf("serialize failed: expected: Go numeric; received: %T", datum)
+	}
+	buf := make([]byte, 8)
+	binary.BigEndian.PutUint64(buf, math.Float64bits(value))
+	return buf, nil
+}
+
+// PutFloat encodes datum as a four-byte big-endian IEEE 754 float.
+// datum may be a float32, float64, int, int64 or int32. Integers that do not
+// survive the round trip through float32 are rejected; a float64 is narrowed
+// without a precision check.
+func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error) {
+	var value float32
+	switch v := datum.(type) {
+	case float32:
+		value = v
+	case float64:
+		value = float32(v)
+	case int:
+		if value = float32(v); int(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int would lose precision: %d", v)
+		}
+	case int64:
+		if value = float32(v); int64(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int64 would lose precision: %d", v)
+		}
+	case int32:
+		if value = float32(v); int32(value) != v {
+			return nil, fmt.Errorf("serialize failed: provided Go int32 would lose precision: %d", v)
+		}
+	default:
+		return nil, fmt.Errorf("serialize failed: expected: Go numeric; received: %T", datum)
+	}
+	buf := make([]byte, 4)
+	binary.BigEndian.PutUint32(buf, math.Float32bits(value))
+	return buf, nil
+}
+
+// ReadElements decodes one value from r into each entry of elements, in
+// order, and stops at the first failure, returning that error.
+func ReadElements(r io.Reader, elements ...interface{}) error {
+	for i := range elements {
+		if err := readElement(r, elements[i]); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// WriteElements encodes each entry of elements to w, in order, and stops at
+// the first failure, returning that error.
+func WriteElements(w io.Writer, elements ...interface{}) error {
+	for i := range elements {
+		if err := writeElement(w, elements[i]); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// readElement fills element from r. Pointers to int8, int16, int32, int64
+// and bool are decoded through BinarySerializer (big-endian; a bool is true
+// for any non-zero byte); every other type is delegated to binary.Read.
+func readElement(r io.Reader, element interface{}) error {
+	switch dst := element.(type) {
+	case *int8:
+		raw, err := BinarySerializer.Uint8(r)
+		if err == nil {
+			*dst = int8(raw)
+		}
+		return err
+
+	case *int16:
+		raw, err := BinarySerializer.Uint16(r, bigEndian)
+		if err == nil {
+			*dst = int16(raw)
+		}
+		return err
+
+	case *int32:
+		raw, err := BinarySerializer.Uint32(r, bigEndian)
+		if err == nil {
+			*dst = int32(raw)
+		}
+		return err
+
+	case *int64:
+		raw, err := BinarySerializer.Uint64(r, bigEndian)
+		if err == nil {
+			*dst = int64(raw)
+		}
+		return err
+
+	case *bool:
+		raw, err := BinarySerializer.Uint8(r)
+		if err == nil {
+			*dst = raw != 0x00
+		}
+		return err
+	}
+	return binary.Read(r, bigEndian, element)
+}
+
+// writeElement encodes element to w. int8, int16, int32, int64 and bool are
+// written through BinarySerializer (big-endian; a bool becomes 0x01 or 0x00);
+// every other type is delegated to binary.Write.
+func writeElement(w io.Writer, element interface{}) error {
+	switch src := element.(type) {
+	case int8:
+		return BinarySerializer.PutUint8(w, uint8(src))
+
+	case int16:
+		return BinarySerializer.PutUint16(w, bigEndian, uint16(src))
+
+	case int32:
+		return BinarySerializer.PutUint32(w, bigEndian, uint32(src))
+
+	case int64:
+		return BinarySerializer.PutUint64(w, bigEndian, uint64(src))
+
+	case bool:
+		flag := uint8(0x00)
+		if src {
+			flag = 0x01
+		}
+		return BinarySerializer.PutUint8(w, flag)
+	}
+	return binary.Write(w, bigEndian, element)
+}
diff --git a/pulsar-client-go/pulsar/primitiveSerDe_test.go b/pulsar-client-go/pulsar/primitiveSerDe_test.go
new file mode 100644
index 0000000..f2f0335
--- /dev/null
+++ b/pulsar-client-go/pulsar/primitiveSerDe_test.go
@@ -0,0 +1,145 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+ "bytes"
+ "io"
+ "reflect"
+ "testing"
+
+ "github.com/davecgh/go-spew/spew"
+)
+
+// TestWriteElements writes each supported primitive with WriteElements,
+// compares the bytes with the expected big-endian encoding, then reads the
+// bytes back with ReadElements and checks the round trip.
+func TestWriteElements(t *testing.T) {
+	tests := []struct {
+		in  interface{} // value to encode
+		buf []byte      // expected wire bytes (big-endian, matching writeElement)
+	}{
+		{int8(1), []byte{0x01}},
+		{uint8(2), []byte{0x02}},
+		{int16(4), []byte{0x00, 0x04}},
+		{uint16(16), []byte{0x00, 0x10}},
+		{int32(1), []byte{0x00, 0x00, 0x00, 0x01}},
+		{uint32(256), []byte{0x00, 0x00, 0x01, 0x00}},
+		{
+			int64(65536),
+			[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00},
+		},
+		{
+			uint64(4294967296),
+			[]byte{0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00},
+		},
+		{
+			true,
+			[]byte{0x01},
+		},
+		{
+			false,
+			[]byte{0x00},
+		},
+	}
+
+	t.Logf("Running %d tests", len(tests))
+	for i, test := range tests {
+		var buf bytes.Buffer
+		err := WriteElements(&buf, test.in)
+		if err != nil {
+			t.Errorf("writeElement #%d error %v", i, err)
+			continue
+		}
+		if !bytes.Equal(buf.Bytes(), test.buf) {
+			t.Errorf("writeElement #%d\n got: %s want: %s", i,
+				spew.Sdump(buf.Bytes()), spew.Sdump(test.buf))
+			continue
+		}
+
+		// Read from wire format. Non-pointer inputs need a fresh pointer
+		// of the same type for ReadElements to fill.
+		rbuf := bytes.NewReader(test.buf)
+		val := test.in
+		if reflect.ValueOf(test.in).Kind() != reflect.Ptr {
+			val = reflect.New(reflect.TypeOf(test.in)).Interface()
+		}
+		err = ReadElements(rbuf, val)
+		if err != nil {
+			t.Errorf("readElement #%d error %v", i, err)
+			continue
+		}
+		ival := val
+		if reflect.ValueOf(test.in).Kind() != reflect.Ptr {
+			ival = reflect.Indirect(reflect.ValueOf(val)).Interface()
+		}
+		if !reflect.DeepEqual(ival, test.in) {
+			t.Errorf("readElement #%d\n got: %s want: %s", i,
+				spew.Sdump(ival), spew.Sdump(test.in))
+			continue
+		}
+	}
+}
+
+// TestElementErrors checks that ReadElements reports io.EOF for every
+// supported primitive when the reader is empty.
+func TestElementErrors(t *testing.T) {
+	tests := []struct {
+		in      interface{} // sample value; only its type is used
+		readErr error       // error expected from ReadElements
+	}{
+		{int8(1), io.EOF},
+		{uint8(2), io.EOF},
+		{int16(4), io.EOF},
+		{uint16(16), io.EOF},
+		{int32(1), io.EOF},
+		{uint32(256), io.EOF},
+		{int64(65536), io.EOF},
+		{uint64(4294967296), io.EOF},
+		{true, io.EOF},
+		{false, io.EOF},
+	}
+
+	t.Logf("Running %d tests", len(tests))
+	for i, test := range tests {
+		// The zero bytes.Reader is empty, so the very first read fails.
+		var r bytes.Reader
+		val := test.in
+		if reflect.ValueOf(test.in).Kind() != reflect.Ptr {
+			val = reflect.New(reflect.TypeOf(test.in)).Interface()
+		}
+		err := ReadElements(&r, val)
+		if err != test.readErr {
+			t.Errorf("readElement #%d wrong error got: %v, want: %v",
+				i, err, test.readErr)
+			continue
+		}
+	}
+}
diff --git a/pulsar-client-go/pulsar/producer.go b/pulsar-client-go/pulsar/producer.go
index 17aea45..5e199f7 100644
--- a/pulsar-client-go/pulsar/producer.go
+++ b/pulsar-client-go/pulsar/producer.go
@@ -184,4 +184,6 @@ type Producer interface {
// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
// of errors, pending writes will not be retried.
Close() error
+
+ Schema() Schema
}
diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go
index 633714b..df033a4 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -118,8 +118,7 @@ func TestProducerNoTopic(t *testing.T) {
defer client.Close()
- producer, err := client.CreateProducer(ProducerOptions{
- })
+ producer, err := client.CreateProducer(ProducerOptions{})
// Expect error in creating producer
assert.Nil(t, producer)
diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go
index 5592630..51ade6b 100644
--- a/pulsar-client-go/pulsar/reader.go
+++ b/pulsar-client-go/pulsar/reader.go
@@ -81,4 +81,6 @@ type Reader interface {
// Close the reader and stop the broker to push more messages
Close() error
+
+ Schema() Schema
}
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index e6cb9f2..01247ce 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -75,7 +75,7 @@ func TestReader(t *testing.T) {
assert.Nil(t, err)
defer reader.Close()
- assert.Equal(t, reader.Topic(), "persistent://public/default/" + topic )
+ assert.Equal(t, reader.Topic(), "persistent://public/default/"+topic)
ctx := context.Background()
diff --git a/pulsar-client-go/pulsar/schema.go b/pulsar-client-go/pulsar/schema.go
new file mode 100644
index 0000000..4a6f7a1
--- /dev/null
+++ b/pulsar-client-go/pulsar/schema.go
@@ -0,0 +1,504 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "reflect"
+ "unsafe"
+
+ log "github.com/apache/pulsar/pulsar-client-go/logutil"
+
+ "github.com/gogo/protobuf/proto"
+ "github.com/linkedin/goavro"
+)
+
+// SchemaType enumerates the schema kinds known to this client.
+type SchemaType int
+
+// The non-negative members are numbered by iota; values 12-14 are skipped so
+// that KEY_VALUE is 15. The negative members carry an explicit SchemaType so
+// that they belong to the enum type instead of being untyped integer
+// constants.
+const (
+	NONE      SchemaType = iota // No schema defined
+	STRING                      // Simple String encoding with UTF-8
+	JSON                        // JSON object encoding and validation
+	PROTOBUF                    // Protobuf message encoding and decoding
+	AVRO                        // Serialize and deserialize via Avro
+	BOOLEAN                     // A boolean
+	INT8                        // An 8-bit integer (1 byte on the wire)
+	INT16                       // A 16-bit integer (2 bytes on the wire)
+	INT32                       // A 32-bit integer (4 bytes on the wire)
+	INT64                       // A 64-bit integer (8 bytes on the wire)
+	FLOAT                       // A float number (4 bytes on the wire)
+	DOUBLE                      // A double number (8 bytes on the wire)
+	_                           // 12: skipped
+	_                           // 13: skipped
+	_                           // 14: skipped
+	KEY_VALUE                   // A Schema that contains Key Schema and Value Schema
+
+	BYTES        SchemaType = -1 // A bytes array
+	AUTO         SchemaType = -2 //
+	AUTO_CONSUME SchemaType = -3 // Auto Consume Type
+	AUTO_PUBLISH SchemaType = -4 // Auto Publish Type
+)
+
+// SchemaInfo encapsulates the data around a schema definition.
+//
+// The constructors in this file fill it as follows: Name is a fixed label
+// such as "Json" or "INT8", Schema is the Avro definition text (empty for the
+// primitive schemas), Type is the SchemaType of the schema, and Properties is
+// the map supplied by the caller.
+type SchemaInfo struct {
+ Name string
+ Schema string
+ Type SchemaType
+ Properties map[string]string
+}
+
+// Schema converts between Go values and message payload bytes.
+type Schema interface {
+ // Encode serializes v into payload bytes.
+ Encode(v interface{}) ([]byte, error)
+ // Decode de-serializes data into v; the implementations in this file
+ // expect v to be a pointer to the destination value.
+ Decode(data []byte, v interface{}) error
+ // Validate reports whether message is an acceptable payload for the schema.
+ Validate(message []byte) error
+ // GetSchemaInfo returns the metadata describing the schema.
+ GetSchemaInfo() *SchemaInfo
+}
+
+// AvroCodec holds the goavro codec compiled from an Avro schema definition.
+type AvroCodec struct {
+	Codec *goavro.Codec
+}
+
+// NewSchemaDefinition wraps an already compiled goavro codec in an AvroCodec.
+func NewSchemaDefinition(schema *goavro.Codec) *AvroCodec {
+	return &AvroCodec{Codec: schema}
+}
+
+// initAvroCodec compiles the Avro schema definition held in codec. The
+// resulting goavro codec translates between a byte slice of binary or textual
+// Avro data and native Go data.
+func initAvroCodec(codec string) (*goavro.Codec, error) {
+	compiled, err := goavro.NewCodec(codec)
+	return compiled, err
+}
+
+// JsonSchema is a Schema that encodes values with encoding/json. The Avro
+// definition given to NewJsonSchema is published through SchemaInfo.Schema.
+type JsonSchema struct {
+	AvroCodec
+	SchemaInfo
+}
+
+// NewJsonSchema builds a JsonSchema from an Avro schema definition and a set
+// of user properties.
+//
+// A definition that goavro cannot compile is reported through log.Fatalf,
+// because the signature has no error return.
+// NOTE(review): this presumably terminates the process; confirm against logutil.
+func NewJsonSchema(jsonAvroSchemaDef string, properties map[string]string) *JsonSchema {
+	js := new(JsonSchema)
+	avroCodec, err := initAvroCodec(jsonAvroSchemaDef)
+	if err != nil {
+		log.Fatalf("init codec error:%v", err)
+	}
+	schemaDef := NewSchemaDefinition(avroCodec)
+	// Keep the compiled codec on the schema, as NewProtoSchema and
+	// NewAvroSchema do; previously the embedded Codec stayed nil.
+	js.AvroCodec.Codec = schemaDef.Codec
+	js.SchemaInfo.Schema = schemaDef.Codec.Schema()
+	js.SchemaInfo.Type = JSON
+	js.SchemaInfo.Properties = properties
+	js.SchemaInfo.Name = "Json"
+	return js
+}
+
+// Encode marshals data to JSON.
+func (js *JsonSchema) Encode(data interface{}) ([]byte, error) {
+	return json.Marshal(data)
+}
+
+// Decode unmarshals the JSON in data into v, which must be a non-nil pointer.
+func (js *JsonSchema) Decode(data []byte, v interface{}) error {
+	return json.Unmarshal(data, v)
+}
+
+// Validate reports whether message is well-formed JSON. It decodes into a
+// scratch value: passing a nil target to json.Unmarshal always fails, which
+// made every message look invalid.
+func (js *JsonSchema) Validate(message []byte) error {
+	var scratch interface{}
+	return js.Decode(message, &scratch)
+}
+
+// GetSchemaInfo returns the metadata describing this schema.
+func (js *JsonSchema) GetSchemaInfo() *SchemaInfo {
+	return &js.SchemaInfo
+}
+
+// ProtoSchema is a Schema that encodes proto.Message values with protobuf.
+// The Avro definition given to NewProtoSchema is published through
+// SchemaInfo.Schema.
+type ProtoSchema struct {
+	AvroCodec
+	SchemaInfo
+}
+
+// NewProtoSchema builds a ProtoSchema from an Avro schema definition and a
+// set of user properties.
+//
+// A definition that goavro cannot compile is reported through log.Fatalf,
+// because the signature has no error return.
+// NOTE(review): this presumably terminates the process; confirm against logutil.
+func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *ProtoSchema {
+	ps := new(ProtoSchema)
+	avroCodec, err := initAvroCodec(protoAvroSchemaDef)
+	if err != nil {
+		log.Fatalf("init codec error:%v", err)
+	}
+	schemaDef := NewSchemaDefinition(avroCodec)
+	ps.AvroCodec.Codec = schemaDef.Codec
+	ps.SchemaInfo.Schema = schemaDef.Codec.Schema()
+	ps.SchemaInfo.Type = PROTOBUF
+	ps.SchemaInfo.Properties = properties
+	ps.SchemaInfo.Name = "Proto"
+	return ps
+}
+
+// Encode marshals data, which must implement proto.Message. Any other value
+// yields an error instead of a type-assertion panic.
+func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error) {
+	msg, ok := data.(proto.Message)
+	if !ok {
+		return nil, errors.New("proto schema: encode expects a proto.Message")
+	}
+	return proto.Marshal(msg)
+}
+
+// Decode unmarshals data into v, which must implement proto.Message. Any
+// other value (including nil) yields an error instead of a panic.
+func (ps *ProtoSchema) Decode(data []byte, v interface{}) error {
+	msg, ok := v.(proto.Message)
+	if !ok {
+		return errors.New("proto schema: decode target must be a proto.Message")
+	}
+	return proto.Unmarshal(data, msg)
+}
+
+// Validate delegates to Decode with a nil target, as before. Decode now
+// rejects that target with an error rather than panicking.
+// NOTE(review): without a concrete message type the payload itself cannot be
+// checked, so this always returns an error; decide what callers should get.
+func (ps *ProtoSchema) Validate(message []byte) error {
+	return ps.Decode(message, nil)
+}
+
+// GetSchemaInfo returns the metadata describing this schema.
+func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo {
+	return &ps.SchemaInfo
+}
+
+// AvroSchema is a Schema that encodes values as binary Avro, using JSON as
+// the bridge between Go values and goavro's native form.
+type AvroSchema struct {
+	AvroCodec
+	SchemaInfo
+}
+
+// NewAvroSchema builds an AvroSchema from an Avro schema definition and a set
+// of user properties.
+//
+// A definition that goavro cannot compile is reported through log.Fatalf,
+// because the signature has no error return.
+// NOTE(review): this presumably terminates the process; confirm against logutil.
+func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSchema {
+	as := new(AvroSchema)
+	avroCodec, err := initAvroCodec(avroSchemaDef)
+	if err != nil {
+		log.Fatalf("init codec error:%v", err)
+	}
+	schemaDef := NewSchemaDefinition(avroCodec)
+	as.AvroCodec.Codec = schemaDef.Codec
+	as.SchemaInfo.Schema = schemaDef.Codec.Schema()
+	as.SchemaInfo.Type = AVRO
+	as.SchemaInfo.Name = "Avro"
+	as.SchemaInfo.Properties = properties
+	return as
+}
+
+// Encode marshals data to JSON, converts that text to goavro's native form
+// and returns the binary Avro encoding.
+func (as *AvroSchema) Encode(data interface{}) ([]byte, error) {
+	textual, err := json.Marshal(data)
+	if err != nil {
+		log.Errorf("serialize data error:%s", err.Error())
+		return nil, err
+	}
+	native, _, err := as.Codec.NativeFromTextual(textual)
+	if err != nil {
+		log.Errorf("convert native Go form to binary Avro data error:%s", err.Error())
+		return nil, err
+	}
+	return as.Codec.BinaryFromNative(nil, native)
+}
+
+// Decode converts binary Avro data to its textual (JSON) form and unmarshals
+// that text into v, which must be a non-nil pointer.
+func (as *AvroSchema) Decode(data []byte, v interface{}) error {
+	native, _, err := as.Codec.NativeFromBinary(data)
+	if err != nil {
+		log.Errorf("convert binary Avro data back to native Go form error:%s", err.Error())
+		return err
+	}
+	textual, err := as.Codec.TextualFromNative(nil, native)
+	if err != nil {
+		log.Errorf("convert native Go form to textual Avro data error:%s", err.Error())
+		return err
+	}
+	err = json.Unmarshal(textual, v)
+	if err != nil {
+		log.Errorf("unSerialize textual error:%s", err.Error())
+		return err
+	}
+	return nil
+}
+
+// Validate reports whether message can be decoded as binary Avro by the
+// schema's codec. It no longer goes through Decode with a nil target, which
+// failed in json.Unmarshal for every message.
+func (as *AvroSchema) Validate(message []byte) error {
+	_, _, err := as.Codec.NativeFromBinary(message)
+	return err
+}
+
+// GetSchemaInfo returns the metadata describing this schema.
+func (as *AvroSchema) GetSchemaInfo() *SchemaInfo {
+	return &as.SchemaInfo
+}
+
+// StringSchema is a Schema whose payload is the raw bytes of a Go string.
+type StringSchema struct {
+	SchemaInfo
+}
+
+// NewStringSchema builds a StringSchema carrying the given user properties.
+func NewStringSchema(properties map[string]string) *StringSchema {
+	strSchema := new(StringSchema)
+	strSchema.SchemaInfo.Properties = properties
+	strSchema.SchemaInfo.Name = "String"
+	strSchema.SchemaInfo.Type = STRING
+	strSchema.SchemaInfo.Schema = ""
+	return strSchema
+}
+
+// Encode returns the bytes of v, which must be a string. Any other value
+// yields an error instead of a type-assertion panic.
+func (ss *StringSchema) Encode(v interface{}) ([]byte, error) {
+	s, ok := v.(string)
+	if !ok {
+		return nil, errors.New("string schema: encode expects a string value")
+	}
+	return []byte(s), nil
+}
+
+// Decode stores the string held in data into v.
+//
+// v may be a **string (the form the original implementation required; it
+// receives a pointer to the decoded string), a *string, or a pointer to any
+// type a *string is assignable to, such as *interface{}. Anything else
+// yields an error instead of a reflect panic.
+//
+// The string is produced without copying, so it aliases data: the caller
+// must not modify data afterwards.
+func (ss *StringSchema) Decode(data []byte, v interface{}) error {
+	target := reflect.ValueOf(v)
+	if target.Kind() != reflect.Ptr || target.IsNil() {
+		return errors.New("string schema: decode target must be a non-nil pointer")
+	}
+	// Reinterpret the slice header as a string header. Unlike building a
+	// reflect.StringHeader by hand, this keeps a real pointer to the backing
+	// array, so the garbage collector keeps it alive.
+	str := *(*string)(unsafe.Pointer(&data))
+	if p, ok := v.(*string); ok {
+		*p = str
+		return nil
+	}
+	val := reflect.ValueOf(&str)
+	if !val.Type().AssignableTo(target.Elem().Type()) {
+		return errors.New("string schema: decode target must be a *string or **string")
+	}
+	target.Elem().Set(val)
+	return nil
+}
+
+// Validate checks that message can be decoded. It decodes into a scratch
+// value; the previous nil target made Decode panic.
+func (ss *StringSchema) Validate(message []byte) error {
+	var scratch *string
+	return ss.Decode(message, &scratch)
+}
+
+// GetSchemaInfo returns the metadata describing this schema.
+func (ss *StringSchema) GetSchemaInfo() *SchemaInfo {
+	return &ss.SchemaInfo
+}
+
+// BytesSchema is a Schema whose payload is an uninterpreted byte slice.
+type BytesSchema struct {
+	SchemaInfo
+}
+
+// NewBytesSchema builds a BytesSchema carrying the given user properties.
+func NewBytesSchema(properties map[string]string) *BytesSchema {
+	bytesSchema := new(BytesSchema)
+	bytesSchema.SchemaInfo.Properties = properties
+	bytesSchema.SchemaInfo.Name = "Bytes"
+	bytesSchema.SchemaInfo.Type = BYTES
+	bytesSchema.SchemaInfo.Schema = ""
+	return bytesSchema
+}
+
+// Encode returns data unchanged; it must be a []byte. Any other value yields
+// an error instead of a type-assertion panic.
+func (bs *BytesSchema) Encode(data interface{}) ([]byte, error) {
+	raw, ok := data.([]byte)
+	if !ok {
+		return nil, errors.New("bytes schema: encode expects a []byte value")
+	}
+	return raw, nil
+}
+
+// Decode stores data (not a copy) into the value v points to. v must be a
+// non-nil pointer to a []byte or to a type a []byte is assignable to;
+// anything else yields an error instead of a reflect panic.
+func (bs *BytesSchema) Decode(data []byte, v interface{}) error {
+	target := reflect.ValueOf(v)
+	if target.Kind() != reflect.Ptr || target.IsNil() {
+		return errors.New("bytes schema: decode target must be a non-nil pointer")
+	}
+	val := reflect.ValueOf(data)
+	if !val.Type().AssignableTo(target.Elem().Type()) {
+		return errors.New("bytes schema: decode target must point to a []byte")
+	}
+	target.Elem().Set(val)
+	return nil
+}
+
+// Validate checks that message can be decoded. It decodes into a scratch
+// slice; the previous nil target made Decode panic.
+func (bs *BytesSchema) Validate(message []byte) error {
+	var scratch []byte
+	return bs.Decode(message, &scratch)
+}
+
+// GetSchemaInfo returns the metadata describing this schema.
+func (bs *BytesSchema) GetSchemaInfo() *SchemaInfo {
+	return &bs.SchemaInfo
+}
+
+// Int8Schema is a Schema for int8 values, encoded as a single byte.
+type Int8Schema struct {
+	SchemaInfo
+}
+
+// NewInt8Schema builds an Int8Schema carrying the given user properties.
+func NewInt8Schema(properties map[string]string) *Int8Schema {
+	int8Schema := new(Int8Schema)
+	int8Schema.SchemaInfo.Properties = properties
+	int8Schema.SchemaInfo.Schema = ""
+	int8Schema.SchemaInfo.Type = INT8
+	int8Schema.SchemaInfo.Name = "INT8"
+	return int8Schema
+}
+
+// Encode serializes value, which must be an int8. Any other type yields an
+// error instead of a type-assertion panic.
+func (is8 *Int8Schema) Encode(value interface{}) ([]byte, error) {
+	n, ok := value.(int8)
+	if !ok {
+		return nil, errors.New("int8 schema: encode expects an int8 value")
+	}
+	var buf bytes.Buffer
+	err := WriteElements(&buf, n)
+	return buf.Bytes(), err
+}
+
+// Decode reads one element from data into v through ReadElements; v is
+// expected to be a *int8.
+func (is8 *Int8Schema) Decode(data []byte, v interface{}) error {
+	buf := bytes.NewReader(data)
+	return ReadElements(buf, v)
+}
+
+// Validate checks that message is exactly one byte long.
+func (is8 *Int8Schema) Validate(message []byte) error {
+	if len(message) != 1 {
+		return errors.New("size of data received by Int8Schema is not 1")
+	}
+	return nil
+}
+
+// GetSchemaInfo returns the metadata describing this schema.
+func (is8 *Int8Schema) GetSchemaInfo() *SchemaInfo {
+	return &is8.SchemaInfo
+}
+
+// Int16Schema is a Schema for int16 values, encoded as two big-endian bytes.
+type Int16Schema struct {
+	SchemaInfo
+}
+
+// NewInt16Schema builds an Int16Schema carrying the given user properties.
+func NewInt16Schema(properties map[string]string) *Int16Schema {
+	int16Schema := new(Int16Schema)
+	int16Schema.SchemaInfo.Properties = properties
+	int16Schema.SchemaInfo.Name = "INT16"
+	int16Schema.SchemaInfo.Type = INT16
+	int16Schema.SchemaInfo.Schema = ""
+	return int16Schema
+}
+
+// Encode serializes value, which must be an int16. Any other type yields an
+// error instead of a type-assertion panic.
+func (is16 *Int16Schema) Encode(value interface{}) ([]byte, error) {
+	n, ok := value.(int16)
+	if !ok {
+		return nil, errors.New("int16 schema: encode expects an int16 value")
+	}
+	var buf bytes.Buffer
+	err := WriteElements(&buf, n)
+	return buf.Bytes(), err
+}
+
+// Decode reads one element from data into v through ReadElements; v is
+// expected to be a *int16.
+func (is16 *Int16Schema) Decode(data []byte, v interface{}) error {
+	buf := bytes.NewReader(data)
+	return ReadElements(buf, v)
+}
+
+// Validate checks that message is exactly two bytes long.
+func (is16 *Int16Schema) Validate(message []byte) error {
+	if len(message) != 2 {
+		return errors.New("size of data received by Int16Schema is not 2")
+	}
+	return nil
+}
+
+// GetSchemaInfo returns the metadata describing this schema.
+func (is16 *Int16Schema) GetSchemaInfo() *SchemaInfo {
+	return &is16.SchemaInfo
+}
+
+// Int32Schema is a Schema for int32 values, encoded as four big-endian bytes.
+type Int32Schema struct {
+	SchemaInfo
+}
+
+// NewInt32Schema builds an Int32Schema carrying the given user properties.
+func NewInt32Schema(properties map[string]string) *Int32Schema {
+	int32Schema := new(Int32Schema)
+	int32Schema.SchemaInfo.Properties = properties
+	int32Schema.SchemaInfo.Schema = ""
+	int32Schema.SchemaInfo.Name = "INT32"
+	int32Schema.SchemaInfo.Type = INT32
+	return int32Schema
+}
+
+// Encode serializes value, which must be an int32. Any other type yields an
+// error instead of a type-assertion panic.
+func (is32 *Int32Schema) Encode(value interface{}) ([]byte, error) {
+	n, ok := value.(int32)
+	if !ok {
+		return nil, errors.New("int32 schema: encode expects an int32 value")
+	}
+	var buf bytes.Buffer
+	err := WriteElements(&buf, n)
+	return buf.Bytes(), err
+}
+
+// Decode reads one element from data into v through ReadElements; v is
+// expected to be a *int32.
+func (is32 *Int32Schema) Decode(data []byte, v interface{}) error {
+	buf := bytes.NewReader(data)
+	return ReadElements(buf, v)
+}
+
+// Validate checks that message is exactly four bytes long.
+func (is32 *Int32Schema) Validate(message []byte) error {
+	if len(message) != 4 {
+		return errors.New("size of data received by Int32Schema is not 4")
+	}
+	return nil
+}
+
+// GetSchemaInfo returns the metadata describing this schema.
+func (is32 *Int32Schema) GetSchemaInfo() *SchemaInfo {
+	return &is32.SchemaInfo
+}
+
+// Int64Schema is a Schema for int64 values, encoded as eight big-endian bytes.
+type Int64Schema struct {
+	SchemaInfo
+}
+
+// NewInt64Schema builds an Int64Schema carrying the given user properties.
+func NewInt64Schema(properties map[string]string) *Int64Schema {
+	int64Schema := new(Int64Schema)
+	int64Schema.SchemaInfo.Properties = properties
+	int64Schema.SchemaInfo.Name = "INT64"
+	int64Schema.SchemaInfo.Type = INT64
+	int64Schema.SchemaInfo.Schema = ""
+	return int64Schema
+}
+
+// Encode serializes value, which must be an int64. Any other type yields an
+// error instead of a type-assertion panic.
+func (is64 *Int64Schema) Encode(value interface{}) ([]byte, error) {
+	n, ok := value.(int64)
+	if !ok {
+		return nil, errors.New("int64 schema: encode expects an int64 value")
+	}
+	var buf bytes.Buffer
+	err := WriteElements(&buf, n)
+	return buf.Bytes(), err
+}
+
+// Decode reads one element from data into v through ReadElements; v is
+// expected to be a *int64.
+func (is64 *Int64Schema) Decode(data []byte, v interface{}) error {
+	buf := bytes.NewReader(data)
+	return ReadElements(buf, v)
+}
+
+// Validate checks that message is exactly eight bytes long.
+func (is64 *Int64Schema) Validate(message []byte) error {
+	if len(message) != 8 {
+		return errors.New("size of data received by Int64Schema is not 8")
+	}
+	return nil
+}
+
+// GetSchemaInfo returns the metadata describing this schema.
+func (is64 *Int64Schema) GetSchemaInfo() *SchemaInfo {
+	return &is64.SchemaInfo
+}
+
+// FloatSchema is a Schema for float32 values, encoded as four big-endian bytes.
+type FloatSchema struct {
+	SchemaInfo
+}
+
+// NewFloatSchema builds a FloatSchema carrying the given user properties.
+func NewFloatSchema(properties map[string]string) *FloatSchema {
+	floatSchema := new(FloatSchema)
+	floatSchema.SchemaInfo.Properties = properties
+	floatSchema.SchemaInfo.Type = FLOAT
+	floatSchema.SchemaInfo.Name = "FLOAT"
+	floatSchema.SchemaInfo.Schema = ""
+	return floatSchema
+}
+
+// Encode serializes value through BinarySerializer.PutFloat, which accepts
+// the numeric types listed there.
+func (fs *FloatSchema) Encode(value interface{}) ([]byte, error) {
+	return BinarySerializer.PutFloat(value)
+}
+
+// Decode stores the float32 held in the first four bytes of data into the
+// value v points to. v must be a non-nil pointer to a float32 or to a type a
+// float32 is assignable to; anything else yields an error instead of a
+// reflect panic.
+func (fs *FloatSchema) Decode(data []byte, v interface{}) error {
+	floatValue, err := BinarySerializer.Float32(data)
+	if err != nil {
+		log.Errorf("unSerialize float error:%s", err.Error())
+		return err
+	}
+	target := reflect.ValueOf(v)
+	if target.Kind() != reflect.Ptr || target.IsNil() {
+		return errors.New("float schema: decode target must be a non-nil pointer")
+	}
+	val := reflect.ValueOf(floatValue)
+	if !val.Type().AssignableTo(target.Elem().Type()) {
+		return errors.New("float schema: decode target must point to a float32")
+	}
+	target.Elem().Set(val)
+	return nil
+}
+
+// Validate checks that message is exactly four bytes long.
+func (fs *FloatSchema) Validate(message []byte) error {
+	if len(message) != 4 {
+		return errors.New("size of data received by FloatSchema is not 4")
+	}
+	return nil
+}
+
+// GetSchemaInfo returns the metadata describing this schema.
+func (fs *FloatSchema) GetSchemaInfo() *SchemaInfo {
+	return &fs.SchemaInfo
+}
+
+// DoubleSchema is a Schema for float64 values, encoded as eight big-endian bytes.
+type DoubleSchema struct {
+	SchemaInfo
+}
+
+// NewDoubleSchema builds a DoubleSchema carrying the given user properties.
+func NewDoubleSchema(properties map[string]string) *DoubleSchema {
+	doubleSchema := new(DoubleSchema)
+	doubleSchema.SchemaInfo.Properties = properties
+	doubleSchema.SchemaInfo.Type = DOUBLE
+	doubleSchema.SchemaInfo.Name = "DOUBLE"
+	doubleSchema.SchemaInfo.Schema = ""
+	return doubleSchema
+}
+
+// Encode serializes value through BinarySerializer.PutDouble, which accepts
+// the numeric types listed there.
+func (ds *DoubleSchema) Encode(value interface{}) ([]byte, error) {
+	return BinarySerializer.PutDouble(value)
+}
+
+// Decode stores the float64 held in the first eight bytes of data into the
+// value v points to. v must be a non-nil pointer to a float64 or to a type a
+// float64 is assignable to; anything else yields an error instead of a
+// reflect panic.
+func (ds *DoubleSchema) Decode(data []byte, v interface{}) error {
+	doubleValue, err := BinarySerializer.Float64(data)
+	if err != nil {
+		log.Errorf("unSerialize double value error:%s", err.Error())
+		return err
+	}
+	target := reflect.ValueOf(v)
+	if target.Kind() != reflect.Ptr || target.IsNil() {
+		return errors.New("double schema: decode target must be a non-nil pointer")
+	}
+	val := reflect.ValueOf(doubleValue)
+	if !val.Type().AssignableTo(target.Elem().Type()) {
+		return errors.New("double schema: decode target must point to a float64")
+	}
+	target.Elem().Set(val)
+	return nil
+}
+
+// Validate checks that message is exactly eight bytes long.
+func (ds *DoubleSchema) Validate(message []byte) error {
+	if len(message) != 8 {
+		return errors.New("size of data received by DoubleSchema is not 8")
+	}
+	return nil
+}
+
+// GetSchemaInfo returns the metadata describing this schema.
+func (ds *DoubleSchema) GetSchemaInfo() *SchemaInfo {
+	return &ds.SchemaInfo
+}
diff --git a/pulsar-client-go/pulsar/schemaDef_test.go b/pulsar-client-go/pulsar/schemaDef_test.go
new file mode 100644
index 0000000..5fb6803
--- /dev/null
+++ b/pulsar-client-go/pulsar/schemaDef_test.go
@@ -0,0 +1,58 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// TestSchemaDef passes a series of schema definitions to initAvroCodec, in
+// order, and asserts that each call reports a non-nil error.
+func TestSchemaDef(t *testing.T) {
+	rejected := []string{
+		// ID field declared with type "int64".
+		"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+			"\"fields\":[{\"name\":\"ID\",\"type\":\"int64\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
+		// ID field declared with type "bool".
+		"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+			"\"fields\":[{\"name\":\"ID\",\"type\":\"bool\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
+		// ID field declared with type "float32".
+		"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+			"\"fields\":[{\"name\":\"ID\",\"type\":\"float32\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
+		// ID field declared with type "float64".
+		"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+			"\"fields\":[{\"name\":\"ID\",\"type\":\"float64\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
+		// ID field declared with type "byte".
+		"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+			"\"fields\":[{\"name\":\"ID\",\"type\":\"byte\"},{\"name\":\"Name\",\"type\":\"string\"}]}",
+		// "byte" ID field, a namespace ending in '$', and a Name entry whose
+		// type/default text is not well-formed.
+		"{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"operation.createJsonConsumer$\"," +
+			"\"fields\":[{\"name\":\"ID\",\"type\":\"byte\"},{\"name\":\"Name\",\"type\":\":[\"null\",\"string\"],\"default\":null\"}]}",
+	}
+
+	for _, schemaDef := range rejected {
+		_, err := initAvroCodec(schemaDef)
+		assert.NotNil(t, err)
+	}
+}
diff --git a/pulsar-client-go/pulsar/schema_test.go b/pulsar-client-go/pulsar/schema_test.go
new file mode 100644
index 0000000..3c83188
--- /dev/null
+++ b/pulsar-client-go/pulsar/schema_test.go
@@ -0,0 +1,436 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package pulsar
+
+import (
+ "context"
+ "testing"
+
+ log "github.com/apache/pulsar/pulsar-client-go/logutil"
+ "github.com/apache/pulsar/pulsar-client-go/pulsar/pb"
+ "github.com/stretchr/testify/assert"
+)
+
+// testJson is the value TestJsonSchema publishes and decodes; its fields are
+// tagged with the lowercase JSON names "id" and "name".
+type testJson struct {
+ ID int `json:"id"`
+ Name string `json:"name"`
+}
+
+// testAvro is the value TestAvroSchema publishes and decodes; its field names
+// match the "ID" and "Name" fields declared in exampleSchemaDef.
+type testAvro struct {
+ ID int
+ Name string
+}
+
+// Schema definitions shared by the schema tests in this file.
+var (
+	// exampleSchemaDef declares a record "Example" with an int field "ID"
+	// and a string field "Name".
+	exampleSchemaDef = `{"type":"record","name":"Example","namespace":"test",` +
+		`"fields":[{"name":"ID","type":"int"},{"name":"Name","type":"string"}]}`
+	// protoSchemaDef declares a record "Example" with an int field "num"
+	// and a string field "msf".
+	protoSchemaDef = `{"type":"record","name":"Example","namespace":"test",` +
+		`"fields":[{"name":"num","type":"int"},{"name":"msf","type":"string"}]}`
+)
+
+func createClient() Client {
+ // create client
+ lookupUrl := "pulsar://localhost:6650"
+ client, err := NewClient(ClientOptions{
+ URL: lookupUrl,
+ })
+ if err != nil {
+ log.Fatal(err)
+ }
+ return client
+}
+
+// TestJsonSchema publishes a testJson value on "jsonTopic" through two typed
+// producers (one schema without properties, one with), then subscribes with a
+// JSON schema and checks that a received message decodes to the same value.
+//
+// Every create/subscribe/receive error stops the test through t.Fatal, so a
+// failed step cannot lead to a nil dereference further down, and each
+// resource's Close is deferred as soon as it has been created successfully.
+func TestJsonSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	// Producer whose schema carries no properties.
+	jsonSchema := NewJsonSchema(exampleSchemaDef, nil)
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "jsonTopic",
+	}, jsonSchema)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+	err = producer.Send(context.Background(), ProducerMessage{
+		Value: &testJson{
+			ID:   100,
+			Name: "pulsar",
+		},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Second producer on the same topic, this time with schema properties.
+	properties := make(map[string]string)
+	properties["pulsar"] = "hello"
+	jsonSchemaWithProperties := NewJsonSchema(exampleSchemaDef, properties)
+	producer1, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "jsonTopic",
+	}, jsonSchemaWithProperties)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer1.Close()
+	err = producer1.Send(context.Background(), ProducerMessage{
+		Value: &testJson{
+			ID:   100,
+			Name: "pulsar",
+		},
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Consume one message and decode it into a testJson value.
+	consumerJS := NewJsonSchema(exampleSchemaDef, nil)
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "jsonTopic",
+		SubscriptionName: "sub-2",
+	}, consumerJS)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	msg, err := consumer.Receive(context.Background())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var s testJson
+	err = msg.GetValue(&s)
+	assert.Nil(t, err)
+	// assert.Equal takes (expected, actual).
+	assert.Equal(t, 100, s.ID)
+	assert.Equal(t, "pulsar", s.Name)
+}
+
+// TestProtoSchema publishes a pb.Test message on topic "proto" through a
+// typed producer, then subscribes with a proto schema and checks that the
+// received message decodes to the same field values.
+//
+// The producer creation error is now checked, the producer is closed when the
+// test ends, and every create/subscribe/receive error stops the test through
+// t.Fatal so that a failed step cannot lead to a nil dereference.
+func TestProtoSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	// create producer
+	psProducer := NewProtoSchema(protoSchemaDef, nil)
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "proto",
+	}, psProducer)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+	if err := producer.Send(context.Background(), ProducerMessage{
+		Value: &pb.Test{
+			Num: 100,
+			Msf: "pulsar",
+		},
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	// create consumer
+	psConsumer := NewProtoSchema(protoSchemaDef, nil)
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "proto",
+		SubscriptionName: "sub-1",
+	}, psConsumer)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	msg, err := consumer.Receive(context.Background())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	unobj := pb.Test{}
+	err = msg.GetValue(&unobj)
+	assert.Nil(t, err)
+	// assert.Equal takes (expected, actual).
+	assert.Equal(t, int32(100), unobj.Num)
+	assert.Equal(t, "pulsar", unobj.Msf)
+}
+
+// TestAvroSchema publishes a testAvro value on "avro-topic" through a typed
+// producer, then subscribes with an Avro schema and checks that the received
+// message decodes to the same field values.
+//
+// The producer is now closed when the test ends, and every
+// create/subscribe/receive error stops the test through t.Fatal so that a
+// failed step cannot lead to a nil dereference.
+func TestAvroSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	// create producer
+	asProducer := NewAvroSchema(exampleSchemaDef, nil)
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "avro-topic",
+	}, asProducer)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+	if err := producer.Send(context.Background(), ProducerMessage{
+		Value: testAvro{
+			ID:   100,
+			Name: "pulsar",
+		},
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	// create consumer
+	asConsumer := NewAvroSchema(exampleSchemaDef, nil)
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "avro-topic",
+		SubscriptionName: "sub-1",
+	}, asConsumer)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	msg, err := consumer.Receive(context.Background())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	unobj := testAvro{}
+	err = msg.GetValue(&unobj)
+	assert.Nil(t, err)
+	// assert.Equal takes (expected, actual).
+	assert.Equal(t, 100, unobj.ID)
+	assert.Equal(t, "pulsar", unobj.Name)
+}
+
+// TestStringSchema publishes "hello pulsar" on "strTopic" through a typed
+// producer, then subscribes with a string schema and checks that the received
+// message decodes to the same text.
+//
+// The error returned by GetValue is now asserted, and the decoded pointer is
+// checked for nil before it is dereferenced. Every create/subscribe/receive
+// error stops the test through t.Fatal.
+func TestStringSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	ssProducer := NewStringSchema(nil)
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "strTopic",
+	}, ssProducer)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+	if err := producer.Send(context.Background(), ProducerMessage{
+		Value: "hello pulsar",
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "strTopic",
+		SubscriptionName: "sub-2",
+	}, NewStringSchema(nil))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	msg, err := consumer.Receive(context.Background())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// The destination stays a *string passed by address, as in the original
+	// test; it is only dereferenced once it is known to be non-nil.
+	var res *string
+	err = msg.GetValue(&res)
+	assert.Nil(t, err)
+	if assert.NotNil(t, res) {
+		// assert.Equal takes (expected, actual).
+		assert.Equal(t, "hello pulsar", *res)
+	}
+}
+
+// TestBytesSchema publishes a four-byte payload on "bytesTopic" through a
+// typed producer, then subscribes with a bytes schema and checks that the
+// received message decodes to the same bytes.
+//
+// The error returned by GetValue is now asserted instead of being silently
+// dropped, and every create/subscribe/receive error stops the test through
+// t.Fatal so that a failed step cannot lead to a nil dereference.
+func TestBytesSchema(t *testing.T) {
+	client := createClient()
+	defer client.Close()
+
+	bytes := []byte{121, 110, 121, 110}
+	producer, err := client.CreateTypedProducer(ProducerOptions{
+		Topic: "bytesTopic",
+	}, NewBytesSchema(nil))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer producer.Close()
+	ctx := context.Background()
+	if err := producer.Send(ctx, ProducerMessage{
+		Value: bytes,
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	consumer, err := client.SubscribeWithSchema(ConsumerOptions{
+		Topic:            "bytesTopic",
+		SubscriptionName: "sub-2",
+	}, NewBytesSchema(nil))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	msg, err := consumer.Receive(context.Background())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var res []byte
+	err = msg.GetValue(&res)
+	assert.Nil(t, err)
+	// assert.Equal takes (expected, actual).
+	assert.Equal(t, bytes, res)
+}
+
+// TestInt8Schema sends int8(1) on "int8Topic1" through a typed producer and
+// checks that a consumer using an int8 schema decodes the same value.
+func TestInt8Schema(t *testing.T) {
+	const topic = "int8Topic1"
+
+	client := createClient()
+	defer client.Close()
+
+	// Publish one value.
+	producerOpts := ProducerOptions{Topic: topic}
+	producer, err := client.CreateTypedProducer(producerOpts, NewInt8Schema(nil))
+	assert.Nil(t, err)
+	ctx := context.Background()
+	sendErr := producer.Send(ctx, ProducerMessage{Value: int8(1)})
+	if sendErr != nil {
+		log.Fatal(sendErr)
+	}
+	defer producer.Close()
+
+	// Read it back and decode it.
+	consumerOpts := ConsumerOptions{Topic: topic, SubscriptionName: "sub-2"}
+	consumer, err := client.SubscribeWithSchema(consumerOpts, NewInt8Schema(nil))
+	assert.Nil(t, err)
+
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+
+	var got int8
+	err = msg.GetValue(&got)
+	assert.Nil(t, err)
+	assert.Equal(t, got, int8(1))
+
+	defer consumer.Close()
+}
+
+// TestInt16Schema sends int16(1) on "int16Topic" through a typed producer and
+// checks that a consumer using an int16 schema decodes the same value.
+func TestInt16Schema(t *testing.T) {
+	const topic = "int16Topic"
+
+	client := createClient()
+	defer client.Close()
+
+	// Publish one value.
+	producerOpts := ProducerOptions{Topic: topic}
+	producer, err := client.CreateTypedProducer(producerOpts, NewInt16Schema(nil))
+	assert.Nil(t, err)
+	ctx := context.Background()
+	sendErr := producer.Send(ctx, ProducerMessage{Value: int16(1)})
+	if sendErr != nil {
+		log.Fatal(sendErr)
+	}
+	defer producer.Close()
+
+	// Read it back and decode it.
+	consumerOpts := ConsumerOptions{Topic: topic, SubscriptionName: "sub-2"}
+	consumer, err := client.SubscribeWithSchema(consumerOpts, NewInt16Schema(nil))
+	assert.Nil(t, err)
+
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+
+	var got int16
+	err = msg.GetValue(&got)
+	assert.Nil(t, err)
+	assert.Equal(t, got, int16(1))
+	defer consumer.Close()
+}
+
+// TestInt32Schema sends int32(1) on "int32Topic1" through a typed producer
+// and checks that a consumer using an int32 schema decodes the same value.
+func TestInt32Schema(t *testing.T) {
+	const topic = "int32Topic1"
+
+	client := createClient()
+	defer client.Close()
+
+	// Publish one value.
+	producerOpts := ProducerOptions{Topic: topic}
+	producer, err := client.CreateTypedProducer(producerOpts, NewInt32Schema(nil))
+	assert.Nil(t, err)
+	ctx := context.Background()
+	sendErr := producer.Send(ctx, ProducerMessage{Value: int32(1)})
+	if sendErr != nil {
+		log.Fatal(sendErr)
+	}
+	defer producer.Close()
+
+	// Read it back and decode it.
+	consumerOpts := ConsumerOptions{Topic: topic, SubscriptionName: "sub-2"}
+	consumer, err := client.SubscribeWithSchema(consumerOpts, NewInt32Schema(nil))
+	assert.Nil(t, err)
+
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+
+	var got int32
+	err = msg.GetValue(&got)
+	assert.Nil(t, err)
+	assert.Equal(t, got, int32(1))
+	defer consumer.Close()
+}
+
+// TestInt64Schema sends int64(1) on "int64Topic" through a typed producer and
+// checks that a consumer using an int64 schema decodes the same value.
+func TestInt64Schema(t *testing.T) {
+	const topic = "int64Topic"
+
+	client := createClient()
+	defer client.Close()
+
+	// Publish one value.
+	producerOpts := ProducerOptions{Topic: topic}
+	producer, err := client.CreateTypedProducer(producerOpts, NewInt64Schema(nil))
+	assert.Nil(t, err)
+	ctx := context.Background()
+	sendErr := producer.Send(ctx, ProducerMessage{Value: int64(1)})
+	if sendErr != nil {
+		log.Fatal(sendErr)
+	}
+	defer producer.Close()
+
+	// Read it back and decode it.
+	consumerOpts := ConsumerOptions{Topic: topic, SubscriptionName: "sub-2"}
+	consumer, err := client.SubscribeWithSchema(consumerOpts, NewInt64Schema(nil))
+	assert.Nil(t, err)
+
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+
+	var got int64
+	err = msg.GetValue(&got)
+	assert.Nil(t, err)
+	assert.Equal(t, got, int64(1))
+	defer consumer.Close()
+}
+
+// TestFloatSchema sends float32(1) on "floatTopic" through a typed producer
+// and checks that a consumer using a float schema decodes the same value.
+func TestFloatSchema(t *testing.T) {
+	const topic = "floatTopic"
+
+	client := createClient()
+	defer client.Close()
+
+	// Publish one value.
+	producerOpts := ProducerOptions{Topic: topic}
+	producer, err := client.CreateTypedProducer(producerOpts, NewFloatSchema(nil))
+	assert.Nil(t, err)
+	sendErr := producer.Send(context.Background(), ProducerMessage{Value: float32(1)})
+	if sendErr != nil {
+		log.Fatal(sendErr)
+	}
+	defer producer.Close()
+
+	// Read it back and decode it.
+	consumerOpts := ConsumerOptions{Topic: topic, SubscriptionName: "sub-2"}
+	consumer, err := client.SubscribeWithSchema(consumerOpts, NewFloatSchema(nil))
+	assert.Nil(t, err)
+
+	msg, err := consumer.Receive(context.Background())
+	assert.Nil(t, err)
+
+	var got float32
+	err = msg.GetValue(&got)
+	assert.Nil(t, err)
+	assert.Equal(t, got, float32(1))
+	defer consumer.Close()
+}
+
+// TestDoubleSchema sends float64(1) on "doubleTopic" through a typed producer
+// and checks that a consumer using a double schema decodes the same value.
+func TestDoubleSchema(t *testing.T) {
+	const topic = "doubleTopic"
+
+	client := createClient()
+	defer client.Close()
+
+	// Publish one value.
+	producerOpts := ProducerOptions{Topic: topic}
+	producer, err := client.CreateTypedProducer(producerOpts, NewDoubleSchema(nil))
+	assert.Nil(t, err)
+	ctx := context.Background()
+	sendErr := producer.Send(ctx, ProducerMessage{Value: float64(1)})
+	if sendErr != nil {
+		log.Fatal(sendErr)
+	}
+	defer producer.Close()
+
+	// Read it back and decode it.
+	consumerOpts := ConsumerOptions{Topic: topic, SubscriptionName: "sub-2"}
+	consumer, err := client.SubscribeWithSchema(consumerOpts, NewDoubleSchema(nil))
+	assert.Nil(t, err)
+
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, err)
+
+	var got float64
+	err = msg.GetValue(&got)
+	assert.Nil(t, err)
+	assert.Equal(t, got, float64(1))
+	defer consumer.Close()
+}
diff --git a/pulsar-client-go/pulsar/util_test.go b/pulsar-client-go/pulsar/testhelps.go
similarity index 100%
rename from pulsar-client-go/pulsar/util_test.go
rename to pulsar-client-go/pulsar/testhelps.go