Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/26 03:03:05 UTC

[GitHub] [pulsar] coderzc opened a new pull request, #17289: [improve][client-c++]Support include message header size when check maxMessageSize

coderzc opened a new pull request, #17289:
URL: https://github.com/apache/pulsar/pull/17289

   Master Issue: #17188
   
   ### Motivation
   
   See: #17188
   
   ### Modifications
   
   Include the message header (metadata) size when checking `maxMessageSize` in the C++ client.
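
A minimal sketch of the check this modification describes, assuming the limit applies to the serialized metadata plus the payload. The names `kMaxMessageSize`, `isMessageTooBig`, and `largestPayloadThatFits` are hypothetical and not part of the Pulsar client; the 5 MiB value is a placeholder chosen for illustration.

```cpp
#include <cstdint>

// Placeholder limit for illustration only (assumption, not read from a broker).
constexpr std::uint32_t kMaxMessageSize = 5 * 1024 * 1024;

// Returns true when headers plus payload exceed the limit.
// The sum is computed in 64 bits so two large 32-bit sizes cannot wrap around.
bool isMessageTooBig(std::uint32_t metadataSize, std::uint32_t payloadSize,
                     std::uint32_t maxMessageSize = kMaxMessageSize) {
    const std::uint64_t headersAndPayloadSize =
        static_cast<std::uint64_t>(metadataSize) + payloadSize;
    return headersAndPayloadSize > maxMessageSize;
}

// Largest payload that still passes the check for a given metadata size.
// Returns 0 when the metadata alone already reaches or exceeds the limit.
std::uint32_t largestPayloadThatFits(std::uint32_t metadataSize,
                                     std::uint32_t maxMessageSize = kMaxMessageSize) {
    return metadataSize >= maxMessageSize ? 0 : maxMessageSize - metadataSize;
}
```

Under this sketch, a payload of exactly `kMaxMessageSize` bytes is rejected as soon as the metadata is non-empty, and an oversized ordering key is rejected even with an empty payload, which matches the cases the new tests exercise.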
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   This change is already covered by existing tests, such as *testMaxMessageSize*, *testNoBatchMaxMessageSize*, and *testChunkingMaxMessageSize*.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] RobertIndie merged pull request #17289: [improve][client-c++]Support include message header size when check maxMessageSize

Posted by GitBox <gi...@apache.org>.
RobertIndie merged PR #17289:
URL: https://github.com/apache/pulsar/pull/17289




[GitHub] [pulsar] coderzc commented on a diff in pull request #17289: [improve][client-c++]Support include message header size when check maxMessageSize

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17289:
URL: https://github.com/apache/pulsar/pull/17289#discussion_r965006126


##########
pulsar-client-cpp/tests/ProducerTest.cc:
##########
@@ -210,3 +213,84 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
 
     client.close();
 }
+
+TEST(ProducerTest, testMaxMessageSize) {
+    Client client(serviceUrl);
+
+    const std::string topic = "ProducerTest-MaxMessageSize-" + std::to_string(time(nullptr));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    std::string msg = std::string(maxMessageSize / 2, 'a');
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build()));
+    Message message;
+    ASSERT_EQ(ResultOk, consumer.receive(message));
+    ASSERT_EQ(msg, message.getDataAsString());
+
+    std::string orderKey = std::string(maxMessageSize, 'a');
+    ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
+
+    ASSERT_EQ(ResultMessageTooBig,
+              producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build()));
+
+    client.close();
+}
+
+TEST(ProducerTest, testNoBatchMaxMessageSize) {
+    Client client(serviceUrl);
+
+    const std::string topic = "ProducerTest-NoBatchMaxMessageSize-" + std::to_string(time(nullptr));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Producer producer;
+    ProducerConfiguration conf;
+    conf.setBatchingEnabled(false);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
+
+    std::string msg = std::string(maxMessageSize / 2, 'a');
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build()));
+    Message message;
+    ASSERT_EQ(ResultOk, consumer.receive(message));
+    ASSERT_EQ(msg, message.getDataAsString());
+
+    std::string orderKey = std::string(maxMessageSize, 'a');
+    ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
+
+    ASSERT_EQ(ResultMessageTooBig,
+              producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build()));
+
+    client.close();
+}
+
+TEST(ProducerTest, testChunkingMaxMessageSize) {

Review Comment:
   This test enables chunking, so some of its cases differ from the previous tests.





[GitHub] [pulsar] BewareMyPower commented on pull request #17289: [improve][client-c++]Support include message header size when check maxMessageSize

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #17289:
URL: https://github.com/apache/pulsar/pull/17289#issuecomment-1255058183

   @RobertIndie @merlimat @Demogorgon314 Could any of you leave a 2nd review?




[GitHub] [pulsar] coderzc commented on pull request #17289: [improve][client-c++]Support include message header size when check maxMessageSize

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17289:
URL: https://github.com/apache/pulsar/pull/17289#issuecomment-1251924121

   @RobertIndie PTAL.




[GitHub] [pulsar] shibd commented on a diff in pull request #17289: [improve][client-c++]Support include message header size when check maxMessageSize

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #17289:
URL: https://github.com/apache/pulsar/pull/17289#discussion_r962090388


##########
pulsar-client-cpp/lib/ProducerImpl.cc:
##########
@@ -500,9 +506,26 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
                 return;
             }
 
-            sendMessage(OpSendMsg{msgMetadata, encryptedPayload,
-                                  (chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId,
-                                  conf_.getSendTimeout(), 1, uncompressedSize});
+            OpSendMsg op =
+                OpSendMsg{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr,
+                          producerId_, sequenceId,       conf_.getSendTimeout(),

Review Comment:
   The code formatting here looks a bit strange.





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17289: [improve][client-c++]Support include message header size when check maxMessageSize

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17289:
URL: https://github.com/apache/pulsar/pull/17289#discussion_r957702648


##########
pulsar-client-cpp/tests/BasicEndToEndTest.cc:
##########
@@ -609,12 +609,6 @@ TEST(BasicEndToEndTest, testMessageTooBig) {
     result = producer.send(msg);
     ASSERT_EQ(ResultMessageTooBig, result);
 
-    // Anything up to MaxMessageSize should be allowed
-    size = ClientConnection::getMaxMessageSize();
-    msg = MessageBuilder().setAllocatedContent(content, size).build();
-    result = producer.send(msg);
-    ASSERT_EQ(ResultOk, result);

Review Comment:
   Could you reduce the `size` to make it work?





[GitHub] [pulsar] coderzc commented on pull request #17289: [improve][client-c++]Support include message header size when check maxMessageSize

Posted by GitBox <gi...@apache.org>.
coderzc commented on PR #17289:
URL: https://github.com/apache/pulsar/pull/17289#issuecomment-1229679613

   > Odd, why did the Java client compilation fail? https://github.com/apache/pulsar/runs/8033665923?check_suite_focus=true
   > 
   > ```
   > Error:  Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.10.1:compile (default-compile) on project pulsar-client-original: Compilation failure
   > Error:  /Users/runner/work/pulsar/pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:[222,39] no suitable method found for sendMessage(long,long,int,org.apache.pulsar.common.api.proto.MessageMetadata,io.netty.buffer.ByteBuf)
   > Error:      method org.apache.pulsar.client.impl.ProducerImpl.sendMessage(long,long,int,org.apache.pulsar.client.api.MessageId,org.apache.pulsar.common.api.proto.MessageMetadata,io.netty.buffer.ByteBuf) is not applicable
   > Error:        (actual and formal argument lists differ in length)
   > Error:      method org.apache.pulsar.client.impl.ProducerImpl.sendMessage(long,long,long,int,org.apache.pulsar.common.api.proto.MessageMetadata,io.netty.buffer.ByteBuf) is not applicable
   > ```
   
   See: #17300 




[GitHub] [pulsar] shibd commented on pull request #17289: [improve][client-c++]Support include message header size when check maxMessageSize

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #17289:
URL: https://github.com/apache/pulsar/pull/17289#issuecomment-1229651663

   Odd, why did the Java client compilation fail?
   https://github.com/apache/pulsar/runs/8033665923?check_suite_focus=true
   ```
   Error:  Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.10.1:compile (default-compile) on project pulsar-client-original: Compilation failure
   Error:  /Users/runner/work/pulsar/pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:[222,39] no suitable method found for sendMessage(long,long,int,org.apache.pulsar.common.api.proto.MessageMetadata,io.netty.buffer.ByteBuf)
   Error:      method org.apache.pulsar.client.impl.ProducerImpl.sendMessage(long,long,int,org.apache.pulsar.client.api.MessageId,org.apache.pulsar.common.api.proto.MessageMetadata,io.netty.buffer.ByteBuf) is not applicable
   Error:        (actual and formal argument lists differ in length)
   Error:      method org.apache.pulsar.client.impl.ProducerImpl.sendMessage(long,long,long,int,org.apache.pulsar.common.api.proto.MessageMetadata,io.netty.buffer.ByteBuf) is not applicable
   ```




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17289: [improve][client-c++]Support include message header size when check maxMessageSize

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17289:
URL: https://github.com/apache/pulsar/pull/17289#discussion_r964912014


##########
pulsar-client-cpp/lib/ProducerImpl.cc:
##########
@@ -500,9 +506,26 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
                 return;
             }
 
-            sendMessage(OpSendMsg{msgMetadata, encryptedPayload,
-                                  (chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId,
-                                  conf_.getSendTimeout(), 1, uncompressedSize});
+            OpSendMsg op =
+                OpSendMsg{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr,
+                          producerId_, sequenceId,       conf_.getSendTimeout(),
+                          1,           uncompressedSize};
+
+            if (!chunkingEnabled_) {
+                const uint32_t msgMetadataSize = op.metadata_.ByteSize();
+                const uint32_t payloadSize = op.payload_.readableBytes();
+                const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize;
+                if (msgHeadersAndPayloadSize > maxMessageSize) {

Review Comment:
   ```suggestion
                   if (msgHeadersAndPayloadSize > maxMessageSize) {
                       lock.unlock();
   ```
   
   We can unlock it since the lock is acquired only for `sendMessage`.
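
A rough sketch of the pattern suggested above, assuming the lock only needs to protect the enqueue step: release it before taking the early-return path that reports `ResultMessageTooBig`, so the user callback does not run while the mutex is held. `ToyProducer`, `SendResult`, and `lockIsFree` are hypothetical stand-ins, not the real `ProducerImpl` API.

```cpp
#include <cstdint>
#include <functional>
#include <mutex>

enum class SendResult { Ok, MessageTooBig };

// Hypothetical stand-in for a producer; only models the locking around the size check.
class ToyProducer {
   public:
    explicit ToyProducer(std::uint32_t maxMessageSize) : maxMessageSize_(maxMessageSize) {}

    void sendAsync(std::uint32_t metadataSize, std::uint32_t payloadSize,
                   const std::function<void(SendResult)>& callback) {
        std::unique_lock<std::mutex> lock(mutex_);
        const std::uint64_t total = static_cast<std::uint64_t>(metadataSize) + payloadSize;
        if (total > maxMessageSize_) {
            // The lock exists only to guard the enqueue below, so drop it
            // before running user code on the failure path.
            lock.unlock();
            callback(SendResult::MessageTooBig);
            return;
        }
        // Stands in for sendMessage(op): this is the step that needs the lock.
        // In this toy model the success callback is never invoked.
        ++pending_;
    }

    // True when the mutex is currently free. Only call this from a thread
    // that does not already hold the mutex.
    bool lockIsFree() {
        if (mutex_.try_lock()) {
            mutex_.unlock();
            return true;
        }
        return false;
    }

    int pending() {
        std::lock_guard<std::mutex> guard(mutex_);
        return pending_;
    }

   private:
    std::mutex mutex_;
    std::uint32_t maxMessageSize_;
    int pending_ = 0;
};
```

Calling `lockIsFree()` from inside the failure callback returns `true` in this sketch, which is the property the early `unlock()` is meant to provide.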



##########
pulsar-client-cpp/tests/ProducerTest.cc:
##########
@@ -210,3 +213,84 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
 
     client.close();
 }
+
+TEST(ProducerTest, testMaxMessageSize) {
+    Client client(serviceUrl);
+
+    const std::string topic = "ProducerTest-MaxMessageSize-" + std::to_string(time(nullptr));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    std::string msg = std::string(maxMessageSize / 2, 'a');
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build()));
+    Message message;
+    ASSERT_EQ(ResultOk, consumer.receive(message));
+    ASSERT_EQ(msg, message.getDataAsString());
+
+    std::string orderKey = std::string(maxMessageSize, 'a');
+    ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
+
+    ASSERT_EQ(ResultMessageTooBig,
+              producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build()));
+
+    client.close();
+}
+
+TEST(ProducerTest, testNoBatchMaxMessageSize) {
+    Client client(serviceUrl);
+
+    const std::string topic = "ProducerTest-NoBatchMaxMessageSize-" + std::to_string(time(nullptr));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Producer producer;
+    ProducerConfiguration conf;
+    conf.setBatchingEnabled(false);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
+
+    std::string msg = std::string(maxMessageSize / 2, 'a');
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build()));
+    Message message;
+    ASSERT_EQ(ResultOk, consumer.receive(message));
+    ASSERT_EQ(msg, message.getDataAsString());
+
+    std::string orderKey = std::string(maxMessageSize, 'a');
+    ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
+
+    ASSERT_EQ(ResultMessageTooBig,
+              producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build()));
+
+    client.close();
+}
+
+TEST(ProducerTest, testChunkingMaxMessageSize) {

Review Comment:
   This test seems to be redundant because it's covered by the previous tests.



##########
pulsar-client-cpp/tests/ProducerTest.cc:
##########
@@ -210,3 +213,84 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
 
     client.close();
 }
+
+TEST(ProducerTest, testMaxMessageSize) {
+    Client client(serviceUrl);
+
+    const std::string topic = "ProducerTest-MaxMessageSize-" + std::to_string(time(nullptr));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    std::string msg = std::string(maxMessageSize / 2, 'a');
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(msg).build()));
+    Message message;
+    ASSERT_EQ(ResultOk, consumer.receive(message));
+    ASSERT_EQ(msg, message.getDataAsString());
+
+    std::string orderKey = std::string(maxMessageSize, 'a');
+    ASSERT_EQ(ResultMessageTooBig, producer.send(MessageBuilder().setOrderingKey(orderKey).build()));
+
+    ASSERT_EQ(ResultMessageTooBig,
+              producer.send(MessageBuilder().setContent(std::string(maxMessageSize, 'b')).build()));
+
+    client.close();
+}
+
+TEST(ProducerTest, testNoBatchMaxMessageSize) {

Review Comment:
   It's nearly the same as `testMaxMessageSize` except for the producer configuration. You can simplify the test via `TEST_P`. See `MessageChunkingTest` for an example. BTW, I think these tests could also be moved to `MessageChunkingTest`.



##########
pulsar-client-cpp/lib/ProducerImpl.cc:
##########
@@ -500,9 +506,26 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
                 return;
             }
 
-            sendMessage(OpSendMsg{msgMetadata, encryptedPayload,
-                                  (chunkId == totalChunks - 1) ? callback : nullptr, producerId_, sequenceId,
-                                  conf_.getSendTimeout(), 1, uncompressedSize});
+            OpSendMsg op =
+                OpSendMsg{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr,
+                          producerId_, sequenceId,       conf_.getSendTimeout(),
+                          1,           uncompressedSize};

Review Comment:
   ```c++
               OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr,
                            producerId_, sequenceId,       conf_.getSendTimeout(),
                            1,           uncompressedSize};
   ```
   
   This simplifies the code. BTW, it also avoids an invocation of the copy constructor, though the compiler might optimize that copy away anyway.
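
To illustrate the difference between the two initialization forms discussed above, here is a small sketch with a hypothetical `CountingOp` type that counts copy and move constructions. It is not the real `OpSendMsg`. Direct list-initialization constructs the object in place; the `T x = T{...}` form may go through a temporary unless the compiler elides it, which C++17 requires and earlier standards merely permit.

```cpp
#include <string>
#include <utility>

// Hypothetical type that records how often it is copied or moved.
struct CountingOp {
    static int copies;
    static int moves;

    std::string payload;
    int sequenceId;

    CountingOp(std::string p, int id) : payload(std::move(p)), sequenceId(id) {}
    CountingOp(const CountingOp& other)
        : payload(other.payload), sequenceId(other.sequenceId) {
        ++copies;
    }
    CountingOp(CountingOp&& other) noexcept
        : payload(std::move(other.payload)), sequenceId(other.sequenceId) {
        ++moves;
    }
};

int CountingOp::copies = 0;
int CountingOp::moves = 0;

// Direct list-initialization: the object is built in place.
int extraConstructionsViaDirectInit() {
    CountingOp::copies = 0;
    CountingOp::moves = 0;
    CountingOp op{"payload", 1};
    (void)op;
    return CountingOp::copies + CountingOp::moves;
}

// Copy-initialization from a temporary: zero extra constructions when the
// copy or move is elided, one otherwise.
int extraConstructionsViaCopyInit() {
    CountingOp::copies = 0;
    CountingOp::moves = 0;
    CountingOp op = CountingOp{"payload", 1};
    (void)op;
    return CountingOp::copies + CountingOp::moves;
}
```

In practice both functions usually return 0 on modern compilers, so the direct form is mainly a readability win.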





[GitHub] [pulsar] coderzc commented on a diff in pull request #17289: [improve][client-c++]Support include message header size when check maxMessageSize

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17289:
URL: https://github.com/apache/pulsar/pull/17289#discussion_r957997193


##########
pulsar-client-cpp/tests/BasicEndToEndTest.cc:
##########
@@ -609,12 +609,6 @@ TEST(BasicEndToEndTest, testMessageTooBig) {
     result = producer.send(msg);
     ASSERT_EQ(ResultMessageTooBig, result);
 
-    // Anything up to MaxMessageSize should be allowed
-    size = ClientConnection::getMaxMessageSize();
-    msg = MessageBuilder().setAllocatedContent(content, size).build();
-    result = producer.send(msg);
-    ASSERT_EQ(ResultOk, result);

Review Comment:
   Ok


