You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/05/06 09:14:21 UTC

[kafka] branch trunk updated: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618) (#11773)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a586c94af1 KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618) (#11773)
a586c94af1 is described below

commit a586c94af19ff6b825e974e26a241115defa6cda
Author: Chris Egerton <fe...@gmail.com>
AuthorDate: Fri May 6 05:13:59 2022 -0400

    KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618) (#11773)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Tom Bentley <tb...@redhat.com>, Hector Geraldino <hg...@bloomberg.net>, Andrew Eugene Choi <an...@uwaterloo.ca>
---
 ...or.java => ConnectorTransactionBoundaries.java} | 20 +++----
 ...ourceConnector.java => ExactlyOnceSupport.java} | 20 +++----
 .../kafka/connect/source/SourceConnector.java      | 44 ++++++++++++++++
 .../apache/kafka/connect/source/SourceTask.java    | 61 +++++++++++++++++-----
 .../kafka/connect/source/SourceTaskContext.java    | 25 +++++++++
 .../kafka/connect/source/TransactionContext.java   | 56 ++++++++++++++++++++
 6 files changed, 193 insertions(+), 33 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/ConnectorTransactionBoundaries.java
similarity index 70%
copy from connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
copy to connect/api/src/main/java/org/apache/kafka/connect/source/ConnectorTransactionBoundaries.java
index 6e9694024d..73746ba099 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/ConnectorTransactionBoundaries.java
@@ -16,16 +16,16 @@
  */
 package org.apache.kafka.connect.source;
 
-import org.apache.kafka.connect.connector.Connector;
-
 /**
- * SourceConnectors implement the connector interface to pull data from another system and send
- * it to Kafka.
+ * An enum to represent the level of support for connector-defined transaction boundaries.
  */
-public abstract class SourceConnector extends Connector {
-
-    @Override
-    protected SourceConnectorContext context() {
-        return (SourceConnectorContext) context;
-    }
+public enum ConnectorTransactionBoundaries {
+    /**
+     * Signals that a connector can define its own transaction boundaries.
+     */
+    SUPPORTED,
+    /**
+     * Signals that a connector cannot define its own transaction boundaries.
+     */
+    UNSUPPORTED
 }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/ExactlyOnceSupport.java
similarity index 71%
copy from connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
copy to connect/api/src/main/java/org/apache/kafka/connect/source/ExactlyOnceSupport.java
index 6e9694024d..3980410e4b 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/ExactlyOnceSupport.java
@@ -16,16 +16,16 @@
  */
 package org.apache.kafka.connect.source;
 
-import org.apache.kafka.connect.connector.Connector;
-
 /**
- * SourceConnectors implement the connector interface to pull data from another system and send
- * it to Kafka.
+ * An enum to represent the level of support for exactly-once delivery from a source connector.
  */
-public abstract class SourceConnector extends Connector {
-
-    @Override
-    protected SourceConnectorContext context() {
-        return (SourceConnectorContext) context;
-    }
+public enum ExactlyOnceSupport {
+    /**
+     * Signals that a connector supports exactly-once delivery.
+     */
+    SUPPORTED,
+    /**
+     * Signals that a connector does not support exactly-once delivery.
+     */
+    UNSUPPORTED;
 }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
index 6e9694024d..7fc2a5d11c 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
@@ -18,6 +18,8 @@ package org.apache.kafka.connect.source;
 
 import org.apache.kafka.connect.connector.Connector;
 
+import java.util.Map;
+
 /**
  * SourceConnectors implement the connector interface to pull data from another system and send
  * it to Kafka.
@@ -28,4 +30,46 @@ public abstract class SourceConnector extends Connector {
     protected SourceConnectorContext context() {
         return (SourceConnectorContext) context;
     }
+
+    /**
+     * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+     * Connector authors can assume that worker-level exactly-once support is enabled when this method is invoked.
+     *
+     * <p>For backwards compatibility, the default implementation will return {@code null}, but connector authors are
+     * strongly encouraged to override this method to return a non-null value such as
+     * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+     *
+     * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+     * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.
+     *
+     * @param connectorConfig the configuration that will be used for the connector.
+     * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support with the given
+     * configuration, and {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. If this method is overridden by a
+     * connector, the return value should not be {@code null}; if it is {@code null}, it will be assumed that the
+     * connector cannot provide exactly-once guarantees.
+     * @since 3.3
+     */
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+        return null;
+    }
+
+    /**
+     * Signals whether the connector implementation is capable of defining the transaction boundaries for a
+     * connector with the given configuration. This method is called before {@link #start(Map)}, only when the
+     * runtime supports exactly-once and the connector configuration includes {@code transaction.boundary=connector}.
+     *
+     * <p>This method need not be implemented if the connector implementation does not support defining
+     * transaction boundaries.
+     *
+     * @param connectorConfig the configuration that will be used for the connector
+     * @return {@link ConnectorTransactionBoundaries#SUPPORTED} if the connector will define its own transaction boundaries,
+     * or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise. If this method is overridden by a
+     * connector, the return value should not be {@code null}; if it is {@code null}, it will be assumed that the
+     * connector cannot define its own transaction boundaries.
+     * @since 3.3
+     * @see TransactionContext
+     */
+    public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
+        return ConnectorTransactionBoundaries.UNSUPPORTED;
+    }
 }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index f5209e1cca..2159e68e8a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -16,10 +16,11 @@
  */
 package org.apache.kafka.connect.source;
 
