Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/14 17:32:18 UTC

[GitHub] [kafka] mimaison commented on a diff in pull request #12941: KAFKA-13709: Add docs for exactly-once support in Connect

mimaison commented on code in PR #12941:
URL: https://github.com/apache/kafka/pull/12941#discussion_r1048760869


##########
docs/connect.html:
##########
@@ -593,6 +701,107 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
 
     <p>Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.</p>
 
+    <h5><a id="connect_exactlyoncesourceconnectors" href="#connect_exactlyoncesourceconnectors>">Exactly-once source connectors</a></h5>
+
+    <h6>Supporting exactly-once</h6>
+
+    <p>With the passing of <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors">KIP-618</a>, Kafka Connect supports exactly-once source connectors as of version 3.3.0. In order a source connector to take advantage of this support, it must be able to provide meaningful source offsets for each record that it emits, and resume consumption from the external system at the exact position corresponding to any of those offsets without dropping or duplicating messages.</p>

Review Comment:
   In order _**for**_ a source connector ...
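To make the requirement in this paragraph concrete, each emitted record carries an offset that pins down its exact position in the external system, and a restarted task resumes at the position just after the last committed offset. The sketch below is a stdlib-only simulation under stated assumptions, not the Kafka Connect API. The class `ResumableSourceSketch`, its nested `Emitted` type, and the `"position"` offset key are hypothetical, and a `List<String>` stands in for the external system. In a real connector the offset map would be attached to each `SourceRecord` and read back through `OffsetStorageReader`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stdlib-only simulation of the offset contract; NOT the Kafka Connect API.
// A List<String> stands in for the external system, and the offset map plays the
// role of the source offset a real connector would attach to each SourceRecord.
final class ResumableSourceSketch {

    // Hypothetical emitted record: the payload plus the offset that identifies it exactly.
    static final class Emitted {
        final String value;
        final Map<String, Object> sourceOffset;

        Emitted(String value, Map<String, Object> sourceOffset) {
            this.value = value;
            this.sourceOffset = sourceOffset;
        }
    }

    private final List<String> externalSystem;
    private long nextPosition;

    // committedOffset is the last offset the framework committed (null on first start);
    // a real task would obtain it through OffsetStorageReader.
    ResumableSourceSketch(List<String> externalSystem, Map<String, Object> committedOffset) {
        this.externalSystem = externalSystem;
        // Resume exactly one position after the committed record: nothing dropped, nothing repeated.
        this.nextPosition = (committedOffset == null)
                ? 0L
                : ((Long) committedOffset.get("position")) + 1;
    }

    // Emits up to maxRecords records, each tagged with the exact position it was read from.
    List<Emitted> poll(int maxRecords) {
        List<Emitted> batch = new ArrayList<>();
        while (batch.size() < maxRecords && nextPosition < externalSystem.size()) {
            Map<String, Object> offset = new HashMap<>();
            offset.put("position", nextPosition); // boxed to Long
            batch.add(new Emitted(externalSystem.get((int) nextPosition), offset));
            nextPosition++;
        }
        return batch;
    }
}
```

If an offset could only be mapped back to an approximate position (for example, a timestamp shared by several rows), a restarted task would have to either re-emit or skip records, which is exactly what the paragraph rules out.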



##########
docs/connect.html:
##########
@@ -593,6 +701,107 @@ <h5><a id="connect_resuming" href="#connect_resuming">Resuming from Previous Off
 
+
+    <h6>Defining transaction boundaries</h6>
+
+    <p>By default, the Kafka Connect framework will create and commit a new Kafka transaction for each batch of records that a source task returns from its <code>poll</code> method. However, connectors can also define their own transaction boundaries, which can be enabled by users by setting the <code>transaction.boundary</code> property to <code>connector</code> in the config for the connector.</p>
+
+    <p>If enabled, the connector's tasks will have access to a <code>TransactionContext</code> from their <code>SourceTaskContext</code>, which they can use to control when transactions are aborted and committed.</p>
+
+    <p>For example, to commit a transaction at least every ten records:</p>
+
+<pre class="brush: java;">
+private int recordsSent;
+
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    this.recordsSent = 0;
+}
+
+@Override
+public List&lt;SourceRecord&gt; poll() {
+    List&lt;SourceRecord&gt; records = fetchRecords();
+    boolean shouldCommit = false;
+    for (SourceRecord record : records) {
+        if (++this.recordsSent >= 10) {
+            shouldCommit = true;
+        }
+    }
+    if (shouldCommit) {
+        this.recordsSent = 0;
+        this.context.transactionContext().commitTransaction();
+    }

Review Comment:
   Should we include the `return` statement in these two examples?
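For reference, a version of the example completed with the `return` statement might look like the following. It is a stdlib-only sketch so that it compiles on its own: `TransactionContextStub` is a hypothetical stand-in for `TransactionContext` that models only `commitTransaction()`, and the hypothetical `fetchRecords()` helper from the diff is replaced by a `fetched` argument. As I read the `TransactionContext` Javadoc, `commitTransaction()` only requests a commit that takes effect after the batch returned from `poll()` is processed, which is one more reason the `return` matters.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TransactionContext, modelling only commitTransaction().
interface TransactionContextStub {
    void commitTransaction();
}

// Stdlib-only version of the docs example, completed with the missing return statement.
final class CommitEveryTenSketch {
    private final TransactionContextStub transactionContext;
    private int recordsSent;

    CommitEveryTenSketch(TransactionContextStub transactionContext) {
        this.transactionContext = transactionContext;
        this.recordsSent = 0; // plays the role of start() in the docs example
    }

    // 'fetched' replaces the hypothetical fetchRecords() call from the diff.
    List<String> poll(List<String> fetched) {
        List<String> records = new ArrayList<>(fetched);
        boolean shouldCommit = false;
        for (String record : records) {
            if (++this.recordsSent >= 10) {
                shouldCommit = true;
            }
        }
        if (shouldCommit) {
            this.recordsSent = 0;
            // Only requests a commit; the framework commits after processing this batch.
            this.transactionContext.commitTransaction();
        }
        // The statement the review asks about: hand the batch back to the framework.
        return records;
    }
}
```

Because the commit request covers the whole batch, a commit is requested only once ten or more records have accumulated, and a single large batch can push a transaction well past ten records.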



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org