You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/01 18:36:03 UTC

[GitHub] [kafka] C0urante opened a new pull request, #12941: KAFKA-13709: Add docs for exactly-once support in Connect

C0urante opened a new pull request, #12941:
URL: https://github.com/apache/kafka/pull/12941

   [Jira](https://issues.apache.org/jira/browse/KAFKA-13709)
   
   Rebased on top of https://github.com/apache/kafka/pull/12938 to fix recently-introduced compilation failures on trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1042947164


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector being run.</b> Even if all the correct worker properties are set in the config for each node in a cluster, if a connector is not designed to or simply cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>
+
+    <h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink connectors</a></h5>
+
+    <p>If a sink connector supports exactly-once delivery, all that is necessary to enable exactly-once delivery at the Connect worker level is ensuring that its consumer group is configured to ignore records in aborted transactions. This can be done by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>
+
+    <h5><a id="connect_exactlyoncesource" href="connect_exactlyoncesource">Source connectors</a></h5>
+
+    <p>If a source connector supports exactly-once delivery, your Connect cluster must also be configured to enable framework-level support for exactly-once delivery for source connectors, and additional ACLs will be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once guarantees.</p>
+
+    <h6>Worker configuration</h6>
+
+    <p>For new Connect clusters, set the <code>exactly.once.source.support</code> property to <code>enabled</code> in the worker config for each node in the cluster. For existing clusters, two rolling upgrades are necessary. During the first upgrade, the <code>exactly.once.source.support</code> property should be set to <code>preparing</code>, and during the second, it should be set to <code>enabled</code>.</p>
+
+    <h6>ACL requirements</h6>
+
+    <p>With exactly-once source support enabled, the principal for each Connect worker will require these ACLs:</p>
+
+    <table class="data-table">
+        <thead>
+            <tr>
+                <th>Operation</th>
+                <th>Resource Type</th>
+                <th>Resource Name</th>
+                <th>Note</th>
+            </tr>
+        </thead>
+        <tbody>
+            <tr>
+                <td>Write</td>
+                <td>TransactionalId</td>
+                <td><code>connect-cluster-${groupId}</code>, where <code>${groupId}</code> is the <code>group.id</code> of the cluster</td>
+                <td></td>
+            </tr>
+            <tr>
+                <td>Describe</td>
+                <td>TransactionalId</td>
+                <td><code>connect-cluster-${groupId}</code>, where <code>${groupId}</code> is the <code>group.id</code> of the cluster</td>
+                <td></td>
+            </tr>
+            <tr>
+                <td>IdempotentWrite</td>
+                <td>Cluster</td>
+                <td>ID of the Kafka cluster that hosts the worker's config topic</td>
+                <td>The IdempotentWrite ACL has been deprecated as of 2.8 and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters</td>
+            </tr>
+        </tbody>
+    </table>
+
+    <p>And the principal for each individual connector will require these ACLs:</p>
+
+    <table class="data-table">
+        <thead>
+            <tr>
+                <th>Operation</th>
+                <th>Resource Type</th>
+                <th>Resource Name</th>
+                <th>Note</th>
+            </tr>
+        </thead>
+        <tbody>
+            <tr>
+                <td>Write</td>
+                <td>TransactionalId</td>
+                <td><code>${groupId}-${connector}-${taskId}</code>, for each task that the connector will create, where <code>${groupId}</code> is the <code>group.id</code> of the Connect cluster, <code>${connector}</code> is the name of the connector, and <code>${taskId}</code> is the ID of the task (starting from zero)</td>
+                <td>A wildcard prefix of <code>${groupId}-${connector}*</code> can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.</td>
+            </tr>
+            <tr>
+                <td>Describe</td>
+                <td>TransactionalId</td>
+                <td><code>${groupId}-${connector}-${taskId}</code>, for each task that the connector will create, where <code>${groupId}</code> is the <code>group.id</code> of the Connect cluster, <code>${connector}</code> is the name of the connector, and <code>${taskId}</code> is the ID of the task (starting from zero)</td>
+                <td>A wildcard prefix of <code>${groupId}-${connector}*</code> can be used for convenience if there is no risk of conflict with other transactional IDs or if conflicts are acceptable to the user.</td>

Review Comment:
   ```suggestion
                   <td>A wildcard prefix of <code>${groupId}-${connector}*</code> can be used for convenience if there is no risk of conflict with other transactional IDs, or if conflicts are acceptable to the user.</td>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1042935917


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector being run.</b> Even if all the correct worker properties are set in the config for each node in a cluster, if a connector is not designed to or simply cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>
+
+    <h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink connectors</a></h5>
+
+    <p>If a sink connector supports exactly-once delivery, all that is necessary to enable exactly-once delivery at the Connect worker level is ensuring that its consumer group is configured to ignore records in aborted transactions. This can be done by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>
+
+    <h5><a id="connect_exactlyoncesource" href="connect_exactlyoncesource">Source connectors</a></h5>
+
+    <p>If a source connector supports exactly-once delivery, your Connect cluster must also be configured to enable framework-level support for exactly-once delivery for source connectors, and additional ACLs will be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once guarantees.</p>

Review Comment:
   ```suggestion
       <p>If a source connector supports exactly-once delivery, all you need to do is configure your Connect cluster to enable framework-level support for exactly-once delivery for source connectors. Additional ACLs may be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once guarantees.</p>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1057922386


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector you run.</b> Even if you set all the correct worker properties in the configuration for each node in a cluster, if a connector is not designed to, or cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>

Review Comment:
   Seems "delivery" is used on multiple other places further below, too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on PR #12941:
URL: https://github.com/apache/kafka/pull/12941#issuecomment-1334566169

   > @joel-hamill would you be interested in taking a look at this?
   
   Hi @C0urante :) I spoke with @joel-hamill. I can take a look if you like. 👍 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1042945330


