Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/16 19:26:23 UTC

[GitHub] [pulsar] dlg99 opened a new pull request, #16098: [Fix][pulsar-io] KCA to use index (if available) instead of sequenceId and to handle batched messages non-unique sequenceIds

dlg99 opened a new pull request, #16098:
URL: https://github.com/apache/pulsar/pull/16098

   ### Motivation
   
   Record's `getRecordSequence()` returns a non-unique sequenceId for messages from the same batch.
   The root cause is that `FunctionCommon.getSequenceId()` does not account for the index of the message in the batch and uses only ledgerId and entryId.
   
   For the KCA Sink this means that messages from the same batch arrive with the same offset, and some Kafka Sinks will ignore such messages as duplicates.
   
   Changing this behavior in `FunctionCommon.getSequenceId()` is potentially breaking (it requires a separate discussion), and I assume it does not affect Pulsar right now.
   Another problem is that we already pack two longs (ledgerId and entryId) into one long (sequenceId), and the batch index adds an int on top of that. With the KCA one can make assumptions about batch size and the number of entries in a ledger before rotation, and configure this to avoid or minimize the lossiness of the packing; in Pulsar in general such assumptions aren't reliable.
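
To make the collision concrete, here is a minimal sketch of this kind of packing. It is not the Pulsar source: the class name, the method name and the 28-bit width (`ENTRY_BITS`) are assumptions chosen for illustration.

```java
// Illustrative sketch only; not the Pulsar source.
final class SequenceIdCollisionSketch {
    // Assumption: how many low bits the packing leaves for entryId.
    static final int ENTRY_BITS = 28;

    // Packs ledgerId and entryId into one long; the batch index is not an input.
    static long packSequenceId(long ledgerId, long entryId) {
        return (ledgerId << ENTRY_BITS) | entryId;
    }

    public static void main(String[] args) {
        // Two messages from the same batch share ledgerId and entryId and
        // differ only in their batch index, which the packing never sees.
        long first = packSequenceId(5L, 10L);  // batch index 0
        long second = packSequenceId(5L, 10L); // batch index 1
        System.out.println("same batch, same value: " + (first == second));

        // The packing is also lossy: an entryId that needs more than
        // ENTRY_BITS bits spills into the bits used by ledgerId.
        long spilled = packSequenceId(4L, 1L << ENTRY_BITS);
        System.out.println("spill collides with (5, 0): " + (spilled == packSequenceId(5L, 0L)));
    }
}
```

Any extra bits given to the batch index have to come out of the same 64-bit budget, which is why the trade-off only works when batch size and entries per ledger are bounded.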
   
   ### Modifications
   
   The Kafka offset produced by KCA can now be built from:
   * the message index, if enabled and added by the interceptor (`AppendIndexMetadataInterceptor`)
   * the index of the message in the batch, combined with the sequenceId
   
   `PulsarKafkaConnectSinkConfig` gains a couple of parameters (documented in `FieldDoc`):
   * maxBatchBitsForOffset
   * useIndexAsOffset
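
The two options and the two parameters above can be sketched roughly as follows. This is a hedged illustration under stated assumptions, not the code in this PR: the class, the method signature and the 28-bit `ENTRY_BITS` budget are hypothetical; only `useIndexAsOffset` and `maxBatchBitsForOffset` come from the PR.

```java
import java.util.OptionalLong;

// Illustrative sketch only; names other than the two config parameters are hypothetical.
final class KafkaOffsetSketch {
    // Assumption: low bits shared by entryId and the batch index.
    static final int ENTRY_BITS = 28;

    static long offset(OptionalLong brokerIndex, long ledgerId, long entryId, int batchIndex,
                       boolean useIndexAsOffset, int maxBatchBitsForOffset) {
        if (maxBatchBitsForOffset < 0 || maxBatchBitsForOffset > 20) {
            throw new IllegalArgumentException("maxBatchBitsForOffset must be in 0..20");
        }
        // Option 1: a broker-assigned index, when enabled and present, is used as is.
        if (useIndexAsOffset && brokerIndex.isPresent()) {
            return brokerIndex.getAsLong();
        }
        // 0 bits disables the batch handling: messages of one batch share an offset.
        if (maxBatchBitsForOffset == 0) {
            return (ledgerId << ENTRY_BITS) | entryId;
        }
        // Option 2: reserve the lowest bits for the index of the message in the batch.
        long entryLimit = 1L << (ENTRY_BITS - maxBatchBitsForOffset);
        long batchLimit = 1L << maxBatchBitsForOffset;
        if (entryId < 0 || entryId >= entryLimit || batchIndex < 0 || batchIndex >= batchLimit) {
            throw new IllegalStateException("entryId or batch index does not fit in the reserved bits");
        }
        return (ledgerId << ENTRY_BITS) | (entryId << maxBatchBitsForOffset) | batchIndex;
    }

    public static void main(String[] args) {
        OptionalLong none = OptionalLong.empty();
        long a = offset(none, 5L, 10L, 0, true, 12);
        long b = offset(none, 5L, 10L, 1, true, 12);
        System.out.println("distinct offsets within a batch: " + (a != b));
    }
}
```

With 12 bits reserved for the batch index, this sketch leaves 16 bits for entryId, so it rejects values that do not fit rather than silently producing colliding offsets.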
   
   ### Verifying this change
   
   Added unit tests
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
   No
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
     - The rest endpoints: (yes / no)
     - The admin cli options: (yes / no)
     - Anything that affects deployment: (yes / no / don't know)
   
   There are no Sinks in Apache Pulsar that use KCA.
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [ ] `doc-not-needed` 
   (Please explain why)
     
   - [x] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli merged pull request #16098: [Fix][pulsar-io] KCA to use index (if available) instead of sequenceId and to handle batched messages non-unique sequenceIds

Posted by GitBox <gi...@apache.org>.
eolivelli merged PR #16098:
URL: https://github.com/apache/pulsar/pull/16098




[GitHub] [pulsar] eolivelli commented on a diff in pull request #16098: [Fix][pulsar-io] KCA to use index (if available) instead of sequenceId and to handle batched messages non-unique sequenceIds

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16098:
URL: https://github.com/apache/pulsar/pull/16098#discussion_r899498331


##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java:
##########
@@ -73,6 +73,20 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
             help = "In case of Record<KeyValue<>> data use key from KeyValue<> instead of one from Record.")
     private boolean unwrapKeyValueIfAvailable = true;
 
+    @FieldDoc(
+            defaultValue = "true",
+            help = "Allows use of message index instead of message sequenceId as offset, if available.\n"
+                    + "Requires AppendIndexMetadataInterceptor and "
+                    + "enableExposingBrokerEntryMetadataToClient=true on brokers.")
+    private boolean useIndexAsOffset = true;
+
+    @FieldDoc(
+            defaultValue = "12",
+            help = "Number of bits (0 to 20) to use for index of message in the batch for translation into an offset.\n"
+                    + "0 to disable this behavior (Messages from the same batch will have the same "
+                    + "offset which can affect some connectors.)")
+    private int maxBatchBitsForOffset = 12;

Review Comment:
   Maybe if we use Integer we can detect that the connector has been created with an old version if the value is null, and so enable the legacy behaviour
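
A minimal sketch of the idea in this comment, assuming the field were a boxed `Integer` left unset when a stored config predates the parameter. The class and method names are hypothetical, and nothing in the thread indicates that this approach was adopted.

```java
// Illustrative sketch only; hypothetical helper, not part of the PR.
final class LegacyConfigSketch {
    // A null value is read as "config written by an older version",
    // which maps to 0 bits, i.e. the previous offset behaviour.
    static int effectiveBatchBits(Integer maxBatchBitsForOffset) {
        return maxBatchBitsForOffset == null ? 0 : maxBatchBitsForOffset;
    }

    public static void main(String[] args) {
        System.out.println("unset: " + effectiveBatchBits(null));
        System.out.println("explicit: " + effectiveBatchBits(12));
    }
}
```

This only works if the field has no initializer, so that a missing key stays null instead of picking up the new default.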





[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #16098: [Fix][pulsar-io] KCA to use index (if available) instead of sequenceId and to handle batched messages non-unique sequenceIds

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #16098:
URL: https://github.com/apache/pulsar/pull/16098#discussion_r899874491


##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java:
##########
@@ -73,6 +73,20 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
             help = "In case of Record<KeyValue<>> data use key from KeyValue<> instead of one from Record.")
     private boolean unwrapKeyValueIfAvailable = true;
 
+    @FieldDoc(
+            defaultValue = "true",
+            help = "Allows use of message index instead of message sequenceId as offset, if available.\n"
+                    + "Requires AppendIndexMetadataInterceptor and "
+                    + "enableExposingBrokerEntryMetadataToClient=true on brokers.")

Review Comment:
   the config is `exposingBrokerEntryMetadataToClientEnabled`





[GitHub] [pulsar] dlg99 commented on a diff in pull request #16098: [Fix][pulsar-io] KCA to use index (if available) instead of sequenceId and to handle batched messages non-unique sequenceIds

Posted by GitBox <gi...@apache.org>.
dlg99 commented on code in PR #16098:
URL: https://github.com/apache/pulsar/pull/16098#discussion_r900270900


##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java:
##########
@@ -73,6 +73,20 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
             help = "In case of Record<KeyValue<>> data use key from KeyValue<> instead of one from Record.")
     private boolean unwrapKeyValueIfAvailable = true;
 
+    @FieldDoc(
+            defaultValue = "true",
+            help = "Allows use of message index instead of message sequenceId as offset, if available.\n"
+                    + "Requires AppendIndexMetadataInterceptor and "
+                    + "enableExposingBrokerEntryMetadataToClient=true on brokers.")

Review Comment:
   will update in https://github.com/apache/pulsar/pull/16100 since this PR is merged already





[GitHub] [pulsar] eolivelli commented on a diff in pull request #16098: [Fix][pulsar-io] KCA to use index (if available) instead of sequenceId and to handle batched messages non-unique sequenceIds

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16098:
URL: https://github.com/apache/pulsar/pull/16098#discussion_r899494308


##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java:
##########
@@ -73,6 +73,20 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
             help = "In case of Record<KeyValue<>> data use key from KeyValue<> instead of one from Record.")
     private boolean unwrapKeyValueIfAvailable = true;
 
+    @FieldDoc(
+            defaultValue = "true",
+            help = "Allows use of message index instead of message sequenceId as offset, if available.\n"
+                    + "Requires AppendIndexMetadataInterceptor and "
+                    + "enableExposingBrokerEntryMetadataToClient=true on brokers.")
+    private boolean useIndexAsOffset = true;
+
+    @FieldDoc(
+            defaultValue = "12",
+            help = "Number of bits (0 to 20) to use for index of message in the batch for translation into an offset.\n"
+                    + "0 to disable this behavior (Messages from the same batch will have the same "
+                    + "offset which can affect some connectors.)")
+    private int maxBatchBitsForOffset = 12;

Review Comment:
   If this is zero then the behaviour is the same as before.
   So I would keep 0 as the default, in order not to break compatibility with data (offsets) already stored in existing envs that are upgrading





[GitHub] [pulsar] dlg99 commented on a diff in pull request #16098: [Fix][pulsar-io] KCA to use index (if available) instead of sequenceId and to handle batched messages non-unique sequenceIds

Posted by GitBox <gi...@apache.org>.
dlg99 commented on code in PR #16098:
URL: https://github.com/apache/pulsar/pull/16098#discussion_r899505971


##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java:
##########
@@ -73,6 +73,20 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
             help = "In case of Record<KeyValue<>> data use key from KeyValue<> instead of one from Record.")
     private boolean unwrapKeyValueIfAvailable = true;
 
+    @FieldDoc(
+            defaultValue = "true",
+            help = "Allows use of message index instead of message sequenceId as offset, if available.\n"
+                    + "Requires AppendIndexMetadataInterceptor and "
+                    + "enableExposingBrokerEntryMetadataToClient=true on brokers.")
+    private boolean useIndexAsOffset = true;
+
+    @FieldDoc(
+            defaultValue = "12",
+            help = "Number of bits (0 to 20) to use for index of message in the batch for translation into an offset.\n"
+                    + "0 to disable this behavior (Messages from the same batch will have the same "
+                    + "offset which can affect some connectors.)")
+    private int maxBatchBitsForOffset = 12;

Review Comment:
   Apache Pulsar does not have Sink implementations that use Kafka connectors (Debezium etc. are sources).
   I don't think we need to worry about legacy behavior here.


