Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/14 13:04:37 UTC

[GitHub] [flink] syhily opened a new pull request, #21069: [FLINK-29613][Connector/Pulsar] Fix wrong batch size assertion.

syhily opened a new pull request, #21069:
URL: https://github.com/apache/flink/pull/21069

   ## What is the purpose of the change
   
   The Pulsar connector performed an incorrect batch size assertion on message IDs. It required every batch message ID to have a batch size of exactly 1, but sometimes the batch size is 0 because of serialization and deserialization. This change relaxes the assertion logic accordingly.
   
   ## Brief change log
   
   Change the implementation in `MessageIdUtils.unwrapMessageId(MessageId)`.
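
   Below is a minimal, self-contained sketch of the guard before and after this change. It assumes only what the diff shows: the check moves from `batchSize == 1` to `batchSize <= 1`. `UnwrapGuardSketch` and `BatchIdStub` are hypothetical stand-ins for the real check in `unwrapMessageId` and for Pulsar's `BatchMessageIdImpl`. The Pulsar client classes themselves are not used.

```java
/** Hypothetical sketch of the batch-size guard in unwrapMessageId; not the actual Flink code. */
public class UnwrapGuardSketch {

    /** Hypothetical stand-in for Pulsar's BatchMessageIdImpl; only the batch size matters here. */
    static final class BatchIdStub {
        private final int batchSize;

        BatchIdStub(int batchSize) {
            this.batchSize = batchSize;
        }

        int getBatchSize() {
            return batchSize;
        }
    }

    /** Old guard: only a batch size of exactly 1 was accepted. */
    static boolean oldGuardAccepts(BatchIdStub id) {
        return id.getBatchSize() == 1;
    }

    /** New guard: a batch size of 0 (seen after serialization round trips) is accepted too. */
    static boolean newGuardAccepts(BatchIdStub id) {
        return id.getBatchSize() <= 1;
    }

    public static void main(String[] args) {
        // Print how each guard treats a few representative batch sizes.
        for (int size : new int[] {0, 1, 2}) {
            BatchIdStub id = new BatchIdStub(size);
            System.out.println(
                    "batchSize=" + size
                            + " old=" + oldGuardAccepts(id)
                            + " new=" + newGuardAccepts(id));
        }
    }
}
```

   Running `main` shows that the old guard rejects a batch size of 0 and the new guard accepts it. A batch size of 2 is still rejected by both.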
   
   ## Verifying this change
   
   This change is a trivial fix without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] tisonkun merged pull request #21069: [FLINK-29613][Connector/Pulsar] Fix wrong batch size assertion.

Posted by GitBox <gi...@apache.org>.
tisonkun merged PR #21069:
URL: https://github.com/apache/flink/pull/21069




[GitHub] [flink] syhily commented on a diff in pull request #21069: [FLINK-29613][Connector/Pulsar] Fix wrong batch size assertion.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21069:
URL: https://github.com/apache/flink/pull/21069#discussion_r998016267


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java:
##########
@@ -55,17 +59,15 @@ public static MessageId nextMessageId(MessageId messageId) {
      * message id. We don't support the batch message for its low performance now.
      */
     public static MessageIdImpl unwrapMessageId(MessageId messageId) {
-        MessageIdImpl idImpl = MessageIdImpl.convertToMessageIdImpl(messageId);
+        MessageIdImpl idImpl = convertToMessageIdImpl(messageId);
         if (idImpl instanceof BatchMessageIdImpl) {
             int batchSize = ((BatchMessageIdImpl) idImpl).getBatchSize();
-            checkArgument(batchSize == 1, "We only support normal message id currently.");
+            checkArgument(
+                    batchSize <= 1,

Review Comment:
   Yep.





[GitHub] [flink] flinkbot commented on pull request #21069: [FLINK-29613][Connector/Pulsar] Fix wrong batch size assertion.

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #21069:
URL: https://github.com/apache/flink/pull/21069#issuecomment-1278989774

   ## CI report:
   
   * 7d89bb64116b99d895edf1df5cba443715b1f710 UNKNOWN
   
   Bot commands:
   The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build




[GitHub] [flink] syhily commented on a diff in pull request #21069: [FLINK-29613][Connector/Pulsar] Fix wrong batch size assertion.

Posted by GitBox <gi...@apache.org>.
syhily commented on code in PR #21069:
URL: https://github.com/apache/flink/pull/21069#discussion_r998045019


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java:
##########
@@ -55,17 +59,15 @@ public static MessageId nextMessageId(MessageId messageId) {
      * message id. We don't support the batch message for its low performance now.
      */
     public static MessageIdImpl unwrapMessageId(MessageId messageId) {
-        MessageIdImpl idImpl = MessageIdImpl.convertToMessageIdImpl(messageId);
+        MessageIdImpl idImpl = convertToMessageIdImpl(messageId);
         if (idImpl instanceof BatchMessageIdImpl) {
             int batchSize = ((BatchMessageIdImpl) idImpl).getBatchSize();
-            checkArgument(batchSize == 1, "We only support normal message id currently.");
+            checkArgument(
+                    batchSize <= 1,

Review Comment:
   We can't guarantee that the batch size is always 0 or 1. This looks like a mistake on the Pulsar side: it returns a `BatchMessageId` instead of a plain `MessageId` for normal consumption. So we should only assume that the batch size is less than or equal to 1.







[GitHub] [flink] syhily commented on pull request #21069: [FLINK-29613][Connector/Pulsar] Fix wrong batch size assertion.

Posted by GitBox <gi...@apache.org>.
syhily commented on PR #21069:
URL: https://github.com/apache/flink/pull/21069#issuecomment-1281843417

   This fix is needed only because of a bug in Pulsar: Pulsar always wraps a single message ID into a batch message ID. With this fix, the thrown exception includes the actual batch size.
   
   I don't think we need a test for this weird situation; this is only a guarding fix.
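
   The "detailed exception" point can be illustrated with a hypothetical sketch of a guard whose error message includes the offending batch size. `checkArgument` below is a local stand-in for a template-based precondition in the style of Flink's `Preconditions.checkArgument(boolean, String, Object...)`. The message wording is illustrative and is not the text used in the PR.

```java
/** Hypothetical sketch: a guard whose exception reports the offending batch size. */
public class BatchSizeGuardSketch {

    /**
     * Local stand-in for a template-based precondition; "%s" placeholders are
     * filled in order via String.format.
     */
    static void checkArgument(boolean condition, String template, Object... args) {
        if (!condition) {
            throw new IllegalArgumentException(String.format(template, args));
        }
    }

    /** Accepts batch sizes <= 1; the message text is illustrative only. */
    static void checkBatchSize(int batchSize) {
        checkArgument(
                batchSize <= 1,
                "We only support normal message id currently. This batch size is %s",
                batchSize);
    }

    public static void main(String[] args) {
        checkBatchSize(0); // accepted
        checkBatchSize(1); // accepted
        try {
            checkBatchSize(3); // rejected; the message carries the actual size
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```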




[GitHub] [flink] tisonkun commented on a diff in pull request #21069: [FLINK-29613][Connector/Pulsar] Fix wrong batch size assertion.

Posted by GitBox <gi...@apache.org>.
tisonkun commented on code in PR #21069:
URL: https://github.com/apache/flink/pull/21069#discussion_r997997897


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/MessageIdUtils.java:
##########
@@ -55,17 +59,15 @@ public static MessageId nextMessageId(MessageId messageId) {
      * message id. We don't support the batch message for its low performance now.
      */
     public static MessageIdImpl unwrapMessageId(MessageId messageId) {
-        MessageIdImpl idImpl = MessageIdImpl.convertToMessageIdImpl(messageId);
+        MessageIdImpl idImpl = convertToMessageIdImpl(messageId);
         if (idImpl instanceof BatchMessageIdImpl) {
             int batchSize = ((BatchMessageIdImpl) idImpl).getBatchSize();
-            checkArgument(batchSize == 1, "We only support normal message id currently.");
+            checkArgument(
+                    batchSize <= 1,

Review Comment:
   OK. I get the point. So here we should use the condition: `batchSize == 1 || batchSize == 0`?
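
   Here is a small sketch comparing the two candidate conditions. For an `int`, they agree on 0, on 1 and on every value above 1. They differ only on negative values, which `batchSize <= 1` also accepts. This thread does not establish whether Pulsar can ever report a negative batch size. The class and method names are hypothetical.

```java
/** Hypothetical comparison of the two guard conditions discussed in the review. */
public class GuardConditionSketch {

    /** Explicit form suggested in review: only 0 or 1 pass. */
    static boolean explicitZeroOrOne(int batchSize) {
        return batchSize == 1 || batchSize == 0;
    }

    /** Relaxed form from the PR diff: any int up to 1 passes, including negatives. */
    static boolean atMostOne(int batchSize) {
        return batchSize <= 1;
    }

    public static void main(String[] args) {
        // Show where the two conditions agree and where they diverge.
        for (int size : new int[] {-1, 0, 1, 2}) {
            System.out.println(
                    size + ": ==0||==1 -> " + explicitZeroOrOne(size)
                            + ", <=1 -> " + atMostOne(size));
        }
    }
}
```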