##########
docs/connect.html:
##########
@@ -593,6 +701,107 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors>">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries, which can be enabled by users by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }
+}
+</pre>
+
+    <p>Or to commit a transaction for exactly every tenth record:</p>
+
+    <pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    for (SourceRecord record : records) {
+        if (++this.recordsSent % 10 == 0) {
+            this.context.transactionContext().commitTransaction(record);
+        }
+    }
+}
+</pre>
+
+    <p>Most connectors do not need to define their own transaction boundaries. However, it may be useful if files or objects in the source system are broken up into multiple source records, but should be delivered atomically. Additionally, it may be useful if it is impossible to give each source record a unique source offset, if every record with a given offset is delivered within a single transaction.</p>
+
+    <p>Note that if the user has not enabled connector-defined transaction boundaries in the connector config, the <code>TransactionContext</code> returned by <code>context.transactionContext()</code> will be <code>null</code>.</p>
+
+    <h6>Validation APIs</h6>
+
+    <p>A few additional preflight validation APIs can be implemented by source connector developers.</p>
+
+    <p>Some users may require exactly-once delivery guarantees from a connector. In this case, they may set the <code>exactly.once.support</code> property to <code>required</code> in the configuration for the connector. When this happens, the Kafka Connect framework will ask the connector whether it can provide exactly-once delivery guarantees with the specified configuration. This is done by invoking the <code>exactlyOnceSupport</code> method on the connector.</p>
+
+    <p>If a connector doesn't support exactly-once, it should still implement this method, to let users know for certain that it cannot provide exactly-once delivery guarantees:</p>
+
+<pre class="brush: java;">
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map&lt;String, String&gt; props) {
+    // This connector cannot provide exactly-once delivery guarantees under any conditions
+    return ExactlyOnceSupport.UNSUPPORTED;
+}
+</pre>
+
+    <p>Otherwise, a connector should examine the configuration, and return <code>ExactlyOnceSupport.SUPPORTED</code> if it can provide exactly-once delivery guarantees:</p>
+
+<pre class="brush: java;">
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map&lt;String, String&gt; props) {
+    // This connector can always provide exactly-once delivery guarantees
+    return ExactlyOnceSupport.SUPPORTED;
+}
+</pre>
+
+    <p>Additionally, if the user has configured the connector to define its own transaction boundaries, the Kafka Connect framework will ask the connector whether it can define its own transaction boundaries with the specified configuration, via the <code>canDefineTransactionBoundaries</code> method:</p>
+
+<pre class="brush: java;">
+@Override
+public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map&lt;String, String&gt; props) {
+    // This connector can always define its own transaction boundaries
+    return ConnectorTransactionBoundaries.SUPPORTED;
+}
+</pre>
+
+    <p>This method need only be implemented for connectors that can define their own transaction boundaries in some cases. If a connector is never able to define its own transaction boundaries, it does not need to implement this method.</p>

