Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/24 10:43:39 UTC

[GitHub] [pulsar] RobertIndie opened a new pull request #12171: [WIP][Client] Return the message ID of the first chunk when sending chunked messages

RobertIndie opened a new pull request #12171:
URL: https://github.com/apache/pulsar/pull/12171


   ### Motivation
   
   Currently, when a producer sends a chunked message, it returns the message ID of the last chunk. This can cause problems. For example, if that message ID is passed to seek, the consumer starts consuming from the position of the last chunk, mistakenly concludes that the earlier chunks are lost, and skips the current message. With an inclusive seek, the consumer may therefore skip the first message, which is incorrect behavior.
   
   
   The following simple code demonstrates the problem.
   
   ```java
   
   var msgId = producer.send(...); // e.g. returns 0:1:-1
   
   var otherMsg = producer.send(...); // returns 0:2:-1
   
   consumer.seek(msgId); // inclusive seek
   
   // The consumer may skip the first message and return e.g. 0:2:-1
   var receiveMsgId = consumer.receive().getMessageId();
   
   Assert.assertEquals(msgId, receiveMsgId); // fails
   
   ```
   ### Modifications
   
   When the producer sends a chunked message, it temporarily stores the message ID of the first chunk and returns that ID to the user only after all chunks have been sent successfully.
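   
   The bookkeeping described above can be sketched as follows. This is only a minimal, standalone illustration, not the actual `ProducerImpl` change: `ChunkAckTracker`, `MsgId`, and `onAck` are hypothetical names, and removing the stored entry once the last chunk is acknowledged is an assumption of this sketch.
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Hypothetical helper; not part of the Pulsar client API.
   final class ChunkAckTracker {
   
       // Simplified stand-in for a message ID (ledgerId:entryId:partitionIndex).
       static final class MsgId {
           final long ledgerId;
           final long entryId;
           final int partitionIndex;
   
           MsgId(long ledgerId, long entryId, int partitionIndex) {
               this.ledgerId = ledgerId;
               this.entryId = entryId;
               this.partitionIndex = partitionIndex;
           }
       }
   
       // UUID of a chunked message -> message ID of its first chunk.
       private final Map<String, MsgId> firstChunkIds = new ConcurrentHashMap<>();
   
       // Called once per acknowledged chunk. Returns the ID to report to the
       // caller once the last chunk is acknowledged, or null while chunks of
       // the same message are still outstanding.
       MsgId onAck(String uuid, int chunkId, int totalChunks, MsgId acked) {
           if (totalChunks <= 1) {
               return acked; // not a chunked message
           }
           if (chunkId == 0) {
               firstChunkIds.put(uuid, acked); // remember the first chunk's ID
           }
           if (chunkId == totalChunks - 1) {
               // Assumption of this sketch: drop the entry so the map does not grow.
               MsgId first = firstChunkIds.remove(uuid);
               return first != null ? first : acked;
           }
           return null;
       }
   }
   ```
   
   With three chunks acknowledged in order, `onAck` returns `null` for chunks 0 and 1 and returns the ID recorded for chunk 0 when chunk 2 is acknowledged.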
   
   ### Verifying this change
   
   
   This PR is still a work in progress; tests still need to be added.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (yes) It affects the behavior of sending chunked messages.
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   - [x] no-need-doc 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] RobertIndie commented on a change in pull request #12171: [WIP][Client] Return the message ID of the first chunk when sending chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on a change in pull request #12171:
URL: https://github.com/apache/pulsar/pull/12171#discussion_r717453719



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1000,6 +1004,19 @@ void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long le
         OpSendMsg finalOp = op;
         LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, getHighestSequenceId(finalOp)));
         op.setMessageId(ledgerId, entryId, partitionIndex);
+        if (op.totalChunks > 1) {
+            if (op.chunkId == 0) {
+                chunkMessageIds.put(op.msg.getMessageBuilder().getUuid(),
+                        new MessageIdImpl(ledgerId, entryId, partitionIndex));
+            } else if (op.chunkId == op.totalChunks - 1) {
+                MessageIdImpl firstChunkMsgId = chunkMessageIds.get(op.msg.getMessageBuilder().getUuid());
+                if (firstChunkMsgId != null) {
+                    op.setMessageId(firstChunkMsgId.ledgerId, firstChunkMsgId.entryId, firstChunkMsgId.partitionIndex);

Review comment:
       That is correct. But I think we don't need to reflect any change in the message on the server side.







[GitHub] [pulsar] rdhabalia commented on a change in pull request #12171: [WIP][Client] Return the message ID of the first chunk when sending chunked messages

Posted by GitBox <gi...@apache.org>.
rdhabalia commented on a change in pull request #12171:
URL: https://github.com/apache/pulsar/pull/12171#discussion_r717152592



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
##########
@@ -1000,6 +1004,19 @@ void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long le
         OpSendMsg finalOp = op;
         LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, getHighestSequenceId(finalOp)));
         op.setMessageId(ledgerId, entryId, partitionIndex);
+        if (op.totalChunks > 1) {
+            if (op.chunkId == 0) {
+                chunkMessageIds.put(op.msg.getMessageBuilder().getUuid(),
+                        new MessageIdImpl(ledgerId, entryId, partitionIndex));
+            } else if (op.chunkId == op.totalChunks - 1) {
+                MessageIdImpl firstChunkMsgId = chunkMessageIds.get(op.msg.getMessageBuilder().getUuid());
+                if (firstChunkMsgId != null) {
+                    op.setMessageId(firstChunkMsgId.ledgerId, firstChunkMsgId.entryId, firstChunkMsgId.partitionIndex);

Review comment:
       This is the challenge here. The last chunk is already published and persisted on the server side. This code is in the `ackReceived` method, which means you have received the ack for an already-published message, and updating the messageId will not change the message that is already persisted on the server side.







[GitHub] [pulsar] RobertIndie closed pull request #12171: [WIP][Client] Return the message ID of the first chunk when sending chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie closed pull request #12171:
URL: https://github.com/apache/pulsar/pull/12171


   








[GitHub] [pulsar] RobertIndie commented on pull request #12171: [WIP][Client] Return the message ID of the first chunk when sending chunked messages

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on pull request #12171:
URL: https://github.com/apache/pulsar/pull/12171#issuecomment-1002377022


   This has been fixed in https://github.com/apache/pulsar/pull/12403.

