You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/31 15:49:45 UTC

[pulsar] branch master updated: Support max message size for cpp and go client (#4348)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ff23b4  Support max message size for cpp and go client (#4348)
0ff23b4 is described below

commit 0ff23b47bc1158dac03e6dbb3e1b335027ec577f
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Fri May 31 23:49:39 2019 +0800

    Support max message size for cpp and go client (#4348)
    
    * support max message size for cpp and go client
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * format cpp code
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix maxMessageSize logic
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix ci error
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
 pulsar-client-cpp/lib/BatchMessageContainer.cc |  2 +-
 pulsar-client-cpp/lib/ClientConnection.cc      |  9 +++++++++
 pulsar-client-cpp/lib/ClientConnection.h       |  3 +++
 pulsar-client-cpp/lib/Commands.h               |  2 +-
 pulsar-client-cpp/lib/ConsumerImpl.cc          | 16 +++++++++++-----
 pulsar-client-cpp/lib/ProducerImpl.cc          |  7 ++++---
 pulsar-client-cpp/lib/ProducerImpl.h           |  2 ++
 pulsar-client-cpp/tests/BasicEndToEndTest.cc   |  8 ++++----
 pulsar-client-go/go.mod                        |  1 -
 pulsar-client-go/go.sum                        |  2 --
 10 files changed, 35 insertions(+), 17 deletions(-)

diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.cc b/pulsar-client-cpp/lib/BatchMessageContainer.cc
index eeceac7..9d7c481 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.cc
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.cc
@@ -102,7 +102,7 @@ void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
     producer_.encryptMessage(impl_->metadata, impl_->payload, encryptedPayload);
     impl_->payload = encryptedPayload;
 
-    if (impl_->payload.readableBytes() > Commands::MaxMessageSize) {
+    if (impl_->payload.readableBytes() > producer_.keepMaxMessageSize_) {
         // At this point the compressed batch is above the overall MaxMessageSize. There
         // can only 1 single message in the batch at this point.
         batchMessageCallBack(ResultMessageTooBig, messagesContainerListPtr_, nullptr);
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 101b2c5..38dba90 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -131,6 +131,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
       operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())),
       authentication_(authentication),
       serverProtocolVersion_(ProtocolVersion_MIN),
+      maxMessageSize_(Commands::DefaultMaxMessageSize),
       executor_(executor),
       resolver_(executor->createTcpResolver()),
       socket_(executor->createSocket()),
@@ -224,6 +225,12 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
         return;
     }
 
+    if (cmdConnected.has_max_message_size()) {
+        LOG_DEBUG("Connection has max message size setting: " << cmdConnected.max_message_size());
+        maxMessageSize_ = cmdConnected.max_message_size();
+        LOG_DEBUG("Current max message size is: " << maxMessageSize_);
+    }
+
     state_ = Ready;
     serverProtocolVersion_ = cmdConnected.protocol_version();
     connectPromise_.setValue(shared_from_this());
@@ -1366,6 +1373,8 @@ const std::string& ClientConnection::cnxString() const { return cnxString_; }
 
 int ClientConnection::getServerProtocolVersion() const { return serverProtocolVersion_; }
 
+int ClientConnection::getMaxMessageSize() const { return maxMessageSize_; }
+
 Commands::ChecksumType ClientConnection::getChecksumType() const {
     return getServerProtocolVersion() >= proto::v6 ? Commands::Crc32c : Commands::None;
 }
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index 02ee987..b16f8c6 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -145,6 +145,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
 
     int getServerProtocolVersion() const;
 
+    int getMaxMessageSize() const;
+
     Commands::ChecksumType getChecksumType() const;
 
     Future<Result, BrokerConsumerStatsImpl> newConsumerStats(uint64_t consumerId, uint64_t requestId);
@@ -237,6 +239,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     TimeDuration operationsTimeout_;
     AuthenticationPtr authentication_;
     int serverProtocolVersion_;
+    int maxMessageSize_;
 
     ExecutorServicePtr executor_;
 
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 4952089..7428c8a 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -49,7 +49,7 @@ class Commands {
     };
     enum WireFormatConstant
     {
-        MaxMessageSize = (5 * 1024 * 1024 - (10 * 1024)),
+        DefaultMaxMessageSize = (5 * 1024 * 1024 - (10 * 1024)),
         MaxFrameSize = (5 * 1024 * 1024)
     };
 
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 31df0eb..d268800 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -459,11 +459,17 @@ bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, con
 
     uint32_t uncompressedSize = metadata.uncompressed_size();
     uint32_t payloadSize = payload.readableBytes();
-    if (payloadSize > Commands::MaxMessageSize) {
-        // Uncompressed size is itself corrupted since it cannot be bigger than the MaxMessageSize
-        LOG_ERROR(getName() << "Got corrupted payload message size " << payloadSize  //
-                            << " at  " << msg.message_id().ledgerid() << ":" << msg.message_id().entryid());
-        discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::UncompressedSizeCorruption);
+    if (cnx) {
+        if (payloadSize > cnx->getMaxMessageSize()) {
+            // Uncompressed size is itself corrupted since it cannot be bigger than the MaxMessageSize
+            LOG_ERROR(getName() << "Got corrupted payload message size " << payloadSize  //
+                                << " at  " << msg.message_id().ledgerid() << ":"
+                                << msg.message_id().entryid());
+            discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::UncompressedSizeCorruption);
+            return false;
+        }
+    } else {
+        LOG_ERROR("Connection not ready for Consumer - " << getConsumerId());
         return false;
     }
 
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 5cf799b..b2e7848 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -155,6 +155,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
         LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
 
         Lock lock(mutex_);
