You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/11 18:25:30 UTC

[pulsar] branch master updated: Pass schema info to C++ client (#3354)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ef94cb7  Pass schema info to C++ client (#3354)
ef94cb7 is described below

commit ef94cb70df08d7e9f16c6415116e729bb6d756ab
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jan 11 10:25:24 2019 -0800

    Pass schema info to C++ client (#3354)
    
    * Pass schema info to C++ client
    
    * Comments
---
 pulsar-client-cpp/include/pulsar/Client.h          |   1 +
 .../include/pulsar/ConsumerConfiguration.h         |  16 +++
 pulsar-client-cpp/include/pulsar/Producer.h        |  10 ++
 .../include/pulsar/ProducerConfiguration.h         |  21 +++
 .../include/pulsar/ReaderConfiguration.h           |  16 +++
 pulsar-client-cpp/include/pulsar/Schema.h          | 160 +++++++++++++++++++++
 pulsar-client-cpp/lib/ClientConnection.cc          |  11 +-
 pulsar-client-cpp/lib/ClientConnection.h           |   7 +-
 pulsar-client-cpp/lib/Commands.cc                  |  61 +++++++-
 pulsar-client-cpp/lib/Commands.h                   |   7 +-
 pulsar-client-cpp/lib/ConsumerConfiguration.cc     |   7 +
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h  |   4 +-
 pulsar-client-cpp/lib/ConsumerImpl.cc              |   6 +-
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   |   6 +
 pulsar-client-cpp/lib/PartitionedProducerImpl.h    |   2 +
 pulsar-client-cpp/lib/Producer.cc                  |   2 +
 pulsar-client-cpp/lib/ProducerConfiguration.cc     |   7 +
 pulsar-client-cpp/lib/ProducerConfigurationImpl.h  |   4 +-
 pulsar-client-cpp/lib/ProducerImpl.cc              |  13 +-
 pulsar-client-cpp/lib/ProducerImpl.h               |   3 +
 pulsar-client-cpp/lib/ProducerImplBase.h           |   1 +
 pulsar-client-cpp/lib/ReaderConfiguration.cc       |   7 +
 pulsar-client-cpp/lib/ReaderConfigurationImpl.h    |   4 +-
 pulsar-client-cpp/lib/ReaderImpl.cc                |   1 +
 pulsar-client-cpp/lib/Schema.cc                    | 103 +++++++++++++
 pulsar-client-cpp/tests/SchemaTest.cc              |  69 +++++++++
 26 files changed, 529 insertions(+), 20 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h
index d77eb6d..e04b1b8 100644
--- a/pulsar-client-cpp/include/pulsar/Client.h
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -26,6 +26,7 @@
 #include <pulsar/Message.h>
 #include <pulsar/MessageBuilder.h>
 #include <pulsar/ClientConfiguration.h>
+#include <pulsar/Schema.h>
 #include <string>
 
 #pragma GCC visibility push(default)
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 0687166..60ffef1 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -24,6 +24,7 @@
 #include <pulsar/Result.h>
 #include <pulsar/ConsumerType.h>
 #include <pulsar/Message.h>
+#include <pulsar/Schema.h>
 #include <pulsar/ConsumerCryptoFailureAction.h>
 #include <pulsar/CryptoKeyReader.h>
 
@@ -52,6 +53,21 @@ class ConsumerConfiguration {
     ConsumerConfiguration& operator=(const ConsumerConfiguration&);
 
     /**
+     * Declare the schema of the data that this consumer will be accepting.
+     *
+     * The schema will be checked against the schema of the topic, and the
+     * consumer creation will fail if it's not compatible.
+     *
+     * @param schemaInfo the schema definition object
+     */
+    ConsumerConfiguration& setSchema(const SchemaInfo& schemaInfo);
+
+    /**
+     * @return the schema information declared for this consumer
+     */
+    const SchemaInfo& getSchema() const;
+
+    /**
      * Specify the consumer type. The consumer type enables
      * specifying the type of subscription. In Exclusive subscription,
      * only a single consumer is allowed to attach to the subscription. Other consumers
diff --git a/pulsar-client-cpp/include/pulsar/Producer.h b/pulsar-client-cpp/include/pulsar/Producer.h
index 407d937..1941e15 100644
--- a/pulsar-client-cpp/include/pulsar/Producer.h
+++ b/pulsar-client-cpp/include/pulsar/Producer.h
@@ -110,6 +110,16 @@ class Producer {
     int64_t getLastSequenceId() const;
 
     /**
+     * Return an identifier for the schema version that this producer was created with.
+     *
+     * When the producer is created, if a schema info was passed, the broker will
+     * determine the version of the passed schema. This identifier should be treated
+     * as an opaque identifier. In particular, even though this is represented as a string, the
+     * version might not be ascii printable.
+     */
+    const std::string& getSchemaVersion() const;
+
+    /**
      * Close the producer and release resources allocated.
      *
      * No more writes will be accepted from this producer. Waits until
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 565a6ab..6e3d0b4 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -25,6 +25,7 @@
 #include <boost/function.hpp>
 #include <pulsar/ProducerCryptoFailureAction.h>
 #include <pulsar/CryptoKeyReader.h>
+#include <pulsar/Schema.h>
 
 #include <set>
 
@@ -55,6 +56,7 @@ class ProducerConfiguration {
         BoostHash,
         JavaStringHash
     };
+
     ProducerConfiguration();
     ~ProducerConfiguration();
     ProducerConfiguration(const ProducerConfiguration&);
@@ -63,6 +65,25 @@ class ProducerConfiguration {
     ProducerConfiguration& setProducerName(const std::string& producerName);
     const std::string& getProducerName() const;
 
+    /**
+     * Declare the schema of the data that will be published by this producer.
+     *
+     * The schema will be checked against the schema of the topic, and the
+     * producer creation will fail if it's not compatible. Note that the client
+     * library will not perform any validation that the actual message payloads
+     * conform to the specified schema.
+     *
+     * @param schemaInfo the schema definition object
+     * @return a reference to this configuration object, so that calls
+     *         can be chained
+     */
+    ProducerConfiguration& setSchema(const SchemaInfo& schemaInfo);
+
+    /**
+     * @return the schema information declared for this producer
+     */
+    const SchemaInfo& getSchema() const;
+
     ProducerConfiguration& setSendTimeout(int sendTimeoutMs);
     int getSendTimeout() const;
 
diff --git a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
index 8d365ab..69776e3 100644
--- a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
@@ -23,6 +23,7 @@
 #include <boost/shared_ptr.hpp>
 #include <pulsar/Result.h>
 #include <pulsar/Message.h>
+#include <pulsar/Schema.h>
 
 #pragma GCC visibility push(default)
 namespace pulsar {
@@ -49,6 +50,21 @@ class ReaderConfiguration {
     ReaderConfiguration& operator=(const ReaderConfiguration&);
 
     /**
+     * Declare the schema of the data that this reader will be accepting.
+     *
+     * The schema will be checked against the schema of the topic, and the
+     * reader creation will fail if it's not compatible.
+     *
+     * @param schemaInfo the schema definition object
+     */
+    ReaderConfiguration& setSchema(const SchemaInfo& schemaInfo);
+
+    /**
+     * @return the schema information declared for this reader
+     */
+    const SchemaInfo& getSchema() const;
+
+    /**
      * A message listener enables your application to configure how to process
      * messages. A listener will be called in order for every message received.
      */
diff --git a/pulsar-client-cpp/include/pulsar/Schema.h b/pulsar-client-cpp/include/pulsar/Schema.h
new file mode 100644
index 0000000..eb2ebbb
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/Schema.h
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <map>
+
+#include <iosfwd>
+#include <boost/shared_ptr.hpp>
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+enum SchemaType
+{
+    /**
+     * No schema defined
+     */
+    NONE = 0,
+
+    /**
+     * Simple String encoding with UTF-8
+     */
+    STRING = 1,
+
+    /**
+     * An 8-bit integer.
+     */
+    INT8 = 2,
+
+    /**
+     * A 16-bit integer.
+     */
+    INT16 = 3,
+
+    /**
+     * A 32-bit integer.
+     */
+    INT32 = 4,
+
+    /**
+     * A 64-bit integer.
+     */
+    INT64 = 5,
+
+    /**
+     * A float number.
+     */
+    FLOAT = 6,
+
+    /**
+     * A double number
+     */
+    DOUBLE = 7,
+
+    /**
+     * A bytes array.
+     */
+    BYTES = 8,
+
+    /**
+     * JSON object encoding and validation
+     */
+    JSON = 9,
+
+    /**
+     * Protobuf message encoding and decoding
+     */
+    PROTOBUF = 10,
+
+    /**
+     * Serialize and deserialize via Avro
+     */
+    AVRO = 11,
+
+    /**
+     * Auto Consume Type.
+     */
+    AUTO_CONSUME = 13,
+
+    /**
+     * Auto Publish Type.
+     */
+    AUTO_PUBLISH = 14,
+
+    /**
+     * A Schema that contains Key Schema and Value Schema.
+     */
+    KEY_VALUE = 15,
+};
+
+// Return string representation of the schema type
+const char *strSchemaType(SchemaType schemaType);
+
+class SchemaInfoImpl;
+
+typedef std::map<std::string, std::string> StringMap;
+
+/**
+ * Encapsulates data around the schema definition
+ */
+class SchemaInfo {
+   public:
+    SchemaInfo();
+
+    /**
+     * @param schemaType the schema type
+     * @param name the name of the schema definition
+     * @param schema the schema definition as a JSON string
+     * @param properties a map of custom defined properties attached to the schema
+     */
+    SchemaInfo(SchemaType schemaType, const std::string &name, const std::string &schema,
+               const StringMap &properties = StringMap());
+
+    /**
+     * @return the schema type
+     */
+    SchemaType getSchemaType() const;
+
+    /**
+     * @return the name of the schema definition
+     */
+    const std::string &getName() const;
+
+    /**
+     * @return the schema definition as a JSON string
+     */
+    const std::string &getSchema() const;
+
+    /**
+     * @return a map of custom defined properties attached to the schema
+     */
+    const StringMap &getProperties() const;
+
+   private:
+    typedef boost::shared_ptr<SchemaInfoImpl> SchemaInfoImplPtr;
+    SchemaInfoImplPtr impl_;
+};
+
+}  // namespace pulsar
+
+std::ostream &operator<<(std::ostream &s, pulsar::SchemaType schemaType);
+
+#pragma GCC visibility pop
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index fbda62b..0d8106f 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -688,7 +688,7 @@ void ClientConnection::handleIncomingCommand() {
                         pendingRequests_.erase(it);
                         lock.unlock();
 
-                        requestData.promise.setValue({"", -1});
+                        requestData.promise.setValue({});
                         requestData.timer->cancel();
                     }
                     break;
@@ -853,8 +853,13 @@ void ClientConnection::handleIncomingCommand() {
                         pendingRequests_.erase(it);
                         lock.unlock();
 
-                        requestData.promise.setValue(
-                            {producerSuccess.producer_name(), producerSuccess.last_sequence_id()});
+                        ResponseData data;
+                        data.producerName = producerSuccess.producer_name();
+                        data.lastSequenceId = producerSuccess.last_sequence_id();
+                        if (producerSuccess.has_schema_version()) {
+                            data.schemaVersion = producerSuccess.schema_version();
+                        }
+                        requestData.promise.setValue(data);
                         requestData.timer->cancel();
                     }
                     break;
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index ac8d6c5..5b47fb5 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -68,7 +68,12 @@ class LookupDataResult;
 
 struct OpSendMsg;
 
-typedef std::pair<std::string, int64_t> ResponseData;
+// Data returned on the request operation. Mostly used on create-producer command
+struct ResponseData {
+    std::string producerName;
+    int64_t lastSequenceId;
+    std::string schemaVersion;
+};
 
 typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
 
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index b048dce..4c80154 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -25,6 +25,7 @@
 #include "PulsarApi.pb.h"
 #include "Utils.h"
 #include "Url.h"
+#include <pulsar/Schema.h>
 #include "checksum/ChecksumProvider.h"
 #include <algorithm>
 #include <boost/thread/mutex.hpp>
@@ -36,6 +37,51 @@ using namespace pulsar::proto;
 
 DECLARE_LOG_OBJECT();
 
+static inline bool isBuiltInSchema(SchemaType schemaType) {
+    switch (schemaType) {
+        case STRING:
+        case JSON:
+        case AVRO:
+        case PROTOBUF:
+            return true;
+
+        default:
+            return false;
+    }
+}
+
+static inline proto::Schema_Type getSchemaType(SchemaType type) {
+    switch (type) {
+        case SchemaType::NONE:
+            return Schema_Type_None;
+        case STRING:
+            return Schema_Type_String;
+        case JSON:
+            return Schema_Type_Json;
+        case PROTOBUF:
+            return Schema_Type_Protobuf;
+        case AVRO:
+            return Schema_Type_Avro;
+        default:
+            return Schema_Type_None;
+    }
+}
+
+static proto::Schema* getSchema(const SchemaInfo& schemaInfo) {
+    proto::Schema* schema = proto::Schema().New();
+    schema->set_name(schemaInfo.getName());
+    schema->set_schema_data(schemaInfo.getSchema());
+    schema->set_type(getSchemaType(schemaInfo.getSchemaType()));
+    for (const auto& kv : schemaInfo.getProperties()) {
+        proto::KeyValue* keyValue = proto::KeyValue().New();
+        keyValue->set_key(kv.first);
+        keyValue->set_value(kv.second);
+        schema->mutable_properties()->AddAllocated(keyValue);
+    }
+
+    return schema;
+}
+
 SharedBuffer Commands::writeMessageWithSize(const BaseCommand& cmd) {
     size_t cmdSize = cmd.ByteSize();
     size_t frameSize = 4 + cmdSize;
@@ -189,7 +235,8 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
                                     uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
                                     const std::string& consumerName, SubscriptionMode subscriptionMode,
                                     Optional<MessageId> startMessageId, bool readCompacted,
-                                    const std::map<std::string, std::string>& metadata) {
+                                    const std::map<std::string, std::string>& metadata,
+                                    const SchemaInfo& schemaInfo) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::SUBSCRIBE);
     CommandSubscribe* subscribe = cmd.mutable_subscribe();
@@ -201,6 +248,11 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
     subscribe->set_consumer_name(consumerName);
     subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
     subscribe->set_read_compacted(readCompacted);
+
+    if (isBuiltInSchema(schemaInfo.getSchemaType())) {
+        subscribe->set_allocated_schema(getSchema(schemaInfo));
+    }
+
     if (startMessageId.is_present()) {
         MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
         messageIdData.set_ledgerid(startMessageId.value().ledgerId());
@@ -233,7 +285,8 @@ SharedBuffer Commands::newUnsubscribe(uint64_t consumerId, uint64_t requestId) {
 
 SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId,
                                    const std::string& producerName, uint64_t requestId,
-                                   const std::map<std::string, std::string>& metadata) {
+                                   const std::map<std::string, std::string>& metadata,
+                                   const SchemaInfo& schemaInfo) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::PRODUCER);
     CommandProducer* producer = cmd.mutable_producer();
@@ -248,6 +301,10 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
         producer->mutable_metadata()->AddAllocated(keyValue);
     }
 
+    if (isBuiltInSchema(schemaInfo.getSchemaType())) {
+        producer->set_allocated_schema(getSchema(schemaInfo));
+    }
+
     if (!producerName.empty()) {
         producer->set_producer_name(producerName);
     }
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index e669953..15611a8 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -21,6 +21,7 @@
 
 #include <pulsar/Authentication.h>
 #include <pulsar/Message.h>
+#include <pulsar/Schema.h>
 
 #include "PulsarApi.pb.h"
 #include "SharedBuffer.h"
@@ -78,13 +79,15 @@ class Commands {
                                      uint64_t consumerId, uint64_t requestId,
                                      proto::CommandSubscribe_SubType subType, const std::string& consumerName,
                                      SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId,
-                                     bool readCompacted, const std::map<std::string, std::string>& metadata);
+                                     bool readCompacted, const std::map<std::string, std::string>& metadata,
+                                     const SchemaInfo& schemaInfo);
 
     static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);
 
     static SharedBuffer newProducer(const std::string& topic, uint64_t producerId,
                                     const std::string& producerName, uint64_t requestId,
-                                    const std::map<std::string, std::string>& metadata);
+                                    const std::map<std::string, std::string>& metadata,
+                                    const SchemaInfo& schemaInfo);
 
     static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId,
                                proto::CommandAck_AckType ackType, int validationError);
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 4014ad2..4a7e73a 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -33,6 +33,13 @@ ConsumerConfiguration& ConsumerConfiguration::operator=(const ConsumerConfigurat
     return *this;
 }
 
