You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/07/01 12:37:20 UTC

[GitHub] [pulsar] codelipenghui opened a new pull request #7416: Decompression payload if needed in KeyShared subscription

codelipenghui opened a new pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416


   
   Fixes #7414
   
   ### Motivation
   
   Decompression payload if needed in KeyShared subscription
   
   ### Verifying this change
   
   Unit test added
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#issuecomment-653963885


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#discussion_r448350653



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -153,13 +155,18 @@ public void resetCloseFuture() {
         PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
 
         try {
-            if (metadata.hasNumMessagesInBatch()) {
+            if (!metadata.hasOrderingKey() && !metadata.hasPartitionKey() && metadata.hasNumMessagesInBatch()) {
                 // If the message was part of a batch (eg: a batch of 1 message), we need
                 // to read the key from the first single-message-metadata entry
+                PulsarApi.CompressionType compressionType = metadata.getCompression();
+                CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);

Review comment:
       A better way would be to ensure that producers (in every lang) are attaching the routing key on the message metadata protobuf (at the batch level). That is not compressed so we should always be safe. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] feeblefakie commented on pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
feeblefakie commented on pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#issuecomment-652396867


   @codelipenghui Thank you ! 🙇‍♂️


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] merlimat commented on a change in pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#discussion_r448339719



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -153,13 +155,18 @@ public void resetCloseFuture() {
         PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
 
         try {
-            if (metadata.hasNumMessagesInBatch()) {
+            if (!metadata.hasOrderingKey() && !metadata.hasPartitionKey() && metadata.hasNumMessagesInBatch()) {
                 // If the message was part of a batch (eg: a batch of 1 message), we need
                 // to read the key from the first single-message-metadata entry
+                PulsarApi.CompressionType compressionType = metadata.getCompression();
+                CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);

Review comment:
       We should avoid decompressing in the broker in any case. This is very dangerous from a CPU usage point of view.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#issuecomment-657893897


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#issuecomment-657906590


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#issuecomment-653074738


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#issuecomment-652942049


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] feeblefakie commented on pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
feeblefakie commented on pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#issuecomment-652395111


   @codelipenghui Oh, great. it is pretty fast.
   Was it related to the changes in between 2.5.0 and 2.6.0 ?
   Just asking because it doesn’t look so.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#discussion_r448835008



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -153,13 +155,18 @@ public void resetCloseFuture() {
         PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
 
         try {
-            if (metadata.hasNumMessagesInBatch()) {
+            if (!metadata.hasOrderingKey() && !metadata.hasPartitionKey() && metadata.hasNumMessagesInBatch()) {
                 // If the message was part of a batch (eg: a batch of 1 message), we need
                 // to read the key from the first single-message-metadata entry
+                PulsarApi.CompressionType compressionType = metadata.getCompression();
+                CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);

Review comment:
       @merlimat I have addressed your comment, please take a look, thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#issuecomment-652902209


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#issuecomment-652396037


   @feeblefakie it's introduced by #7107. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#discussion_r448363006



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -153,13 +155,18 @@ public void resetCloseFuture() {
         PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
 
         try {
-            if (metadata.hasNumMessagesInBatch()) {
+            if (!metadata.hasOrderingKey() && !metadata.hasPartitionKey() && metadata.hasNumMessagesInBatch()) {
                 // If the message was part of a batch (eg: a batch of 1 message), we need
                 // to read the key from the first single-message-metadata entry
+                PulsarApi.CompressionType compressionType = metadata.getCompression();
+                CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);

Review comment:
       Make sense, so it's better to revert #7107 and other lang clients should attach the routing key on the message metadata.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #7416: Decompression payload if needed in KeyShared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7416:
URL: https://github.com/apache/pulsar/pull/7416#discussion_r448344535



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
##########
@@ -153,13 +155,18 @@ public void resetCloseFuture() {
         PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
 
         try {
-            if (metadata.hasNumMessagesInBatch()) {
+            if (!metadata.hasOrderingKey() && !metadata.hasPartitionKey() && metadata.hasNumMessagesInBatch()) {
                 // If the message was part of a batch (eg: a batch of 1 message), we need
                 // to read the key from the first single-message-metadata entry
+                PulsarApi.CompressionType compressionType = metadata.getCompression();
+                CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);

Review comment:
       I'm not sure how to handle #7107 that fixed. Maybe we can use the first single message key as the batch message key, but this requires all clients catchup.
   
   So, I think we can onboard this PR first ensure correctness, and I will create an issue for the clients catchup work.
   
   And I have add check `if (!metadata.hasOrderingKey() && !metadata.hasPartitionKey() && metadata.hasNumMessagesInBatch())`, if the clients set the key of the batch message to the key of the first message, it can reduce overhead.
   
   I have test with the KeyBasedBatcher. It already set the batch message key.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org