Review Comment:
   ```suggestion
       <p>This method should only be implemented for connectors that can define their own transaction boundaries in some cases. If a connector is never able to define its own transaction boundaries, it does not need to implement this method.</p>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1042949185


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java:
##########
@@ -232,9 +232,12 @@ public String toString() {
 
     public static final String EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG = "exactly.once.source.support";
     public static final String EXACTLY_ONCE_SOURCE_SUPPORT_DOC = "Whether to enable exactly-once support for source connectors in the cluster "
-            + "by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones. ";
-            // TODO: https://issues.apache.org/jira/browse/KAFKA-13709
-            //       + "See the exactly-once source support documentation at [add docs link here] for more information on this feature.";
+            + "by using transactions to write source records and their source offsets, and by proactively fencing out old task generations before bringing up new ones.\n"
+            + "To enable exactly-once source support on a new cluster, set this property to '" + ExactlyOnceSourceSupport.ENABLED + "'. "
+            + "To enable support on an existing cluster, first set to '" + ExactlyOnceSourceSupport.PREPARING + "' on every worker in the cluster, "
+            + "then set to '" + ExactlyOnceSourceSupport.ENABLED + "'. A rolling upgrade may be used for both changes. "
+            + "See the <a href=\"https://kafka.apache.org/documentation.html#connect_exactlyoncesource\">exactly-once source support documentation</a> "
+            + "for more information on this feature.";

Review Comment:
   Recommend reordering this sentence to: "For more information on this feature, see the <a href..."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12941:
URL: https://github.com/apache/kafka/pull/12941#issuecomment-1334192798

   @joel-hamill would you be interested in taking a look at this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1044663744


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector being run.</b> Even if all the correct worker properties are set in the config for each node in a cluster, if a connector is not designed to or simply cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>
+
+    <h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink connectors</a></h5>
+
+    <p>If a sink connector supports exactly-once delivery, all that is necessary to enable exactly-once delivery at the Connect worker level is ensuring that its consumer group is configured to ignore records in aborted transactions. This can be done by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>

Review Comment:
   This one I don't quite grasp--the change from "If a sink connector supports exactly-once delivery, all that is necessary to enable exactly-once delivery" to "If a sink connector supports exactly-once delivery and to enable exactly-once delivery" doesn't seem quite right. The rest of the suggestion makes sense.
   
   @tikimims can you help me understand this part better? Was it an accident?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1042942666


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector being run.</b> Even if all the correct worker properties are set in the config for each node in a cluster, if a connector is not designed to or simply cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>
+
+    <h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink connectors</a></h5>
+
+    <p>If a sink connector supports exactly-once delivery, all that is necessary to enable exactly-once delivery at the Connect worker level is ensuring that its consumer group is configured to ignore records in aborted transactions. This can be done by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>

Review Comment:
   ```suggestion
       <p>If a sink connector supports exactly-once delivery and to enable exactly-once delivery at the Connect worker level, you must ensure its consumer group is configured to ignore records in aborted transactions. You can do this by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante merged pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
C0urante merged PR #12941:
URL: https://github.com/apache/kafka/pull/12941


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on PR #12941:
URL: https://github.com/apache/kafka/pull/12941#issuecomment-1342083368

   Hi @C0urante I took a first pass. :) Let me know if you'd like me to take another pass after any updates. 👍 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1068558694


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector you run.</b> Even if you set all the correct worker properties in the configuration for each node in a cluster, if a connector is not designed to, or cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>

Review Comment:
   Thanks Matthias, I didn't realize this term could be so controversial. I've filed https://github.com/apache/kafka/pull/13106 as a follow-up to fix the wording used here and elsewhere in Connect.



##########
docs/connect.html:
##########
@@ -593,6 +701,109 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors>">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries, which can be enabled by users by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }
+    return records;
+}
+</pre>
+
+    <p>Or to commit a transaction for exactly every tenth record:</p>

