You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/31 15:49:45 UTC
[pulsar] branch master updated: Support max message size for cpp
and go client (#4348)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0ff23b4 Support max message size for cpp and go client (#4348)
0ff23b4 is described below
commit 0ff23b47bc1158dac03e6dbb3e1b335027ec577f
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Fri May 31 23:49:39 2019 +0800
Support max message size for cpp and go client (#4348)
* support max message size for cpp and go client
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* format cpp code
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* fix maxMessageSize logic
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* fix comments
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* fix comments
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* fix comments
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* fix comments
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* fix ci error
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* fix comments
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* fix comments
Signed-off-by: xiaolong.ran <ra...@gmail.com>
* fix comments
Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
pulsar-client-cpp/lib/BatchMessageContainer.cc | 2 +-
pulsar-client-cpp/lib/ClientConnection.cc | 9 +++++++++
pulsar-client-cpp/lib/ClientConnection.h | 3 +++
pulsar-client-cpp/lib/Commands.h | 2 +-
pulsar-client-cpp/lib/ConsumerImpl.cc | 16 +++++++++++-----
pulsar-client-cpp/lib/ProducerImpl.cc | 7 ++++---
pulsar-client-cpp/lib/ProducerImpl.h | 2 ++
pulsar-client-cpp/tests/BasicEndToEndTest.cc | 8 ++++----
pulsar-client-go/go.mod | 1 -
pulsar-client-go/go.sum | 2 --
10 files changed, 35 insertions(+), 17 deletions(-)
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.cc b/pulsar-client-cpp/lib/BatchMessageContainer.cc
index eeceac7..9d7c481 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.cc
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.cc
@@ -102,7 +102,7 @@ void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
producer_.encryptMessage(impl_->metadata, impl_->payload, encryptedPayload);
impl_->payload = encryptedPayload;
- if (impl_->payload.readableBytes() > Commands::MaxMessageSize) {
+ if (impl_->payload.readableBytes() > producer_.keepMaxMessageSize_) {
// At this point the compressed batch is above the overall MaxMessageSize. There
// can only 1 single message in the batch at this point.
batchMessageCallBack(ResultMessageTooBig, messagesContainerListPtr_, nullptr);
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 101b2c5..38dba90 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -131,6 +131,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())),
authentication_(authentication),
serverProtocolVersion_(ProtocolVersion_MIN),
+ maxMessageSize_(Commands::DefaultMaxMessageSize),
executor_(executor),
resolver_(executor->createTcpResolver()),
socket_(executor->createSocket()),
@@ -224,6 +225,12 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
return;
}
+ if (cmdConnected.has_max_message_size()) {
+ LOG_DEBUG("Connection has max message size setting: " << cmdConnected.max_message_size());
+ maxMessageSize_ = cmdConnected.max_message_size();
+ LOG_DEBUG("Current max message size is: " << maxMessageSize_);
+ }
+
state_ = Ready;
serverProtocolVersion_ = cmdConnected.protocol_version();
connectPromise_.setValue(shared_from_this());
@@ -1366,6 +1373,8 @@ const std::string& ClientConnection::cnxString() const { return cnxString_; }
int ClientConnection::getServerProtocolVersion() const { return serverProtocolVersion_; }
+int ClientConnection::getMaxMessageSize() const { return maxMessageSize_; }
+
Commands::ChecksumType ClientConnection::getChecksumType() const {
return getServerProtocolVersion() >= proto::v6 ? Commands::Crc32c : Commands::None;
}
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index 02ee987..b16f8c6 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -145,6 +145,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
int getServerProtocolVersion() const;
+ int getMaxMessageSize() const;
+
Commands::ChecksumType getChecksumType() const;
Future<Result, BrokerConsumerStatsImpl> newConsumerStats(uint64_t consumerId, uint64_t requestId);
@@ -237,6 +239,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
TimeDuration operationsTimeout_;
AuthenticationPtr authentication_;
int serverProtocolVersion_;
+ int maxMessageSize_;
ExecutorServicePtr executor_;
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 4952089..7428c8a 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -49,7 +49,7 @@ class Commands {
};
enum WireFormatConstant
{
- MaxMessageSize = (5 * 1024 * 1024 - (10 * 1024)),
+ DefaultMaxMessageSize = (5 * 1024 * 1024 - (10 * 1024)),
MaxFrameSize = (5 * 1024 * 1024)
};
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 31df0eb..d268800 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -459,11 +459,17 @@ bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, con
uint32_t uncompressedSize = metadata.uncompressed_size();
uint32_t payloadSize = payload.readableBytes();
- if (payloadSize > Commands::MaxMessageSize) {
- // Uncompressed size is itself corrupted since it cannot be bigger than the MaxMessageSize
- LOG_ERROR(getName() << "Got corrupted payload message size " << payloadSize //
- << " at " << msg.message_id().ledgerid() << ":" << msg.message_id().entryid());
- discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::UncompressedSizeCorruption);
+ if (cnx) {
+ if (payloadSize > cnx->getMaxMessageSize()) {
+ // Uncompressed size is itself corrupted since it cannot be bigger than the MaxMessageSize
+ LOG_ERROR(getName() << "Got corrupted payload message size " << payloadSize //
+ << " at " << msg.message_id().ledgerid() << ":"
+ << msg.message_id().entryid());
+ discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::UncompressedSizeCorruption);
+ return false;
+ }
+ } else {
+ LOG_ERROR("Connection not ready for Consumer - " << getConsumerId());
return false;
}
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 5cf799b..b2e7848 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -155,6 +155,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
Lock lock(mutex_);
+ keepMaxMessageSize_ = cnx->getMaxMessageSize();
cnx->registerProducer(producerId_, shared_from_this());
producerName_ = responseData.producerName;
schemaVersion_ = responseData.schemaVersion;
@@ -338,7 +339,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
uint32_t uncompressedSize = payload.readableBytes();
uint32_t payloadSize = uncompressedSize;
-
+ ClientConnectionPtr cnx = getCnx().lock();
if (!batchMessageContainer) {
// If batching is enabled we compress all the payloads together before sending the batch
payload = CompressionCodecProvider::getCodec(conf_.getCompressionType()).encode(payload);
@@ -352,9 +353,9 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
}
payload = encryptedPayload;
- if (payloadSize > Commands::MaxMessageSize) {
+ if (payloadSize > keepMaxMessageSize_) {
LOG_DEBUG(getName() << " - compressed Message payload size" << payloadSize << "cannot exceed "
- << Commands::MaxMessageSize << " bytes");
+ << keepMaxMessageSize_ << " bytes");
cb(ResultMessageTooBig, msg);
return;
}
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 5e84007..d7ac603 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -63,6 +63,8 @@ class ProducerImpl : public HandlerBase,
const ProducerConfiguration& producerConfiguration);
~ProducerImpl();
+ int keepMaxMessageSize_;
+
virtual const std::string& getTopic() const;
virtual void sendAsync(const Message& msg, SendCallback callback);
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index e666ecd..80ca4e6 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -561,14 +561,14 @@ TEST(BasicEndToEndTest, testMessageTooBig) {
Result result = client.createProducer(topicName, conf, producer);
ASSERT_EQ(ResultOk, result);
- int size = Commands::MaxMessageSize + 1;
+ int size = Commands::DefaultMaxMessageSize + 1000 * 100;
char *content = new char[size];
Message msg = MessageBuilder().setAllocatedContent(content, size).build();
result = producer.send(msg);
ASSERT_EQ(ResultMessageTooBig, result);
// Anything up to MaxMessageSize should be allowed
- size = Commands::MaxMessageSize;
+ size = Commands::DefaultMaxMessageSize;
msg = MessageBuilder().setAllocatedContent(content, size).build();
result = producer.send(msg);
ASSERT_EQ(ResultOk, result);
@@ -1114,7 +1114,7 @@ TEST(BasicEndToEndTest, testProduceMessageSize) {
result = producerFuture.get(producer2);
ASSERT_EQ(ResultOk, result);
- int size = Commands::MaxMessageSize + 1;
+ int size = Commands::DefaultMaxMessageSize + 1000 * 100;
char *content = new char[size];
Message msg = MessageBuilder().setAllocatedContent(content, size).build();
result = producer1.send(msg);
@@ -1165,7 +1165,7 @@ TEST(BasicEndToEndTest, testBigMessageSizeBatching) {
result = client.createProducer(topicName, conf2, producer2);
ASSERT_EQ(ResultOk, result);
- int size = Commands::MaxMessageSize + 1;
+ int size = Commands::DefaultMaxMessageSize + 1000 * 100;
char *content = new char[size];
Message msg = MessageBuilder().setAllocatedContent(content, size).build();
result = producer1.send(msg);
diff --git a/pulsar-client-go/go.mod b/pulsar-client-go/go.mod
index ea75273..614cf14 100644
--- a/pulsar-client-go/go.mod
+++ b/pulsar-client-go/go.mod
@@ -2,7 +2,6 @@ module github.com/apache/pulsar/pulsar-client-go
require (
github.com/BurntSushi/toml v0.3.1 // indirect
- github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6
github.com/davecgh/go-spew v1.1.1
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.1
diff --git a/pulsar-client-go/go.sum b/pulsar-client-go/go.sum
index b6177d8..159a642 100644
--- a/pulsar-client-go/go.sum
+++ b/pulsar-client-go/go.sum
@@ -1,7 +1,5 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
-github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6 h1:xadBCbc8D9mmkaNfCsEBHbIoCjbayJXJNsY1JjPjNio=
-github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6/go.mod h1:qpebaTNSsyUn5rPSJMsfqEtDw71TTggXM6stUDI16HA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=