+        keepMaxMessageSize_ = cnx->getMaxMessageSize();
         cnx->registerProducer(producerId_, shared_from_this());
         producerName_ = responseData.producerName;
         schemaVersion_ = responseData.schemaVersion;
@@ -338,7 +339,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
 
     uint32_t uncompressedSize = payload.readableBytes();
     uint32_t payloadSize = uncompressedSize;
-
+    ClientConnectionPtr cnx = getCnx().lock();
     if (!batchMessageContainer) {
         // If batching is enabled we compress all the payloads together before sending the batch
         payload = CompressionCodecProvider::getCodec(conf_.getCompressionType()).encode(payload);
@@ -352,9 +353,9 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
         }
         payload = encryptedPayload;
 
-        if (payloadSize > Commands::MaxMessageSize) {
+        if (payloadSize > keepMaxMessageSize_) {
             LOG_DEBUG(getName() << " - compressed Message payload size" << payloadSize << "cannot exceed "
-                                << Commands::MaxMessageSize << " bytes");
+                                << keepMaxMessageSize_ << " bytes");
             cb(ResultMessageTooBig, msg);
             return;
         }
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 5e84007..d7ac603 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -63,6 +63,8 @@ class ProducerImpl : public HandlerBase,
                  const ProducerConfiguration& producerConfiguration);
     ~ProducerImpl();
 
+    int keepMaxMessageSize_;
+
     virtual const std::string& getTopic() const;
 
     virtual void sendAsync(const Message& msg, SendCallback callback);
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index e666ecd..80ca4e6 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -561,14 +561,14 @@ TEST(BasicEndToEndTest, testMessageTooBig) {
     Result result = client.createProducer(topicName, conf, producer);
     ASSERT_EQ(ResultOk, result);
 
-    int size = Commands::MaxMessageSize + 1;
+    int size = Commands::DefaultMaxMessageSize + 1000 * 100;
     char *content = new char[size];
     Message msg = MessageBuilder().setAllocatedContent(content, size).build();
     result = producer.send(msg);
     ASSERT_EQ(ResultMessageTooBig, result);
 
     // Anything up to MaxMessageSize should be allowed
-    size = Commands::MaxMessageSize;
+    size = Commands::DefaultMaxMessageSize;
     msg = MessageBuilder().setAllocatedContent(content, size).build();
     result = producer.send(msg);
     ASSERT_EQ(ResultOk, result);
@@ -1114,7 +1114,7 @@ TEST(BasicEndToEndTest, testProduceMessageSize) {
     result = producerFuture.get(producer2);
     ASSERT_EQ(ResultOk, result);
 
-    int size = Commands::MaxMessageSize + 1;
+    int size = Commands::DefaultMaxMessageSize + 1000 * 100;
     char *content = new char[size];
     Message msg = MessageBuilder().setAllocatedContent(content, size).build();
     result = producer1.send(msg);
@@ -1165,7 +1165,7 @@ TEST(BasicEndToEndTest, testBigMessageSizeBatching) {
     result = client.createProducer(topicName, conf2, producer2);
     ASSERT_EQ(ResultOk, result);
 
-    int size = Commands::MaxMessageSize + 1;
+    int size = Commands::DefaultMaxMessageSize + 1000 * 100;
     char *content = new char[size];
     Message msg = MessageBuilder().setAllocatedContent(content, size).build();
     result = producer1.send(msg);
diff --git a/pulsar-client-go/go.mod b/pulsar-client-go/go.mod
index ea75273..614cf14 100644
--- a/pulsar-client-go/go.mod
+++ b/pulsar-client-go/go.mod
@@ -2,7 +2,6 @@ module github.com/apache/pulsar/pulsar-client-go
 
 require (
 	github.com/BurntSushi/toml v0.3.1 // indirect
-	github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6
 	github.com/davecgh/go-spew v1.1.1
 	github.com/gogo/protobuf v1.2.1
 	github.com/golang/protobuf v1.3.1
diff --git a/pulsar-client-go/go.sum b/pulsar-client-go/go.sum
index b6177d8..159a642 100644
--- a/pulsar-client-go/go.sum
+++ b/pulsar-client-go/go.sum
@@ -1,7 +1,5 @@
 github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
-github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6 h1:xadBCbc8D9mmkaNfCsEBHbIoCjbayJXJNsY1JjPjNio=
-github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6/go.mod h1:qpebaTNSsyUn5rPSJMsfqEtDw71TTggXM6stUDI16HA=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=