You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/05/06 09:14:21 UTC
[kafka] branch trunk updated: KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618) (#11773)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a586c94af1 KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618) (#11773)
a586c94af1 is described below
commit a586c94af19ff6b825e974e26a241115defa6cda
Author: Chris Egerton <fe...@gmail.com>
AuthorDate: Fri May 6 05:13:59 2022 -0400
KAFKA-10000: Add new source connector APIs related to exactly-once support (KIP-618) (#11773)
Reviewers: Mickael Maison <mi...@gmail.com>, Reviewers: Tom Bentley <tb...@redhat.com>, Hector Geraldino <hg...@bloomberg.net>, Andrew Eugene Choi <an...@uwaterloo.ca>
---
...or.java => ConnectorTransactionBoundaries.java} | 20 +++----
...ourceConnector.java => ExactlyOnceSupport.java} | 20 +++----
.../kafka/connect/source/SourceConnector.java | 44 ++++++++++++++++
.../apache/kafka/connect/source/SourceTask.java | 61 +++++++++++++++++-----
.../kafka/connect/source/SourceTaskContext.java | 25 +++++++++
.../kafka/connect/source/TransactionContext.java | 56 ++++++++++++++++++++
6 files changed, 193 insertions(+), 33 deletions(-)
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/ConnectorTransactionBoundaries.java
similarity index 70%
copy from connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
copy to connect/api/src/main/java/org/apache/kafka/connect/source/ConnectorTransactionBoundaries.java
index 6e9694024d..73746ba099 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/ConnectorTransactionBoundaries.java
@@ -16,16 +16,16 @@
*/
package org.apache.kafka.connect.source;
-import org.apache.kafka.connect.connector.Connector;
-
/**
- * SourceConnectors implement the connector interface to pull data from another system and send
- * it to Kafka.
+ * Whether a source connector can define its own transaction boundaries; see {@link SourceConnector#canDefineTransactionBoundaries(java.util.Map)}.
*/
-public abstract class SourceConnector extends Connector {
-
- @Override
- protected SourceConnectorContext context() {
- return (SourceConnectorContext) context;
- }
+public enum ConnectorTransactionBoundaries {
+ /**
+ * Signals that a connector can define its own transaction boundaries (via a {@link TransactionContext}).
+ */
+ SUPPORTED,
+ /**
+ * Signals that a connector cannot define its own transaction boundaries.
+ */
+ UNSUPPORTED
}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/ExactlyOnceSupport.java
similarity index 71%
copy from connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
copy to connect/api/src/main/java/org/apache/kafka/connect/source/ExactlyOnceSupport.java
index 6e9694024d..3980410e4b 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/ExactlyOnceSupport.java
@@ -16,16 +16,16 @@
*/
package org.apache.kafka.connect.source;
-import org.apache.kafka.connect.connector.Connector;
-
/**
- * SourceConnectors implement the connector interface to pull data from another system and send
- * it to Kafka.
+ * The level of exactly-once delivery support offered by a source connector; see {@link SourceConnector#exactlyOnceSupport(java.util.Map)}.
*/
-public abstract class SourceConnector extends Connector {
-
- @Override
- protected SourceConnectorContext context() {
- return (SourceConnectorContext) context;
- }
+public enum ExactlyOnceSupport {
+ /**
+ * Signals that a connector supports exactly-once delivery.
+ */
+ SUPPORTED,
+ /**
+ * Signals that a connector does not support exactly-once delivery.
+ */
+ UNSUPPORTED;
}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
index 6e9694024d..7fc2a5d11c 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
@@ -18,6 +18,8 @@ package org.apache.kafka.connect.source;
import org.apache.kafka.connect.connector.Connector;
+import java.util.Map;
+
/**
* SourceConnectors implement the connector interface to pull data from another system and send
* it to Kafka.
@@ -28,4 +30,46 @@ public abstract class SourceConnector extends Connector {
protected SourceConnectorContext context() {
return (SourceConnectorContext) context;
}
+
+ /**
+ * Signals whether the connector supports exactly-once delivery guarantees with a proposed configuration.
+ * Connector authors can assume that worker-level exactly-once support is enabled when this method is invoked.
+ *
+ * <p>For backwards compatibility, the default implementation will return {@code null}, but connector authors are
+ * strongly encouraged to override this method to return a non-null value such as
+ * {@link ExactlyOnceSupport#SUPPORTED SUPPORTED} or {@link ExactlyOnceSupport#UNSUPPORTED UNSUPPORTED}.
+ *
+ * <p>Similar to {@link #validate(Map) validate}, this method may be called by the runtime before the
+ * {@link #start(Map) start} method is invoked when the connector will be run with exactly-once support.
+ *
+ * @param connectorConfig the configuration that will be used for the connector.
+ * @return {@link ExactlyOnceSupport#SUPPORTED} if the connector can provide exactly-once support with the given
+ * configuration, or {@link ExactlyOnceSupport#UNSUPPORTED} if it cannot. Overriding implementations should not
+ * return {@code null}; if {@code null} is returned (as the default implementation does), it will be assumed that
+ * the connector cannot provide exactly-once guarantees.
+ * @since 3.3
+ */
+ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+ return null;
+ }
+
+ /**
+ * Signals whether the connector implementation is capable of defining the transaction boundaries for a
+ * connector with the given configuration. This method is called before {@link #start(Map)}, only when the
+ * runtime supports exactly-once and the connector configuration includes {@code transaction.boundary=connector}.
+ *
+ * <p>This method need not be implemented if the connector implementation does not support defining
+ * transaction boundaries; the default implementation returns {@link ConnectorTransactionBoundaries#UNSUPPORTED}.
+ *
+ * @param connectorConfig the configuration that will be used for the connector
+ * @return {@link ConnectorTransactionBoundaries#SUPPORTED} if the connector will define its own transaction boundaries,
+ * or {@link ConnectorTransactionBoundaries#UNSUPPORTED} otherwise. Overriding implementations should not
+ * return {@code null}; if {@code null} is returned, it will be assumed that the connector cannot define its own
+ * transaction boundaries.
+ * @since 3.3
+ * @see TransactionContext
+ */
+ public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
+ return ConnectorTransactionBoundaries.UNSUPPORTED;
+ }
}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index f5209e1cca..2159e68e8a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -16,10 +16,11 @@
*/
package org.apache.kafka.connect.source;
-import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.connect.connector.Task;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
/**
@@ -27,6 +28,51 @@ import java.util.Map;
*/
public abstract class SourceTask implements Task {
+ /**
+ * The configuration key that determines how source tasks will define transaction boundaries
+ * when exactly-once support is enabled; see {@link TransactionBoundary} for the permitted values.
+ */
+ public static final String TRANSACTION_BOUNDARY_CONFIG = "transaction.boundary";
+
+ /**
+ * Represents the permitted values for the {@link #TRANSACTION_BOUNDARY_CONFIG} property.
+ */
+ public enum TransactionBoundary {
+ /**
+ * A new transaction will be started and committed for every batch of records returned by {@link #poll()}.
+ */
+ POLL,
+ /**
+ * Transactions will be started and committed on a user-defined time interval.
+ */
+ INTERVAL,
+ /**
+ * Transactions will be defined by the connector itself, via a {@link TransactionContext}.
+ */
+ CONNECTOR;
+
+ /**
+ * The default transaction boundary style that will be used for source connectors when no style is explicitly
+ * configured.
+ */
+ public static final TransactionBoundary DEFAULT = POLL;
+
+ /**
+ * Parse a {@link TransactionBoundary} from the given string, ignoring case and surrounding whitespace.
+ * @param property the string to parse (for example, {@code "poll"}); should not be null, or a {@link NullPointerException} is thrown
+ * @return the {@link TransactionBoundary} whose name matches the given string
+ * @throws IllegalArgumentException if there is no transaction boundary type with the given name
+ */
+ public static TransactionBoundary fromProperty(String property) {
+ return TransactionBoundary.valueOf(property.toUpperCase(Locale.ROOT).trim());
+ }
+
+ @Override
+ public String toString() {
+ return name().toLowerCase(Locale.ROOT); // lower-case form, matching values such as transaction.boundary=connector
+ }
+ }
+
protected SourceTaskContext context;
/**
@@ -44,16 +90,13 @@ public abstract class SourceTask implements Task {
public abstract void start(Map<String, String> props);
/**
- * <p>
* Poll this source task for new records. If no data is currently available, this method
* should block but return control to the caller regularly (by returning {@code null}) in
* order for the task to transition to the {@code PAUSED} state if requested to do so.
- * </p>
* <p>
* The task will be {@link #stop() stopped} on a separate thread, and when that happens
* this method is expected to unblock, quickly finish up any remaining processing, and
* return.
- * </p>
*
* @return a list of source records
*/
@@ -63,12 +106,10 @@ public abstract class SourceTask implements Task {
* <p>
* Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
* method should block until the commit is complete.
- * </p>
* <p>
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
- * </p>
*/
public void commit() throws InterruptedException {
// This space intentionally left blank.
@@ -91,17 +132,14 @@ public abstract class SourceTask implements Task {
* <p>
* Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
* also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker.
- * </p>
* <p>
* This is an alias for {@link #commitRecord(SourceRecord, RecordMetadata)} for backwards compatibility. The default
* implementation of {@link #commitRecord(SourceRecord, RecordMetadata)} just calls this method. It is not necessary
* to override both methods.
- * </p>
* <p>
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
- * </p>
*
* @param record {@link SourceRecord} that was successfully sent via the producer or filtered by a transformation
* @throws InterruptedException
@@ -115,19 +153,16 @@ public abstract class SourceTask implements Task {
/**
* <p>
* Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
- * also called when a record is filtered by a transformation or when {@link ConnectorConfig} "errors.tolerance" is set to "all"
+ * also called when a record is filtered by a transformation or when "errors.tolerance" is set to "all"
* and thus will never be ACK'd by a broker.
* In both cases {@code metadata} will be null.
- * </p>
* <p>
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
- * </p>
* <p>
* The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
* not necessary to implement both methods.
- * </p>
*
* @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
* @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
index ddb0a78718..7745b197c2 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTaskContext.java
@@ -38,4 +38,29 @@ public interface SourceTaskContext {
* Get the OffsetStorageReader for this SourceTask.
*/
OffsetStorageReader offsetStorageReader();
+
+ /**
+ * Get a {@link TransactionContext} that can be used to define producer transaction boundaries
+ * when exactly-once support is enabled for the connector.
+ *
+ * <p>This method was added in Apache Kafka 3.3. Source tasks that use this method but want to
+ * maintain backward compatibility so they can also be deployed to older Connect runtimes
+ * should guard the call to this method with a try-catch block, since calling this method will result in a
+ * {@link NoSuchMethodError} or {@link NoClassDefFoundError} when the source connector is deployed to
+ * Connect runtimes older than Kafka 3.3. For example:
+ * <pre>
+ * TransactionContext transactionContext;
+ * try {
+ * transactionContext = context.transactionContext();
+ * } catch (NoSuchMethodError | NoClassDefFoundError e) {
+ * transactionContext = null;
+ * }
+ * </pre>
+ *
+ * @return the transaction context, or null if the connector was not configured to specify transaction boundaries
+ * @since 3.3
+ */
+ default TransactionContext transactionContext() {
+ return null;
+ }
}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java b/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
new file mode 100644
index 0000000000..f90d75baf4
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/TransactionContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+/**
+ * Provided to source tasks to allow them to define their own producer transaction boundaries when
+ * exactly-once support is enabled. Obtained via {@link SourceTaskContext#transactionContext()}.
+ */
+public interface TransactionContext {
+
+ /**
+ * Request a transaction commit after the next batch of records from {@link SourceTask#poll()}
+ * is processed.
+ */
+ void commitTransaction();
+
+ /**
+ * Request a transaction commit after a source record is processed. The source record will be the
+ * last record in the committed transaction.
+ * @param record the record to commit the transaction after; may not be null.
+ */
+ void commitTransaction(SourceRecord record);
+
+ /**
+ * Request a transaction abort after the next batch of records from {@link SourceTask#poll()} is processed.
+ * All of the records in that transaction will be discarded and will not appear in a committed transaction.
+ * However, offsets for that transaction will still be committed so that the records in that transaction
+ * are not reprocessed. If the data should instead be reprocessed, the task should not invoke this method
+ * and should instead throw an exception.
+ */
+ void abortTransaction();
+
+ /**
+ * Request a transaction abort after a source record is processed. The source record will be the
+ * last record in the aborted transaction. All of the records in that transaction will be discarded
+ * and will not appear in a committed transaction. However, offsets for that transaction will still
+ * be committed so that the records in that transaction are not reprocessed. If the data should be
+ * reprocessed, the task should not invoke this method and should instead throw an exception.
+ * @param record the record to abort the transaction after; may not be null.
+ */
+ void abortTransaction(SourceRecord record);
+}