Review Comment:
   When a connector is configured to define its own transaction boundaries, we never force a transaction commit unless the connector itself requests one. So yes, in this case, we would abort the transaction during shutdown and then start reading from the last successfully-committed offset (which should have been published in the last successfully-committed transaction) once a new instance is brought up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1042946060


##########
docs/connect.html:
##########
@@ -593,6 +701,107 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors>">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries, which can be enabled by users by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }
+}
+</pre>
+
+    <p>Or to commit a transaction for exactly every tenth record:</p>
+
+    <pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    for (SourceRecord record : records) {
+        if (++this.recordsSent % 10 == 0) {
+            this.context.transactionContext().commitTransaction(record);
+        }
+    }
+}
+</pre>
+
+    <p>Most connectors do not need to define their own transaction boundaries. However, it may be useful if files or objects in the source system are broken up into multiple source records, but should be delivered atomically. Additionally, it may be useful if it is impossible to give each source record a unique source offset, if every record with a given offset is delivered within a single transaction.</p>
+
+    <p>Note that if the user has not enabled connector-defined transaction boundaries in the connector config, the <code>TransactionContext</code> returned by <code>context.transactionContext()</code> will be <code>null</code>.</p>
+
+    <h6>Validation APIs</h6>
+
+    <p>A few additional preflight validation APIs can be implemented by source connector developers.</p>
+
+    <p>Some users may require exactly-once delivery guarantees from a connector. In this case, they may set the <code>exactly.once.support</code> property to <code>required</code> in the configuration for the connector. When this happens, the Kafka Connect framework will ask the connector whether it can provide exactly-once delivery guarantees with the specified configuration. This is done by invoking the <code>exactlyOnceSupport</code> method on the connector.</p>
+
+    <p>If a connector doesn't support exactly-once, it should still implement this method, to let users know for certain that it cannot provide exactly-once delivery guarantees:</p>

Review Comment:
   ```suggestion
       <p>If a connector doesn't support exactly-once delivery, it should still implement this method to let users know for certain that it cannot provide exactly-once delivery guarantees:</p>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1044667119


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector being run.</b> Even if all the correct worker properties are set in the config for each node in a cluster, if a connector is not designed to or simply cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>
+
+    <h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink connectors</a></h5>
+
+    <p>If a sink connector supports exactly-once delivery, all that is necessary to enable exactly-once delivery at the Connect worker level is ensuring that its consumer group is configured to ignore records in aborted transactions. This can be done by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>

Review Comment:
   (I've pushed the suggested changes here except for the bit described above)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1042935917


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector being run.</b> Even if all the correct worker properties are set in the config for each node in a cluster, if a connector is not designed to or simply cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>
+
+    <h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink connectors</a></h5>
+
+    <p>If a sink connector supports exactly-once delivery, all that is necessary to enable exactly-once delivery at the Connect worker level is ensuring that its consumer group is configured to ignore records in aborted transactions. This can be done by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>
+
+    <h5><a id="connect_exactlyoncesource" href="connect_exactlyoncesource">Source connectors</a></h5>
+
+    <p>If a source connector supports exactly-once delivery, your Connect cluster must also be configured to enable framework-level support for exactly-once delivery for source connectors, and additional ACLs will be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once guarantees.</p>

Review Comment:
   ```suggestion
       <p>If a source connector supports exactly-once delivery, you must configure your Connect cluster to enable framework-level support for exactly-once delivery for source connectors. Additional ACLs may be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once guarantees.</p>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1057922653


##########
docs/connect.html:
##########
@@ -593,6 +701,109 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors>">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries, which can be enabled by users by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }
+    return records;
+}
+</pre>
+
+    <p>Or to commit a transaction for exactly every tenth record:</p>

Review Comment:
   Is is really "exactly"? I rebalance would force Connect to commit, and it would "split" a 10-record-tx, into two pieces? (Or would a TX be aborted in case of a rebalance and retried after the task was reassigned?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1057922653


##########
docs/connect.html:
##########
@@ -593,6 +701,109 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors>">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order for a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries, which can be enabled by users by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }
+    return records;
+}
+</pre>
+
+    <p>Or to commit a transaction for exactly every tenth record:</p>