-import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.connect.connector.Task;
 
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 /**
@@ -27,6 +28,51 @@ import java.util.Map;
  */
 public abstract class SourceTask implements Task {
 
+    /**
+     * The configuration key that determines how source tasks will define transaction boundaries
+     * when exactly-once support is enabled.
+     */
+    public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";
+
+    /**
+     * Represents the permitted values for the {@link #TRANSACTION_BOUNDARY_CONFIG} property.
+     */
+    public enum TransactionBoundary {
+        /**
+         * A new transaction will be started and committed for every batch of records returned by {@link #poll()}.
+         */
+        POLL,
+        /**
+         * Transactions will be started and committed on a user-defined time interval.
+         */
+        INTERVAL,
+        /**
+         * Transactions will be defined by the connector itself, via a {@link TransactionContext}.
+         */
+        CONNECTOR;
+
+        /**
+         * The default transaction boundary style that will be used for source connectors when no style is explicitly
+         * configured.
+         */
+        public static final TransactionBoundary DEFAULT = POLL;
+
+        /**
+         * Parse a {@link TransactionBoundary} from the given string.
+         * @param property the string to parse; should not be null
+         * @return the {@link TransactionBoundary} whose name matches the given string
+         * @throws IllegalArgumentException if there is no transaction boundary type with the given name
+         */
+        public static TransactionBoundary fromProperty(String property) {
+            return TransactionBoundary.valueOf(property.toUpperCase(Locale.ROOT).trim());
+        }
+
+        @Override
+        public String toString() {
+            return name().toLowerCase(Locale.ROOT);
+        }
+    }
+
     protected SourceTaskContext context;
 
     /**
@@ -44,16 +90,13 @@ public abstract class SourceTask implements Task {
     public abstract void start(Map<String, String> props);
 
     /**
-     * <p>
      * Poll this source task for new records. If no data is currently available, this method
      * should block but return control to the caller regularly (by returning {@code null}) in
      * order for the task to transition to the {@code PAUSED} state if requested to do so.
-     * </p>
      * <p>
      * The task will be {@link #stop() stopped} on a separate thread, and when that happens
      * this method is expected to unblock, quickly finish up any remaining processing, and
      * return.
-     * </p>
      *
      * @return a list of source records
      */
@@ -63,12 +106,10 @@ public abstract class SourceTask implements Task {
      * <p>
      * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
      * method should block until the commit is complete.
-     * </p>
      * <p>
      * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
      * automatically. This hook is provided for systems that also need to store offsets internally
      * in their own system.
-     * </p>
      */
     public void commit() throws InterruptedException {
         // This space intentionally left blank.
@@ -91,17 +132,14 @@ public abstract class SourceTask implements Task {
      * <p>
      * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
      * also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker.
-     * </p>
      * <p>
      * This is an alias for {@link #commitRecord(SourceRecord, RecordMetadata)} for backwards compatibility. The default
      * implementation of {@link #commitRecord(SourceRecord, RecordMetadata)} just calls this method. It is not necessary
      * to override both methods.
-     * </p>
      * <p>
      * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
      * automatically. This hook is provided for systems that also need to store offsets internally
      * in their own system.
-     * </p>
      *
      * @param record {@link SourceRecord} that was successfully sent via the producer or filtered by a transformation
      * @throws InterruptedException
@@ -115,19 +153,16 @@ public abstract class SourceTask implements Task {
     /**
      * <p>
      * Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
-     * also called when a record is filtered by a transformation or when {@link ConnectorConfig} "errors.tolerance" is set to "all"
+     * also called when a record is filtered by a transformation or when "errors.tolerance" is set to "all"
      * and thus will never be ACK'd by a broker.
      * In both cases {@code metadata} will be null.
-     * </p>
      * <p>
      * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
      * automatically. This hook is provided for systems that also need to store offsets internally
      * in their own system.
-     * </p>
      * <p>
      * The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
      * not necessary to implement both methods.
-     * </p>
      *
      * @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
      * @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
index ddb0a78718..7745b197c2 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
@@ -38,4 +38,29 @@ public interface SourceTaskContext {
      * Get the OffsetStorageReader for this SourceTask.
      */
     OffsetStorageReader offsetStorageReader();
+
+    /**
+     * Get a {@link TransactionContext} that can be used to define producer transaction boundaries
+     * when exactly-once support is enabled for the connector.
+     *
+     * <p>This method was added in Apache Kafka 3.3. Source tasks that use this method but want to
+     * maintain backward compatibility so they can also be deployed to older Connect runtimes
+     * should guard the call to this method with a try-catch block, since calling this method will result in a
+     * {@link NoSuchMethodError} or {@link NoClassDefFoundError} when the source connector is deployed to
+     * Connect runtimes older than Kafka 3.3. For example:
+     * <pre>
+     *     TransactionContext transactionContext;
+     *     try {
+     *         transactionContext = context.transactionContext();
+     *     } catch (NoSuchMethodError | NoClassDefFoundError e) {
+     *         transactionContext = null;
+     *     }
+     * </pre>
+     *
+     * @return the transaction context, or null if the connector was not configured to specify transaction boundaries
+     * @since 3.3
+     */
+    default TransactionContext transactionContext() {
+        return null;
+    }
 }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
new file mode 100644
index 0000000000..f90d75baf4
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer transaction boundaries when
+ * exactly-once support is enabled.
+ */
+public interface TransactionContext {
+
+    /**
+     * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
+     * is processed.
+     */
+    void commitTransaction();
+
+    /**
+     * Request a transaction commit after a source record is processed. The source record will be the
+     * last record in the committed transaction.
+     * @param record the record to commit the transaction after; may not be null.
+     */
+    void commitTransaction(SourceRecord record);
+
+    /**
+     * Requests a transaction abort after the next batch of records from {@link SourceTask#poll()}. All of
+     * the records in that transaction will be discarded and will not appear in a committed transaction.
+     * However, offsets for that transaction will still be committed so that the records in that transaction
+     * are not reprocessed. If the data should instead be reprocessed, the task should not invoke this method
+     * and should instead throw an exception.
+     */
+    void abortTransaction();
+
+    /**
+     * Requests a transaction abort after a source record is processed. The source record will be the
+     * last record in the aborted transaction. All of the records in that transaction will be discarded
+     * and will not appear in a committed transaction. However, offsets for that transaction will still
+     * be committed so that the records in that transaction are not reprocessed. If the data should be
+     * reprocessed, the task should not invoke this method and should instead throw an exception.
+     * @param record the record to abort the transaction after; may not be null.
+     */
+    void abortTransaction(SourceRecord record);
+}