+ConsumerConfiguration& ConsumerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
+    impl_->schemaInfo = schemaInfo;
+    return *this;
+}
+
+const SchemaInfo& ConsumerConfiguration::getSchema() const { return impl_->schemaInfo; }
+
 long ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs() const {
     return impl_->brokerConsumerStatsCacheTimeInMs;
 }
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 16e91c8..bf9ceb5 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -24,6 +24,7 @@
 
 namespace pulsar {
 struct ConsumerConfigurationImpl {
+    SchemaInfo schemaInfo;
     long unAckedMessagesTimeoutMs;
     ConsumerType consumerType;
     MessageListener messageListener;
@@ -38,7 +39,8 @@ struct ConsumerConfigurationImpl {
     int patternAutoDiscoveryPeriod;
     std::map<std::string, std::string> properties;
     ConsumerConfigurationImpl()
-        : unAckedMessagesTimeoutMs(0),
+        : schemaInfo(),
+          unAckedMessagesTimeoutMs(0),
           consumerType(ConsumerExclusive),
           messageListener(),
           hasMessageListener(false),
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 62446d9..35a18db 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -137,9 +137,9 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
 
     ClientImplPtr client = client_.lock();
     uint64_t requestId = client->newRequestId();
-    SharedBuffer cmd =
-        Commands::newSubscribe(topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_,
-                               subscriptionMode_, startMessageId_, readCompacted_, config_.getProperties());
+    SharedBuffer cmd = Commands::newSubscribe(topic_, subscription_, consumerId_, requestId, getSubType(),
+                                              consumerName_, subscriptionMode_, startMessageId_,
+                                              readCompacted_, config_.getProperties(), config_.getSchema());
     cnx->sendRequestWithId(cmd, requestId)
         .addListener(boost::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, _1));
 }
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 0f2780e..bd27b0e 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -147,6 +147,12 @@ const std::string& PartitionedProducerImpl::getProducerName() const {
     return producers_[0]->getProducerName();
 }
 
+const std::string& PartitionedProducerImpl::getSchemaVersion() const {
+    // Since the schema is atomically assigned on the partitioned-topic,
+    // it's guaranteed that all the partitions will have the same schema version.
+    return producers_[0]->getSchemaVersion();
+}
+
 int64_t PartitionedProducerImpl::getLastSequenceId() const {
     int64_t currentMax = -1L;
     for (int i = 0; i < producers_.size(); i++) {
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index 0a3d949..283d273 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -59,6 +59,8 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     virtual int64_t getLastSequenceId() const;
 
+    virtual const std::string& getSchemaVersion() const;
+
     virtual void start();
 
     virtual void shutdown();
diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc
index bee213e..1659a21 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -60,6 +60,8 @@ const std::string& Producer::getProducerName() const { return impl_->getProducer
 
 int64_t Producer::getLastSequenceId() const { return impl_->getLastSequenceId(); }
 
+const std::string& Producer::getSchemaVersion() const { return impl_->getSchemaVersion(); }
+
 Result Producer::close() {
     Promise<bool, Result> promise;
     closeAsync(WaitForCallback(promise));
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index 9ad2cf9..a216fc1 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -187,6 +187,13 @@ ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key)
     return *this;
 }
 
+ProducerConfiguration& ProducerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
+    impl_->schemaInfo = schemaInfo;
+    return *this;
+}
+
+const SchemaInfo& ProducerConfiguration::getSchema() const { return impl_->schemaInfo; }
+
 bool ProducerConfiguration::hasProperty(const std::string& name) const {
     const std::map<std::string, std::string>& m = impl_->properties;
     return m.find(name) != m.end();
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index cc5b9b7..7ec09c8 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -27,6 +27,7 @@
 namespace pulsar {
 
 struct ProducerConfigurationImpl {
+    SchemaInfo schemaInfo;
     Optional<std::string> producerName;
     Optional<int64_t> initialSequenceId;
     int sendTimeoutMs;
@@ -46,7 +47,8 @@ struct ProducerConfigurationImpl {
     ProducerCryptoFailureAction cryptoFailureAction;
     std::map<std::string, std::string> properties;
     ProducerConfigurationImpl()
-        : sendTimeoutMs(30000),
+        : schemaInfo(),
+          sendTimeoutMs(30000),
           compressionType(CompressionNone),
           maxPendingMessages(1000),
           maxPendingMessagesAcrossPartitions(50000),
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 29d081b..4c5c56d 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -102,6 +102,8 @@ const std::string& ProducerImpl::getProducerName() const { return producerName_;
 
 int64_t ProducerImpl::getLastSequenceId() const { return lastSequenceIdPublished_; }
 
+const std::string& ProducerImpl::getSchemaVersion() const { return schemaVersion_; }
+
 void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
     if (ec) {
         LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
@@ -127,8 +129,8 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     ClientImplPtr client = client_.lock();
     int requestId = client->newRequestId();
 
-    SharedBuffer cmd =
-        Commands::newProducer(topic_, producerId_, producerName_, requestId, conf_.getProperties());
+    SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId,
+                                             conf_.getProperties(), conf_.getSchema());
     cnx->sendRequestWithId(cmd, requestId)
         .addListener(boost::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx, _1, _2));
 }
@@ -150,20 +152,19 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
     if (result == ResultOk) {
         // We are now reconnected to broker and clear to send messages. Re-send all pending messages and
         // set the cnx pointer so that new messages will be sent immediately
-        const std::string& producerName = responseData.first;
-        int64_t lastSequenceId = responseData.second;
         LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
 
         Lock lock(mutex_);
         cnx->registerProducer(producerId_, shared_from_this());
-        producerName_ = producerName;
+        producerName_ = responseData.producerName;
+        schemaVersion_ = responseData.schemaVersion;
         producerStr_ = "[" + topic_ + ", " + producerName_ + "] ";
         if (batchMessageContainer) {
             batchMessageContainer->producerName_ = producerName_;
         }
 
         if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
-            lastSequenceIdPublished_ = lastSequenceId;
+            lastSequenceIdPublished_ = responseData.lastSequenceId;
             msgSequenceGenerator_ = lastSequenceIdPublished_ + 1;
         }
         resendMessages(cnx);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index aa22abc..00f7f31 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -81,6 +81,8 @@ class ProducerImpl : public HandlerBase,
 
     int64_t getLastSequenceId() const;
 
+    const std::string& getSchemaVersion() const;
+
     uint64_t getProducerId() const;
 
     virtual void start();
@@ -147,6 +149,7 @@ class ProducerImpl : public HandlerBase,
     BatchMessageContainerPtr batchMessageContainer;
 
     volatile int64_t lastSequenceIdPublished_;
+    std::string schemaVersion_;
 
     typedef boost::shared_ptr<boost::asio::deadline_timer> TimerPtr;
     TimerPtr sendTimer_;
diff --git a/pulsar-client-cpp/lib/ProducerImplBase.h b/pulsar-client-cpp/lib/ProducerImplBase.h
index 3dd92a4..25b4db3 100644
--- a/pulsar-client-cpp/lib/ProducerImplBase.h
+++ b/pulsar-client-cpp/lib/ProducerImplBase.h
@@ -34,6 +34,7 @@ class ProducerImplBase {
     virtual const std::string& getProducerName() const = 0;
 
     virtual int64_t getLastSequenceId() const = 0;
+    virtual const std::string& getSchemaVersion() const = 0;
 
     virtual void sendAsync(const Message& msg, SendCallback callback) = 0;
     virtual void closeAsync(CloseCallback callback) = 0;
diff --git a/pulsar-client-cpp/lib/ReaderConfiguration.cc b/pulsar-client-cpp/lib/ReaderConfiguration.cc
index a649b33..1f13c02 100644
--- a/pulsar-client-cpp/lib/ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/ReaderConfiguration.cc
@@ -31,6 +31,13 @@ ReaderConfiguration& ReaderConfiguration::operator=(const ReaderConfiguration& x
     return *this;
 }
 
+ReaderConfiguration& ReaderConfiguration::setSchema(const SchemaInfo& schemaInfo) {
+    impl_->schemaInfo = schemaInfo;
+    return *this;
+}
+
+const SchemaInfo& ReaderConfiguration::getSchema() const { return impl_->schemaInfo; }
+
 ReaderConfiguration& ReaderConfiguration::setReaderListener(ReaderListener readerListener) {
     impl_->readerListener = readerListener;
     impl_->hasReaderListener = true;
diff --git a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
index 5dca8e3..73082ea 100644
--- a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
@@ -24,6 +24,7 @@
 
 namespace pulsar {
 struct ReaderConfigurationImpl {
+    SchemaInfo schemaInfo;
     ReaderListener readerListener;
     bool hasReaderListener;
     int receiverQueueSize;
@@ -31,7 +32,8 @@ struct ReaderConfigurationImpl {
     std::string subscriptionRolePrefix;
     bool readCompacted;
     ReaderConfigurationImpl()
-        : hasReaderListener(false),
+        : schemaInfo(),
+          hasReaderListener(false),
           receiverQueueSize(1000),
           readerName(),
           subscriptionRolePrefix(),
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 9507a0b..ce2ef21 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -33,6 +33,7 @@ void ReaderImpl::start(const MessageId& startMessageId) {
     consumerConf.setConsumerType(ConsumerExclusive);
     consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
     consumerConf.setReadCompacted(readerConf_.isReadCompacted());
+    consumerConf.setSchema(readerConf_.getSchema());
 
     if (readerConf_.getReaderName().length() > 0) {
         consumerConf.setConsumerName(readerConf_.getReaderName());
diff --git a/pulsar-client-cpp/lib/Schema.cc b/pulsar-client-cpp/lib/Schema.cc
new file mode 100644
index 0000000..4877369
--- /dev/null
+++ b/pulsar-client-cpp/lib/Schema.cc
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/Schema.h>
+
+#include <iostream>
+#include <map>
+#include <boost/make_shared.hpp>
+#include <include/pulsar/Schema.h>
+
+#pragma GCC visibility push(default)
+
+std::ostream &operator<<(std::ostream &s, pulsar::SchemaType schemaType) {
+    return s << pulsar::strSchemaType(schemaType);  // print the enumerator's name as text
+}
+
+namespace pulsar {
+
+const char *strSchemaType(SchemaType schemaType) {  // maps a SchemaType to its enumerator name
+    switch (schemaType) {
+        case NONE:
+            return "NONE";
+        case STRING:
+            return "STRING";
+        case INT8:
+            return "INT8";
+        case INT16:
+            return "INT16";
+        case INT32:
+            return "INT32";
+        case INT64:
+            return "INT64";
+        case FLOAT:
+            return "FLOAT";
+        case DOUBLE:
+            return "DOUBLE";
+        case BYTES:
+            return "BYTES";
+        case JSON:
+            return "JSON";
+        case PROTOBUF:
+            return "PROTOBUF";
+        case AVRO:
+            return "AVRO";
+        case AUTO_CONSUME:
+            return "AUTO_CONSUME";
+        case AUTO_PUBLISH:
+            return "AUTO_PUBLISH";
+        case KEY_VALUE:
+            return "KEY_VALUE";
+    }
+    // NOTE: Deliberately no `default:` case above, so that the compiler can warn when a new
+    // SchemaType enumerator is added without a matching case. The return below only covers
+    // values that match none of the cases (and keeps the compiler happy).
+    return "UnknownSchemaType";
+}
+
+class SchemaInfoImpl {  // immutable field bundle that SchemaInfo holds through impl_
+   public:
+    const std::string name_;
+    const std::string schema_;
+    const SchemaType type_;
+    const std::map<std::string, std::string> properties_;
+    // Default state: name "BYTES", type BYTES, empty schema string and no properties.
+    SchemaInfoImpl() : name_("BYTES"), schema_(), type_(BYTES), properties_() {}
+    // Initializers follow the member declaration order (the order they actually run in).
+    SchemaInfoImpl(SchemaType schemaType, const std::string &name, const std::string &schema,
+                   const StringMap &properties)
+        : name_(name), schema_(schema), type_(schemaType), properties_(properties) {}
+};
+
+SchemaInfo::SchemaInfo() : impl_(boost::make_shared<SchemaInfoImpl>()) {}  // default: "BYTES" / BYTES
+// Builds the implementation object from the given type, name, schema string and properties.
+SchemaInfo::SchemaInfo(SchemaType schemaType, const std::string &name, const std::string &schema,
+                       const StringMap &properties)
+    : impl_(boost::make_shared<SchemaInfoImpl>(schemaType, name, schema, properties)) {}
+// Each accessor below returns the matching field of the implementation object.
+SchemaType SchemaInfo::getSchemaType() const { return impl_->type_; }
+
+const std::string &SchemaInfo::getName() const { return impl_->name_; }
+
+const std::string &SchemaInfo::getSchema() const { return impl_->schema_; }
+
+const std::map<std::string, std::string> &SchemaInfo::getProperties() const { return impl_->properties_; }
+
+}  // namespace pulsar
+
+#pragma GCC visibility pop
diff --git a/pulsar-client-cpp/tests/SchemaTest.cc b/pulsar-client-cpp/tests/SchemaTest.cc
new file mode 100644
index 0000000..7ad7b9e
--- /dev/null
+++ b/pulsar-client-cpp/tests/SchemaTest.cc
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+using namespace pulsar;
+
+static std::string lookupUrl = "pulsar://localhost:6650";  // service URL handed to Client below
+// Avro record "Example" (namespace "test") with two int fields, "a" and "b".
+static const std::string exampleSchema =
+    "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+    "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+
+TEST(SchemaTest, testSchema) {
+    // Exercises schema checks against the broker at lookupUrl, all on topic "topic-avro".
+    Client client(lookupUrl);
+    Result res;
+
+    Producer producer;
+    ProducerConfiguration producerConf;
+    producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+    res = client.createProducer("topic-avro", producerConf, producer);
+    ASSERT_EQ(ResultOk, res);
+
+    producer.close();
+
+    // Creating producer with a different schema type on same topic should fail
+    producerConf.setSchema(SchemaInfo(JSON, "Json", "{}"));
+    res = client.createProducer("topic-avro", producerConf, producer);
+    ASSERT_EQ(ResultIncompatibleSchema, res);
+
+    // Creating producer with no schema on same topic should succeed
+    // because standalone broker is configured by default to not
+    // require the schema to be set
+    res = client.createProducer("topic-avro", producer);
+    ASSERT_EQ(ResultOk, res);
+
+    ConsumerConfiguration consumerConf;
+    Consumer consumer;
+    // Subscribing with no schema will still succeed
+    res = client.subscribe("topic-avro", "sub-1", consumerConf, consumer);
+    ASSERT_EQ(ResultOk, res);
+
+    // Subscribing with same Avro schema will succeed
+    consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+    res = client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
+    ASSERT_EQ(ResultOk, res);
+
+    // Subscribing with different schema type will fail
+    consumerConf.setSchema(SchemaInfo(JSON, "Json", "{}"));
+    res = client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
+    ASSERT_EQ(ResultIncompatibleSchema, res);
+}