Review Comment:
   Is is really "exactly"? I rebalance would force Connect to commit, and it would "split" a 10-record tx, into two pieces?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1044663744


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector being run.</b> Even if all the correct worker properties are set in the config for each node in a cluster, if a connector is not designed to or simply cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>
+
+    <h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink connectors</a></h5>
+
+    <p>If a sink connector supports exactly-once delivery, all that is necessary to enable exactly-once delivery at the Connect worker level is ensuring that its consumer group is configured to ignore records in aborted transactions. This can be done by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>

Review Comment:
   This one I don't quite grasp--the change from
   > If a sink connector supports exactly-once delivery, all that is necessary to enable exactly-once delivery
   
   to
   
   > If a sink connector supports exactly-once delivery and to enable exactly-once delivery
   
   doesn't seem quite right. The rest of the suggestion makes sense.
   
   @tikimims can you help me understand this part better? Was it an accident?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mimaison commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
mimaison commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1048760869


##########
docs/connect.html:
##########
@@ -593,6 +701,107 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors>">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>

Review Comment:
   In order _**for**_ a source connector ...



##########
docs/connect.html:
##########
@@ -593,6 +701,107 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors>">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries, which can be enabled by users by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }

Review Comment:
   Should we have the `return` statement in this 2 examples?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1042943030


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector being run.</b> Even if all the correct worker properties are set in the config for each node in a cluster, if a connector is not designed to or simply cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>
+
+    <h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink connectors</a></h5>
+
+    <p>If a sink connector supports exactly-once delivery, all that is necessary to enable exactly-once delivery at the Connect worker level is ensuring that its consumer group is configured to ignore records in aborted transactions. This can be done by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>
+
+    <h5><a id="connect_exactlyoncesource" href="connect_exactlyoncesource">Source connectors</a></h5>
+
+    <p>If a source connector supports exactly-once delivery, your Connect cluster must also be configured to enable framework-level support for exactly-once delivery for source connectors, and additional ACLs will be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once guarantees.</p>
+
+    <h6>Worker configuration</h6>
+
+    <p>For new Connect clusters, set the <code>exactly.once.source.support</code> property to <code>enabled</code> in the worker config for each node in the cluster. For existing clusters, two rolling upgrades are necessary. During the first upgrade, the <code>exactly.once.source.support</code> property should be set to <code>preparing</code>, and during the second, it should be set to <code>enabled</code>.</p>
+
+    <h6>ACL requirements</h6>
+
+    <p>With exactly-once source support enabled, the principal for each Connect worker will require these ACLs:</p>
+
+    <table class="data-table">
+        <thead>
+            <tr>
+                <th>Operation</th>
+                <th>Resource Type</th>
+                <th>Resource Name</th>
+                <th>Note</th>
+            </tr>
+        </thead>
+        <tbody>
+            <tr>
+                <td>Write</td>
+                <td>TransactionalId</td>
+                <td><code>connect-cluster-${groupId}</code>, where <code>${groupId}</code> is the <code>group.id</code> of the cluster</td>
+                <td></td>
+            </tr>
+            <tr>
+                <td>Describe</td>
+                <td>TransactionalId</td>
+                <td><code>connect-cluster-${groupId}</code>, where <code>${groupId}</code> is the <code>group.id</code> of the cluster</td>
+                <td></td>
+            </tr>
+            <tr>
+                <td>IdempotentWrite</td>
+                <td>Cluster</td>
+                <td>ID of the Kafka cluster that hosts the worker's config topic</td>
+                <td>The IdempotentWrite ACL has been deprecated as of 2.8 and will only be necessary for Connect clusters running on pre-2.8 Kafka clusters</td>
+            </tr>
+        </tbody>
+    </table>
+
+    <p>And the principal for each individual connector will require these ACLs:</p>

