You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/11 18:25:30 UTC
[pulsar] branch master updated: Pass schema info to C++ client
(#3354)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ef94cb7 Pass schema info to C++ client (#3354)
ef94cb7 is described below
commit ef94cb70df08d7e9f16c6415116e729bb6d756ab
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jan 11 10:25:24 2019 -0800
Pass schema info to C++ client (#3354)
* Pass schema info to C++ client
* Comments
---
pulsar-client-cpp/include/pulsar/Client.h | 1 +
.../include/pulsar/ConsumerConfiguration.h | 16 +++
pulsar-client-cpp/include/pulsar/Producer.h | 10 ++
.../include/pulsar/ProducerConfiguration.h | 21 +++
.../include/pulsar/ReaderConfiguration.h | 16 +++
pulsar-client-cpp/include/pulsar/Schema.h | 160 +++++++++++++++++++++
pulsar-client-cpp/lib/ClientConnection.cc | 11 +-
pulsar-client-cpp/lib/ClientConnection.h | 7 +-
pulsar-client-cpp/lib/Commands.cc | 61 +++++++-
pulsar-client-cpp/lib/Commands.h | 7 +-
pulsar-client-cpp/lib/ConsumerConfiguration.cc | 7 +
pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 4 +-
pulsar-client-cpp/lib/ConsumerImpl.cc | 6 +-
pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 6 +
pulsar-client-cpp/lib/PartitionedProducerImpl.h | 2 +
pulsar-client-cpp/lib/Producer.cc | 2 +
pulsar-client-cpp/lib/ProducerConfiguration.cc | 7 +
pulsar-client-cpp/lib/ProducerConfigurationImpl.h | 4 +-
pulsar-client-cpp/lib/ProducerImpl.cc | 13 +-
pulsar-client-cpp/lib/ProducerImpl.h | 3 +
pulsar-client-cpp/lib/ProducerImplBase.h | 1 +
pulsar-client-cpp/lib/ReaderConfiguration.cc | 7 +
pulsar-client-cpp/lib/ReaderConfigurationImpl.h | 4 +-
pulsar-client-cpp/lib/ReaderImpl.cc | 1 +
pulsar-client-cpp/lib/Schema.cc | 103 +++++++++++++
pulsar-client-cpp/tests/SchemaTest.cc | 69 +++++++++
26 files changed, 529 insertions(+), 20 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h
index d77eb6d..e04b1b8 100644
--- a/pulsar-client-cpp/include/pulsar/Client.h
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -26,6 +26,7 @@
#include <pulsar/Message.h>
#include <pulsar/MessageBuilder.h>
#include <pulsar/ClientConfiguration.h>
+#include <pulsar/Schema.h>
#include <string>
#pragma GCC visibility push(default)
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 0687166..60ffef1 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -24,6 +24,7 @@
#include <pulsar/Result.h>
#include <pulsar/ConsumerType.h>
#include <pulsar/Message.h>
+#include <pulsar/Schema.h>
#include <pulsar/ConsumerCryptoFailureAction.h>
#include <pulsar/CryptoKeyReader.h>
@@ -52,6 +53,21 @@ class ConsumerConfiguration {
ConsumerConfiguration& operator=(const ConsumerConfiguration&);
/**
+ * Declare the schema of the data that this consumer will be accepting.
+ *
+ * The schema will be checked against the schema of the topic, and the
+ * consumer creation will fail if it's not compatible.
+ *
+ * @param schemaInfo the schema definition object
+ */
+ ConsumerConfiguration& setSchema(const SchemaInfo& schemaInfo);
+
+ /**
+ * @return the schema information declared for this consumer
+ */
+ const SchemaInfo& getSchema() const;
+
+ /**
* Specify the consumer type. The consumer type enables
* specifying the type of subscription. In Exclusive subscription,
* only a single consumer is allowed to attach to the subscription. Other consumers
diff --git a/pulsar-client-cpp/include/pulsar/Producer.h b/pulsar-client-cpp/include/pulsar/Producer.h
index 407d937..1941e15 100644
--- a/pulsar-client-cpp/include/pulsar/Producer.h
+++ b/pulsar-client-cpp/include/pulsar/Producer.h
@@ -110,6 +110,16 @@ class Producer {
int64_t getLastSequenceId() const;
/**
+ * Return an identifier for the schema version that this producer was created with.
+ *
+ * When the producer is created, if a schema info was passed, the broker will
+ * determine the version of the passed schema. This identifier should be treated
+ * as an opaque identifier. In particular, even though this is represented as a string, the
+ * version might not be ascii printable.
+ */
+ const std::string& getSchemaVersion() const;
+
+ /**
* Close the producer and release resources allocated.
*
* No more writes will be accepted from this producer. Waits until
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 565a6ab..6e3d0b4 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -25,6 +25,7 @@
#include <boost/function.hpp>
#include <pulsar/ProducerCryptoFailureAction.h>
#include <pulsar/CryptoKeyReader.h>
+#include <pulsar/Schema.h>
#include <set>
@@ -55,6 +56,7 @@ class ProducerConfiguration {
BoostHash,
JavaStringHash
};
+
ProducerConfiguration();
~ProducerConfiguration();
ProducerConfiguration(const ProducerConfiguration&);
@@ -63,6 +65,25 @@ class ProducerConfiguration {
ProducerConfiguration& setProducerName(const std::string& producerName);
const std::string& getProducerName() const;
+ /**
+ * Declare the schema of the data that will be published by this producer.
+ *
+ * The schema will be checked against the schema of the topic, and the
+ * producer creation will fail if it's not compatible. Note that the client
+ * library does not perform any validation that the actual message payloads
+ * conform to the specified schema.
+ *
+ * @param schemaInfo the schema definition object
+ * @return a reference to this configuration, to allow call chaining
+ */
+ ProducerConfiguration& setSchema(const SchemaInfo& schemaInfo);
+
+ /**
+ * @return the schema information declared for this producer
+ */
+ const SchemaInfo& getSchema() const;
+
ProducerConfiguration& setSendTimeout(int sendTimeoutMs);
int getSendTimeout() const;
diff --git a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
index 8d365ab..69776e3 100644
--- a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
@@ -23,6 +23,7 @@
#include <boost/shared_ptr.hpp>
#include <pulsar/Result.h>
#include <pulsar/Message.h>
+#include <pulsar/Schema.h>
#pragma GCC visibility push(default)
namespace pulsar {
@@ -49,6 +50,21 @@ class ReaderConfiguration {
ReaderConfiguration& operator=(const ReaderConfiguration&);
/**
+ * Declare the schema of the data that this reader will be accepting.
+ *
+ * The schema will be checked against the schema of the topic, and the
+ * reader creation will fail if it's not compatible. The schema is passed
+ * on to the consumer that the reader creates internally.
+ *
+ * @param schemaInfo the schema definition object
+ * @return a reference to this configuration, to allow call chaining
+ */
+ ReaderConfiguration& setSchema(const SchemaInfo& schemaInfo);
+
+ /**
+ * @return the schema information declared for this reader
+ */
+ const SchemaInfo& getSchema() const;
+
+ /**
* A message listener enables your application to configure how to process
* messages. A listener will be called in order for every message received.
*/
diff --git a/pulsar-client-cpp/include/pulsar/Schema.h b/pulsar-client-cpp/include/pulsar/Schema.h
new file mode 100644
index 0000000..eb2ebbb
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/Schema.h
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <iosfwd>
+#include <map>
+#include <string>
+
+#include <boost/shared_ptr.hpp>
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+/**
+ * Type of the schema declared by a producer, consumer or reader.
+ *
+ * NOTE(review): this client currently only sends STRING, JSON, PROTOBUF and AVRO
+ * schemas to the broker; other types are not attached to producer/subscribe
+ * commands. Value 12 is intentionally skipped here — presumably to stay aligned
+ * with Pulsar's schema type codes; confirm before renumbering.
+ */
+enum SchemaType
+{
+ /**
+ * No schema defined
+ */
+ NONE = 0,
+
+ /**
+ * Simple String encoding with UTF-8
+ */
+ STRING = 1,
+
+ /**
+ * An 8-bit integer.
+ */
+ INT8 = 2,
+
+ /**
+ * A 16-bit integer.
+ */
+ INT16 = 3,
+
+ /**
+ * A 32-bit integer.
+ */
+ INT32 = 4,
+
+ /**
+ * A 64-bit integer.
+ */
+ INT64 = 5,
+
+ /**
+ * A float number.
+ */
+ FLOAT = 6,
+
+ /**
+ * A double number
+ */
+ DOUBLE = 7,
+
+ /**
+ * A bytes array.
+ */
+ BYTES = 8,
+
+ /**
+ * JSON object encoding and validation
+ */
+ JSON = 9,
+
+ /**
+ * Protobuf message encoding and decoding
+ */
+ PROTOBUF = 10,
+
+ /**
+ * Serialize and deserialize via Avro
+ */
+ AVRO = 11,
+
+ /**
+ * Auto Consume Type.
+ */
+ AUTO_CONSUME = 13,
+
+ /**
+ * Auto Publish Type.
+ */
+ AUTO_PUBLISH = 14,
+
+ /**
+ * A Schema that contains Key Schema and Value Schema.
+ */
+ KEY_VALUE = 15,
+};
+
+// Return the string representation of a schema type (e.g. "AVRO"); values that are
+// not a known enumerator yield "UnknownSchemaType".
+const char *strSchemaType(SchemaType schemaType);
+
+class SchemaInfoImpl;
+
+typedef std::map<std::string, std::string> StringMap;
+
+/**
+ * Encapsulates data around the schema definition.
+ *
+ * SchemaInfo is a lightweight handle: the data lives in an immutable implementation
+ * object held through a shared pointer, so copies share that object.
+ */
+class SchemaInfo {
+ public:
+ /**
+ * Create a default schema info: type BYTES, name "BYTES", an empty schema
+ * definition and no properties.
+ */
+ SchemaInfo();
+
+ /**
+ * @param schemaType the schema type
+ * @param name the name of the schema definition
+ * @param schema the schema definition (e.g. the JSON Avro schema for AVRO)
+ * @param properties a map of custom defined properties attached to the schema
+ */
+ SchemaInfo(SchemaType schemaType, const std::string &name, const std::string &schema,
+ const StringMap &properties = StringMap());
+
+ /**
+ * @return the schema type
+ */
+ SchemaType getSchemaType() const;
+
+ /**
+ * @return the name of the schema definition
+ */
+ const std::string &getName() const;
+
+ /**
+ * @return the schema definition string, as passed to the constructor
+ */
+ const std::string &getSchema() const;
+
+ /**
+ * @return a map of custom defined properties attached to the schema
+ */
+ const StringMap &getProperties() const;
+
+ private:
+ typedef boost::shared_ptr<SchemaInfoImpl> SchemaInfoImplPtr;
+ SchemaInfoImplPtr impl_;
+};
+
+} // namespace pulsar
+
+std::ostream &operator<<(std::ostream &s, pulsar::SchemaType schemaType);
+
+#pragma GCC visibility pop
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index fbda62b..0d8106f 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -688,7 +688,7 @@ void ClientConnection::handleIncomingCommand() {
pendingRequests_.erase(it);
lock.unlock();
- requestData.promise.setValue({"", -1});
+ requestData.promise.setValue({});
requestData.timer->cancel();
}
break;
@@ -853,8 +853,13 @@ void ClientConnection::handleIncomingCommand() {
pendingRequests_.erase(it);
lock.unlock();
- requestData.promise.setValue(
- {producerSuccess.producer_name(), producerSuccess.last_sequence_id()});
+ ResponseData data;
+ data.producerName = producerSuccess.producer_name();
+ data.lastSequenceId = producerSuccess.last_sequence_id();
+ if (producerSuccess.has_schema_version()) {
+ data.schemaVersion = producerSuccess.schema_version();
+ }
+ requestData.promise.setValue(data);
requestData.timer->cancel();
}
break;
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index ac8d6c5..5b47fb5 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -68,7 +68,12 @@ class LookupDataResult;
struct OpSendMsg;
-typedef std::pair<std::string, int64_t> ResponseData;
+// Data returned on the request operation. Mostly used on create-producer command.
+//
+// lastSequenceId defaults to -1 so that a default- or value-initialized instance
+// (e.g. the `setValue({})` used for generic SUCCESS replies) carries the "unset"
+// sentinel -1, as the previous {"", -1} pair did, rather than silently becoming 0.
+struct ResponseData {
+    std::string producerName;
+    int64_t lastSequenceId = -1;
+    std::string schemaVersion;
+};
typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index b048dce..4c80154 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -25,6 +25,7 @@
#include "PulsarApi.pb.h"
#include "Utils.h"
#include "Url.h"
+#include <pulsar/Schema.h>
#include "checksum/ChecksumProvider.h"
#include <algorithm>
#include <boost/thread/mutex.hpp>
@@ -36,6 +37,51 @@ using namespace pulsar::proto;
DECLARE_LOG_OBJECT();
+// True for the schema types this client forwards to the broker when creating
+// producers and subscriptions; every other type is sent without a schema.
+static inline bool isBuiltInSchema(SchemaType schemaType) {
+    const bool isTextual = (schemaType == STRING) || (schemaType == JSON);
+    const bool isStructured = (schemaType == AVRO) || (schemaType == PROTOBUF);
+    return isTextual || isStructured;
+}
+
+// Map the public SchemaType onto the wire-protocol enum. Types without a
+// protocol counterpart (including NONE) map to Schema_Type_None.
+static inline proto::Schema_Type getSchemaType(SchemaType type) {
+    switch (type) {
+        case STRING:
+            return Schema_Type_String;
+        case JSON:
+            return Schema_Type_Json;
+        case PROTOBUF:
+            return Schema_Type_Protobuf;
+        case AVRO:
+            return Schema_Type_Avro;
+        case NONE:
+        default:
+            return Schema_Type_None;
+    }
+}
+
+// Build a heap-allocated proto::Schema from the client-side SchemaInfo.
+// The caller takes ownership of the returned object; both call sites hand it
+// straight to set_allocated_schema(), which transfers ownership to the command.
+static proto::Schema* getSchema(const SchemaInfo& schemaInfo) {
+    // Allocate directly instead of building a throwaway default instance
+    // just to call New() on it.
+    proto::Schema* schema = new proto::Schema();
+    schema->set_name(schemaInfo.getName());
+    schema->set_schema_data(schemaInfo.getSchema());
+    schema->set_type(getSchemaType(schemaInfo.getSchemaType()));
+    for (const auto& kv : schemaInfo.getProperties()) {
+        // add_properties() appends a new element owned by the repeated field,
+        // avoiding a temporary KeyValue per property plus AddAllocated().
+        proto::KeyValue* keyValue = schema->add_properties();
+        keyValue->set_key(kv.first);
+        keyValue->set_value(kv.second);
+    }
+    return schema;
+}
+
SharedBuffer Commands::writeMessageWithSize(const BaseCommand& cmd) {
size_t cmdSize = cmd.ByteSize();
size_t frameSize = 4 + cmdSize;
@@ -189,7 +235,8 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType,
const std::string& consumerName, SubscriptionMode subscriptionMode,
Optional<MessageId> startMessageId, bool readCompacted,
- const std::map<std::string, std::string>& metadata) {
+ const std::map<std::string, std::string>& metadata,
+ const SchemaInfo& schemaInfo) {
BaseCommand cmd;
cmd.set_type(BaseCommand::SUBSCRIBE);
CommandSubscribe* subscribe = cmd.mutable_subscribe();
@@ -201,6 +248,11 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
subscribe->set_consumer_name(consumerName);
subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
subscribe->set_read_compacted(readCompacted);
+
+ if (isBuiltInSchema(schemaInfo.getSchemaType())) {
+ subscribe->set_allocated_schema(getSchema(schemaInfo));
+ }
+
if (startMessageId.is_present()) {
MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
messageIdData.set_ledgerid(startMessageId.value().ledgerId());
@@ -233,7 +285,8 @@ SharedBuffer Commands::newUnsubscribe(uint64_t consumerId, uint64_t requestId) {
SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId,
const std::string& producerName, uint64_t requestId,
- const std::map<std::string, std::string>& metadata) {
+ const std::map<std::string, std::string>& metadata,
+ const SchemaInfo& schemaInfo) {
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
CommandProducer* producer = cmd.mutable_producer();
@@ -248,6 +301,10 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
producer->mutable_metadata()->AddAllocated(keyValue);
}
+ if (isBuiltInSchema(schemaInfo.getSchemaType())) {
+ producer->set_allocated_schema(getSchema(schemaInfo));
+ }
+
if (!producerName.empty()) {
producer->set_producer_name(producerName);
}
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index e669953..15611a8 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -21,6 +21,7 @@
#include <pulsar/Authentication.h>
#include <pulsar/Message.h>
+#include <pulsar/Schema.h>
#include "PulsarApi.pb.h"
#include "SharedBuffer.h"
@@ -78,13 +79,15 @@ class Commands {
uint64_t consumerId, uint64_t requestId,
proto::CommandSubscribe_SubType subType, const std::string& consumerName,
SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId,
- bool readCompacted, const std::map<std::string, std::string>& metadata);
+ bool readCompacted, const std::map<std::string, std::string>& metadata,
+ const SchemaInfo& schemaInfo);
static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);
static SharedBuffer newProducer(const std::string& topic, uint64_t producerId,
const std::string& producerName, uint64_t requestId,
- const std::map<std::string, std::string>& metadata);
+ const std::map<std::string, std::string>& metadata,
+ const SchemaInfo& schemaInfo);
static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId,
proto::CommandAck_AckType ackType, int validationError);
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 4014ad2..4a7e73a 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -33,6 +33,13 @@ ConsumerConfiguration& ConsumerConfiguration::operator=(const ConsumerConfigurat
return *this;
}
+// Store the schema this consumer declares; it is sent with the subscribe command.
+ConsumerConfiguration& ConsumerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
+    impl_->schemaInfo = schemaInfo;  // SchemaInfo is a shared handle, so this copy is cheap
+    return *this;
+}
+
+// Return the declared schema (a default SchemaInfo if none was set).
+const SchemaInfo& ConsumerConfiguration::getSchema() const {
+    return impl_->schemaInfo;
+}
+
long ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs() const {
return impl_->brokerConsumerStatsCacheTimeInMs;
}
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 16e91c8..bf9ceb5 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -24,6 +24,7 @@
namespace pulsar {
struct ConsumerConfigurationImpl {
+ SchemaInfo schemaInfo;
long unAckedMessagesTimeoutMs;
ConsumerType consumerType;
MessageListener messageListener;
@@ -38,7 +39,8 @@ struct ConsumerConfigurationImpl {
int patternAutoDiscoveryPeriod;
std::map<std::string, std::string> properties;
ConsumerConfigurationImpl()
- : unAckedMessagesTimeoutMs(0),
+ : schemaInfo(),
+ unAckedMessagesTimeoutMs(0),
consumerType(ConsumerExclusive),
messageListener(),
hasMessageListener(false),
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 62446d9..35a18db 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -137,9 +137,9 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
ClientImplPtr client = client_.lock();
uint64_t requestId = client->newRequestId();
- SharedBuffer cmd =
- Commands::newSubscribe(topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_,
- subscriptionMode_, startMessageId_, readCompacted_, config_.getProperties());
+ SharedBuffer cmd = Commands::newSubscribe(topic_, subscription_, consumerId_, requestId, getSubType(),
+ consumerName_, subscriptionMode_, startMessageId_,
+ readCompacted_, config_.getProperties(), config_.getSchema());
cnx->sendRequestWithId(cmd, requestId)
.addListener(boost::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, _1));
}
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 0f2780e..bd27b0e 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -147,6 +147,12 @@ const std::string& PartitionedProducerImpl::getProducerName() const {
return producers_[0]->getProducerName();
}
+// Return the opaque schema version reported by the broker for this producer,
+// taken from the producer of the first partition.
+const std::string& PartitionedProducerImpl::getSchemaVersion() const {
+ // Since the schema is atomically assigned on the partitioned-topic,
+ // it's guaranteed that all the partitions will have the same schema version.
+ return producers_[0]->getSchemaVersion();
+}
+
int64_t PartitionedProducerImpl::getLastSequenceId() const {
int64_t currentMax = -1L;
for (int i = 0; i < producers_.size(); i++) {
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index 0a3d949..283d273 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -59,6 +59,8 @@ class PartitionedProducerImpl : public ProducerImplBase,
virtual int64_t getLastSequenceId() const;
+ virtual const std::string& getSchemaVersion() const;
+
virtual void start();
virtual void shutdown();
diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc
index bee213e..1659a21 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -60,6 +60,8 @@ const std::string& Producer::getProducerName() const { return impl_->getProducer
int64_t Producer::getLastSequenceId() const { return impl_->getLastSequenceId(); }
+const std::string& Producer::getSchemaVersion() const {
+    // Delegate to the producer implementation behind this handle.
+    return impl_->getSchemaVersion();
+}
+
Result Producer::close() {
Promise<bool, Result> promise;
closeAsync(WaitForCallback(promise));
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index 9ad2cf9..a216fc1 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -187,6 +187,13 @@ ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key)
return *this;
}
+// Store the schema this producer declares; it is sent with the create-producer command.
+ProducerConfiguration& ProducerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
+    impl_->schemaInfo = schemaInfo;  // SchemaInfo is a shared handle, so this copy is cheap
+    return *this;
+}
+
+// Return the declared schema (a default SchemaInfo if none was set).
+const SchemaInfo& ProducerConfiguration::getSchema() const {
+    return impl_->schemaInfo;
+}
+
bool ProducerConfiguration::hasProperty(const std::string& name) const {
const std::map<std::string, std::string>& m = impl_->properties;
return m.find(name) != m.end();
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index cc5b9b7..7ec09c8 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -27,6 +27,7 @@
namespace pulsar {
struct ProducerConfigurationImpl {
+ SchemaInfo schemaInfo;
Optional<std::string> producerName;
Optional<int64_t> initialSequenceId;
int sendTimeoutMs;
@@ -46,7 +47,8 @@ struct ProducerConfigurationImpl {
ProducerCryptoFailureAction cryptoFailureAction;
std::map<std::string, std::string> properties;
ProducerConfigurationImpl()
- : sendTimeoutMs(30000),
+ : schemaInfo(),
+ sendTimeoutMs(30000),
compressionType(CompressionNone),
maxPendingMessages(1000),
maxPendingMessagesAcrossPartitions(50000),
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 29d081b..4c5c56d 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -102,6 +102,8 @@ const std::string& ProducerImpl::getProducerName() const { return producerName_;
int64_t ProducerImpl::getLastSequenceId() const { return lastSequenceIdPublished_; }
+const std::string& ProducerImpl::getSchemaVersion() const {
+    // Recorded from the broker's producer-success response in handleCreateProducer().
+    return schemaVersion_;
+}
+
void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
if (ec) {
LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
@@ -127,8 +129,8 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();
- SharedBuffer cmd =
- Commands::newProducer(topic_, producerId_, producerName_, requestId, conf_.getProperties());
+ SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId,
+ conf_.getProperties(), conf_.getSchema());
cnx->sendRequestWithId(cmd, requestId)
.addListener(boost::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx, _1, _2));
}
@@ -150,20 +152,19 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
if (result == ResultOk) {
// We are now reconnected to broker and clear to send messages. Re-send all pending messages and
// set the cnx pointer so that new messages will be sent immediately
- const std::string& producerName = responseData.first;
- int64_t lastSequenceId = responseData.second;
LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
Lock lock(mutex_);
cnx->registerProducer(producerId_, shared_from_this());
- producerName_ = producerName;
+ producerName_ = responseData.producerName;
+ schemaVersion_ = responseData.schemaVersion;
producerStr_ = "[" + topic_ + ", " + producerName_ + "] ";
if (batchMessageContainer) {
batchMessageContainer->producerName_ = producerName_;
}
if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
- lastSequenceIdPublished_ = lastSequenceId;
+ lastSequenceIdPublished_ = responseData.lastSequenceId;
msgSequenceGenerator_ = lastSequenceIdPublished_ + 1;
}
resendMessages(cnx);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index aa22abc..00f7f31 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -81,6 +81,8 @@ class ProducerImpl : public HandlerBase,
int64_t getLastSequenceId() const;
+ const std::string& getSchemaVersion() const;
+
uint64_t getProducerId() const;
virtual void start();
@@ -147,6 +149,7 @@ class ProducerImpl : public HandlerBase,
BatchMessageContainerPtr batchMessageContainer;
volatile int64_t lastSequenceIdPublished_;
+ std::string schemaVersion_;
typedef boost::shared_ptr<boost::asio::deadline_timer> TimerPtr;
TimerPtr sendTimer_;
diff --git a/pulsar-client-cpp/lib/ProducerImplBase.h b/pulsar-client-cpp/lib/ProducerImplBase.h
index 3dd92a4..25b4db3 100644
--- a/pulsar-client-cpp/lib/ProducerImplBase.h
+++ b/pulsar-client-cpp/lib/ProducerImplBase.h
@@ -34,6 +34,7 @@ class ProducerImplBase {
virtual const std::string& getProducerName() const = 0;
virtual int64_t getLastSequenceId() const = 0;
+ virtual const std::string& getSchemaVersion() const = 0;
virtual void sendAsync(const Message& msg, SendCallback callback) = 0;
virtual void closeAsync(CloseCallback callback) = 0;
diff --git a/pulsar-client-cpp/lib/ReaderConfiguration.cc b/pulsar-client-cpp/lib/ReaderConfiguration.cc
index a649b33..1f13c02 100644
--- a/pulsar-client-cpp/lib/ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/ReaderConfiguration.cc
@@ -31,6 +31,13 @@ ReaderConfiguration& ReaderConfiguration::operator=(const ReaderConfiguration& x
return *this;
}
+// Store the schema this reader declares; it is forwarded to the reader's internal consumer.
+ReaderConfiguration& ReaderConfiguration::setSchema(const SchemaInfo& schemaInfo) {
+    impl_->schemaInfo = schemaInfo;  // SchemaInfo is a shared handle, so this copy is cheap
+    return *this;
+}
+
+// Return the declared schema (a default SchemaInfo if none was set).
+const SchemaInfo& ReaderConfiguration::getSchema() const {
+    return impl_->schemaInfo;
+}
+
ReaderConfiguration& ReaderConfiguration::setReaderListener(ReaderListener readerListener) {
impl_->readerListener = readerListener;
impl_->hasReaderListener = true;
diff --git a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
index 5dca8e3..73082ea 100644
--- a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
@@ -24,6 +24,7 @@
namespace pulsar {
struct ReaderConfigurationImpl {
+ SchemaInfo schemaInfo;
ReaderListener readerListener;
bool hasReaderListener;
int receiverQueueSize;
@@ -31,7 +32,8 @@ struct ReaderConfigurationImpl {
std::string subscriptionRolePrefix;
bool readCompacted;
ReaderConfigurationImpl()
- : hasReaderListener(false),
+ : schemaInfo(),
+ hasReaderListener(false),
receiverQueueSize(1000),
readerName(),
subscriptionRolePrefix(),
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 9507a0b..ce2ef21 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -33,6 +33,7 @@ void ReaderImpl::start(const MessageId& startMessageId) {
consumerConf.setConsumerType(ConsumerExclusive);
consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
consumerConf.setReadCompacted(readerConf_.isReadCompacted());
+ consumerConf.setSchema(readerConf_.getSchema());
if (readerConf_.getReaderName().length() > 0) {
consumerConf.setConsumerName(readerConf_.getReaderName());
diff --git a/pulsar-client-cpp/lib/Schema.cc b/pulsar-client-cpp/lib/Schema.cc
new file mode 100644
index 0000000..4877369
--- /dev/null
+++ b/pulsar-client-cpp/lib/Schema.cc
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/Schema.h>
+
+#include <iostream>
+#include <map>
+#include <boost/make_shared.hpp>
+#include <include/pulsar/Schema.h>
+
+#pragma GCC visibility push(default)
+
+// Stream the human-readable name of a schema type (e.g. "AVRO").
+std::ostream &operator<<(std::ostream &s, pulsar::SchemaType schemaType) {
+    const char *typeName = pulsar::strSchemaType(schemaType);
+    s << typeName;
+    return s;
+}
+
+namespace pulsar {
+
+const char *strSchemaType(SchemaType schemaType) {
+    // Fallback for values that do not match any enumerator.
+    const char *label = "UnknownSchemaType";
+
+    // NOTE : Deliberately no `default:` label, so the compiler (-Wswitch) flags any
+    // SchemaType enumerator added in the future but not listed here.
+    switch (schemaType) {
+        case NONE:
+            label = "NONE";
+            break;
+        case STRING:
+            label = "STRING";
+            break;
+        case INT8:
+            label = "INT8";
+            break;
+        case INT16:
+            label = "INT16";
+            break;
+        case INT32:
+            label = "INT32";
+            break;
+        case INT64:
+            label = "INT64";
+            break;
+        case FLOAT:
+            label = "FLOAT";
+            break;
+        case DOUBLE:
+            label = "DOUBLE";
+            break;
+        case BYTES:
+            label = "BYTES";
+            break;
+        case JSON:
+            label = "JSON";
+            break;
+        case PROTOBUF:
+            label = "PROTOBUF";
+            break;
+        case AVRO:
+            label = "AVRO";
+            break;
+        case AUTO_CONSUME:
+            label = "AUTO_CONSUME";
+            break;
+        case AUTO_PUBLISH:
+            label = "AUTO_PUBLISH";
+            break;
+        case KEY_VALUE:
+            label = "KEY_VALUE";
+            break;
+    }
+    return label;
+}
+
+// Immutable storage behind SchemaInfo; shared between SchemaInfo copies.
+class SchemaInfoImpl {
+  public:
+    const std::string name_;
+    const std::string schema_;
+    const SchemaType type_;
+    const std::map<std::string, std::string> properties_;
+
+    // Default schema: raw BYTES with no definition and no properties.
+    SchemaInfoImpl() : name_("BYTES"), schema_(), type_(BYTES), properties_() {}
+
+    // Initializers are listed in declaration order (members are always initialized in
+    // that order), which avoids the -Wreorder warning the previous ordering triggered.
+    SchemaInfoImpl(SchemaType schemaType, const std::string &name, const std::string &schema,
+                   const StringMap &properties)
+        : name_(name), schema_(schema), type_(schemaType), properties_(properties) {}
+};
+
+// Default-constructed SchemaInfo wraps the default (BYTES) implementation.
+SchemaInfo::SchemaInfo() : impl_(boost::make_shared<SchemaInfoImpl>()) {}
+
+SchemaInfo::SchemaInfo(SchemaType schemaType, const std::string &name, const std::string &schema,
+                       const StringMap &properties)
+    : impl_(boost::make_shared<SchemaInfoImpl>(schemaType, name, schema, properties)) {}
+
+// Accessors simply read through to the shared, immutable implementation.
+SchemaType SchemaInfo::getSchemaType() const {
+    return impl_->type_;
+}
+
+const std::string &SchemaInfo::getName() const {
+    return impl_->name_;
+}
+
+const std::string &SchemaInfo::getSchema() const {
+    return impl_->schema_;
+}
+
+const StringMap &SchemaInfo::getProperties() const {
+    return impl_->properties_;
+}
+
+} // namespace pulsar
+
+#pragma GCC visibility pop
diff --git a/pulsar-client-cpp/tests/SchemaTest.cc b/pulsar-client-cpp/tests/SchemaTest.cc
new file mode 100644
index 0000000..7ad7b9e
--- /dev/null
+++ b/pulsar-client-cpp/tests/SchemaTest.cc
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+using namespace pulsar;
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+static const std::string exampleSchema =
+    "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+    "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+
+// Registers an AVRO schema on a topic, then checks which producers/consumers the broker
+// accepts or rejects against it. Requires a standalone broker at lookupUrl.
+TEST(SchemaTest, testSchema) {
+    Client client(lookupUrl);
+    Result res;
+
+    Producer producer;
+    ProducerConfiguration producerConf;
+    producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+    res = client.createProducer("topic-avro", producerConf, producer);
+    producer.close();
+
+    ASSERT_EQ(ResultOk, res);
+
+    // Creating producer with a different, incompatible schema on same topic should fail
+    producerConf.setSchema(SchemaInfo(JSON, "Json", "{}"));
+    res = client.createProducer("topic-avro", producerConf, producer);
+    ASSERT_EQ(ResultIncompatibleSchema, res);
+
+    // Creating producer with no schema on same topic should succeed
+    // because standalone broker is configured by default to not
+    // require the schema to be set
+    res = client.createProducer("topic-avro", producer);
+    ASSERT_EQ(ResultOk, res);
+
+    ConsumerConfiguration consumerConf;
+    Consumer consumer;
+    // Subscribing with no schema will still succeed
+    res = client.subscribe("topic-avro", "sub-1", consumerConf, consumer);
+    ASSERT_EQ(ResultOk, res);
+
+    // Subscribing with same Avro schema will succeed
+    consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+    res = client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
+    ASSERT_EQ(ResultOk, res);
+
+    // Subscribing with different schema type will fail
+    consumerConf.setSchema(SchemaInfo(JSON, "Json", "{}"));
+    res = client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
+    ASSERT_EQ(ResultIncompatibleSchema, res);
+
+    // Release the producer and consumers still open from the steps above.
+    client.close();
+}