Review Comment:
   ```suggestion
       <p>And the principal for each individual connector will require the following ACLs:</p>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1042936427


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector being run.</b> Even if all the correct worker properties are set in the config for each node in a cluster, if a connector is not designed to or simply cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>
+
+    <h5><a id="connect_exactlyoncesink" href="#connect_exactlyoncesink">Sink connectors</a></h5>
+
+    <p>If a sink connector supports exactly-once delivery, all that is necessary to enable exactly-once delivery at the Connect worker level is ensuring that its consumer group is configured to ignore records in aborted transactions. This can be done by setting the worker property <code>consumer.isolation.level</code> to <code>read_committed</code> or, if running a version of Kafka Connect that supports it, using a <a href="#connectconfigs_connector.client.config.override.policy">connector client config override policy</a> that allows the <code>consumer.override.isolation.level</code> property to be set to <code>read_committed</code> in individual connector configs. There are no additional ACL requirements.</p>
+
+    <h5><a id="connect_exactlyoncesource" href="connect_exactlyoncesource">Source connectors</a></h5>
+
+    <p>If a source connector supports exactly-once delivery, your Connect cluster must also be configured to enable framework-level support for exactly-once delivery for source connectors, and additional ACLs will be necessary if running against a secured Kafka cluster. Note that exactly-once support for source connectors is currently only available in distributed mode; standalone Connect workers cannot provide exactly-once guarantees.</p>
+
+    <h6>Worker configuration</h6>
+
+    <p>For new Connect clusters, set the <code>exactly.once.source.support</code> property to <code>enabled</code> in the worker config for each node in the cluster. For existing clusters, two rolling upgrades are necessary. During the first upgrade, the <code>exactly.once.source.support</code> property should be set to <code>preparing</code>, and during the second, it should be set to <code>enabled</code>.</p>
+
+    <h6>ACL requirements</h6>
+
+    <p>With exactly-once source support enabled, the principal for each Connect worker will require these ACLs:</p>

Review Comment:
   ```suggestion
       <p>With exactly-once source support enabled, the principal for each Connect worker will require the following ACLs:</p>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1042944441


##########
docs/connect.html:
##########
@@ -593,6 +701,107 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors>">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries, which can be enabled by users by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }
+}
+</pre>
+
+    <p>Or to commit a transaction for exactly every tenth record:</p>
+
+    <pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    for (SourceRecord record : records) {
+        if (++this.recordsSent % 10 == 0) {
+            this.context.transactionContext().commitTransaction(record);
+        }
+    }
+}
+</pre>
+
+    <p>Most connectors do not need to define their own transaction boundaries. However, it may be useful if files or objects in the source system are broken up into multiple source records, but should be delivered atomically. Additionally, it may be useful if it is impossible to give each source record a unique source offset, if every record with a given offset is delivered within a single transaction.</p>
+
+    <p>Note that if the user has not enabled connector-defined transaction boundaries in the connector config, the <code>TransactionContext</code> returned by <code>context.transactionContext()</code> will be <code>null</code>.</p>
+
+    <h6>Validation APIs</h6>
+
+    <p>A few additional preflight validation APIs can be implemented by source connector developers.</p>
+
+    <p>Some users may require exactly-once delivery guarantees from a connector. In this case, they may set the <code>exactly.once.support</code> property to <code>required</code> in the configuration for the connector. When this happens, the Kafka Connect framework will ask the connector whether it can provide exactly-once delivery guarantees with the specified configuration. This is done by invoking the <code>exactlyOnceSupport</code> method on the connector.</p>
+
+    <p>If a connector doesn't support exactly-once, it should still implement this method, to let users know for certain that it cannot provide exactly-once delivery guarantees:</p>
+
+<pre class="brush: java;">
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map&lt;String, String&gt; props) {
+    // This connector cannot provide exactly-once delivery guarantees under any conditions
+    return ExactlyOnceSupport.UNSUPPORTED;
+}
+</pre>
+
+    <p>Otherwise, a connector should examine the configuration, and return <code>ExactlyOnceSupport.SUPPORTED</code> if it can provide exactly-once delivery guarantees:</p>
+
+<pre class="brush: java;">
+@Override
+public ExactlyOnceSupport exactlyOnceSupport(Map&lt;String, String&gt; props) {
+    // This connector can always provide exactly-once delivery guarantees
+    return ExactlyOnceSupport.SUPPORTED;
+}
+</pre>
+
+    <p>Additionally, if the user has configured the connector to define its own transaction boundaries, the Kafka Connect framework will ask the connector whether it can define its own transaction boundaries with the specified configuration, via the <code>canDefineTransactionBoundaries</code> method:</p>

Review Comment:
   ```suggestion
       <p>Additionally, if the user has configured the connector to define its own transaction boundaries, the Kafka Connect framework will ask the connector whether it can define its own transaction boundaries with the specified configuration, using the <code>canDefineTransactionBoundaries</code> method:</p>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1042946621


##########
docs/connect.html:
##########
@@ -593,6 +701,107 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors>">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries, which can be enabled by users by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }
+}
+</pre>
+
+    <p>Or to commit a transaction for exactly every tenth record:</p>
+
+    <pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    for (SourceRecord record : records) {
+        if (++this.recordsSent % 10 == 0) {
+            this.context.transactionContext().commitTransaction(record);
+        }
+    }
+}
+</pre>
+
+    <p>Most connectors do not need to define their own transaction boundaries. However, it may be useful if files or objects in the source system are broken up into multiple source records, but should be delivered atomically. Additionally, it may be useful if it is impossible to give each source record a unique source offset, if every record with a given offset is delivered within a single transaction.</p>
+
+    <p>Note that if the user has not enabled connector-defined transaction boundaries in the connector config, the <code>TransactionContext</code> returned by <code>context.transactionContext()</code> will be <code>null</code>.</p>

Review Comment:
   ```suggestion
       <p>Note that if the user has not enabled connector-defined transaction boundaries in the connector configuration, the <code>TransactionContext</code> returned by <code>context.transactionContext()</code> will be <code>null</code>.</p>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12941:
URL: https://github.com/apache/kafka/pull/12941#issuecomment-1344572069

   Thanks @tikimims! I've accepted all but one of the suggestions and asked for clarification on the other. I didn't realize how common usage of "you" and "your" in the docs were; switching to that wording really helps tidy things up!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1048857762


##########
docs/connect.html:
##########
@@ -593,6 +701,107 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors>">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>

Review Comment:
   Good catch, thanks!



##########
docs/connect.html:
##########
@@ -593,6 +701,107 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors>">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries, which can be enabled by users by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }

Review Comment:
   Good call, done 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12941:
URL: https://github.com/apache/kafka/pull/12941#issuecomment-1334566854

   That'd be great! Thanks @tikimims 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12941:
URL: https://github.com/apache/kafka/pull/12941#issuecomment-1348658405

   Hi @mimaison @tombentley would either of you have time to take a look at this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12941:
URL: https://github.com/apache/kafka/pull/12941#issuecomment-1353348389

   Thanks Mickael!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1057921805


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector you run.</b> Even if you set all the correct worker properties in the configuration for each node in a cluster, if a connector is not designed to, or cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>

Review Comment:
   > exactly-once delivery guarantees
   
   We should not use this phrase... it's loaded...
   
   "delivery" is a coined term from a research paper that proves that it's not possible to send a message exactly once (ie, network layer) -- EOS in Kafka is not about "not retrying (or only retrying if we magically know that the message was not delivered" but to "mask" retries and provide the same "semantic result in case of failure and retries".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] tikimims commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

Posted by GitBox <gi...@apache.org>.
tikimims commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1042930128


##########
docs/connect.html:
##########
@@ -369,6 +369,114 @@ <h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reportin
 # Tolerate all errors.
 errors.tolerance=all</pre>
 
+    <h4><a id="connect_exactlyonce" href="#connect_exactlyonce">Exactly-once support</a></h4>
+
+    <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector being run.</b> Even if all the correct worker properties are set in the config for each node in a cluster, if a connector is not designed to or simply cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>

Review Comment:
   ```suggestion
       <p>Kafka Connect is capable of providing exactly-once delivery guarantees for sink connectors (as of version 0.11.0) and source connectors (as of version 3.3.0). Please note that <b>support for exactly-once delivery is highly dependent on the type of connector you run.</b> Even if you set all the correct worker properties in the configuration for each node in a cluster, if a connector is not designed to, or cannot take advantage of the capabilities of the Kafka Connect framework, exactly-once may not be possible.</p>
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org