You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/07/06 02:35:27 UTC
[kafka] branch trunk updated: KAFKA-10000: Integration tests (#11782)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3ae1afa4383 KAFKA-10000: Integration tests (#11782)
3ae1afa4383 is described below
commit 3ae1afa43838066e44ea78918050c6780c208042
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Tue Jul 5 22:35:05 2022 -0400
KAFKA-10000: Integration tests (#11782)
Implements embedded end-to-end integration tests for KIP-618, and brings together previously-decoupled logic from upstream PRs.
Reviewers: Luke Chen <sh...@gmail.com>, Tom Bentley <tb...@redhat.com>, Mickael Maison <mi...@gmail.com>
---
.../connect/runtime/AbstractWorkerSourceTask.java | 8 +-
.../kafka/connect/runtime/WorkerSourceTask.java | 2 -
.../runtime/rest/resources/ConnectorsResource.java | 1 +
.../kafka/connect/integration/ConnectorHandle.java | 8 +
.../ExactlyOnceSourceIntegrationTest.java | 1130 ++++++++++++++++++++
.../integration/MonitorableSourceConnector.java | 139 ++-
.../clusters/EmbeddedConnectClusterAssertions.java | 3 +
.../util/clusters/EmbeddedKafkaCluster.java | 150 ++-
8 files changed, 1418 insertions(+), 23 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 837891071a4..d89f577688f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -196,6 +196,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
List<SourceRecord> toSend;
protected Map<String, String> taskConfig;
protected boolean started = false;
+ private volatile boolean producerClosed = false;
protected AbstractWorkerSourceTask(ConnectorTaskId id,
SourceTask task,
@@ -315,6 +316,7 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
private void closeProducer(Duration duration) {
if (producer != null) {
+ producerClosed = true;
Utils.closeQuietly(() -> producer.close(duration), "source task producer");
}
}
@@ -397,7 +399,11 @@ public abstract class AbstractWorkerSourceTask extends WorkerTask {
producerRecord,
(recordMetadata, e) -> {
if (e != null) {
- log.debug("{} failed to send record to {}: ", AbstractWorkerSourceTask.this, topic, e);
+ if (producerClosed) {
+ log.trace("{} failed to send record to {}; this is expected as the producer has already been closed", AbstractWorkerSourceTask.this, topic, e);
+ } else {
+ log.error("{} failed to send record to {}: ", AbstractWorkerSourceTask.this, topic, e);
+ }
log.trace("{} Failed record: {}", AbstractWorkerSourceTask.this, preTransformRecord);
producerSendFailed(false, producerRecord, preTransformRecord, e);
} else {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 1a63cc1532d..37d93a3fe86 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -174,8 +174,6 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
);
commitTaskRecord(preTransformRecord, null);
} else {
- log.error("{} failed to send record to {}: ", WorkerSourceTask.this, topic, e);
- log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
producerSendException.compareAndSet(null, e);
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index 1d14f506c85..92a7d543fff 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -332,6 +332,7 @@ public class ConnectorsResource implements ConnectResource {
@PUT
@Path("/{connector}/fence")
+ @Operation(hidden = true, summary = "This operation is only for inter-worker communications")
public void fenceZombies(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
index b31455b2484..bed05fa21e4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
@@ -112,6 +112,14 @@ public class ConnectorHandle {
taskHandles.remove(taskId);
}
+ /**
+ * Delete all task handles for this connector.
+ */
+ public void clearTasks() {
+ log.info("Clearing {} existing task handles for connector {}", taskHandles.size(), connectorName);
+ taskHandles.clear();
+ }
+
/**
* Set the number of expected records for this connector.
*
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
new file mode 100644
index 00000000000..25a419ed8f8
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -0,0 +1,1130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.resource.PatternType;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG;
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_TRANSACTION_BOUNDARIES_CONFIG;
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG;
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.OFFSETS_TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TRANSACTION_BOUNDARY_CONFIG;
+import static org.apache.kafka.connect.runtime.SourceConnectorConfig.TRANSACTION_BOUNDARY_INTERVAL_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
+import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.CONNECTOR;
+import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.INTERVAL;
+import static org.apache.kafka.connect.source.SourceTask.TransactionBoundary.POLL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+@Category(IntegrationTest.class)
+public class ExactlyOnceSourceIntegrationTest {
+
+ private static final Logger log = LoggerFactory.getLogger(ExactlyOnceSourceIntegrationTest.class);
+ private static final String CLUSTER_GROUP_ID = "exactly-once-source-integration-test";
+ private static final String CONNECTOR_NAME = "exactlyOnceQuestionMark";
+
+ private static final int CONSUME_RECORDS_TIMEOUT_MS = 60_000;
+ private static final int SOURCE_TASK_PRODUCE_TIMEOUT_MS = 30_000;
+ private static final int DEFAULT_NUM_WORKERS = 3;
+
+ private Properties brokerProps;
+ private Map<String, String> workerProps;
+ private EmbeddedConnectCluster.Builder connectBuilder;
+ private EmbeddedConnectCluster connect;
+ private ConnectorHandle connectorHandle;
+
+ @Before
+ public void setup() {
+ workerProps = new HashMap<>();
+ workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
+ workerProps.put(DistributedConfig.GROUP_ID_CONFIG, CLUSTER_GROUP_ID);
+
+ brokerProps = new Properties();
+ brokerProps.put("transaction.state.log.replication.factor", "1");
+ brokerProps.put("transaction.state.log.min.isr", "1");
+
+ // build a Connect cluster backed by Kafka and Zk
+ connectBuilder = new EmbeddedConnectCluster.Builder()
+ .numWorkers(DEFAULT_NUM_WORKERS)
+ .numBrokers(1)
+ .workerProps(workerProps)
+ .brokerProps(brokerProps);
+
+ // get a handle to the connector
+ connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+ }
+
+ private void startConnect() {
+ connect = connectBuilder.build();
+ connect.start();
+ }
+
+ @After
+ public void close() {
+ try {
+ // stop all Connect, Kafka and Zk threads.
+ connect.stop();
+ } finally {
+ // Clear the handle for the connector. Fun fact: if you don't do this, your tests become quite flaky.
+ RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
+ }
+ }
+
+ /**
+ * A simple test for the pre-flight validation API for connectors to provide their own delivery guarantees.
+ */
+ @Test
+ public void testPreflightValidation() {
+ connectBuilder.numWorkers(1);
+ startConnect();
+
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName());
+ props.put(TASKS_MAX_CONFIG, "1");
+ props.put(TOPIC_CONFIG, "topic");
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(NAME_CONFIG, CONNECTOR_NAME);
+
+ // Test out the "exactly.once.support" property
+ props.put(EXACTLY_ONCE_SUPPORT_CONFIG, "required");
+
+ // Connector will return null from SourceConnector::exactlyOnceSupport
+ props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_NULL);
+ ConfigInfos validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
+ assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount());
+ ConfigInfo propertyValidation = findConfigInfo(EXACTLY_ONCE_SUPPORT_CONFIG, validation);
+ assertFalse("Preflight validation for exactly-once support property should have at least one error message",
+ propertyValidation.configValue().errors().isEmpty());
+
+ // Connector will return UNSUPPORTED from SourceConnector::exactlyOnceSupport
+ props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_UNSUPPORTED);
+ validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
+ assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount());
+ propertyValidation = findConfigInfo(EXACTLY_ONCE_SUPPORT_CONFIG, validation);
+ assertFalse("Preflight validation for exactly-once support property should have at least one error message",
+ propertyValidation.configValue().errors().isEmpty());
+
+ // Connector will throw an exception from SourceConnector::exactlyOnceSupport
+ props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_FAIL);
+ validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
+ assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount());
+ propertyValidation = findConfigInfo(EXACTLY_ONCE_SUPPORT_CONFIG, validation);
+ assertFalse("Preflight validation for exactly-once support property should have at least one error message",
+ propertyValidation.configValue().errors().isEmpty());
+
+ // Connector will return SUPPORTED from SourceConnector::exactlyOnceSupport
+ props.put(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, MonitorableSourceConnector.EXACTLY_ONCE_SUPPORTED);
+ validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
+ assertEquals("Preflight validation should have zero errors", 0, validation.errorCount());
+
+ // Test out the transaction boundary definition property
+ props.put(TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString());
+
+ // Connector will return null from SourceConnector::canDefineTransactionBoundaries
+ props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_NULL);
+ validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
+ assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount());
+ propertyValidation = findConfigInfo(TRANSACTION_BOUNDARY_CONFIG, validation);
+ assertFalse("Preflight validation for transaction boundary property should have at least one error message",
+ propertyValidation.configValue().errors().isEmpty());
+
+ // Connector will return UNSUPPORTED from SourceConnector::canDefineTransactionBoundaries
+ props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_UNSUPPORTED);
+ validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
+ assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount());
+ propertyValidation = findConfigInfo(TRANSACTION_BOUNDARY_CONFIG, validation);
+ assertFalse("Preflight validation for transaction boundary property should have at least one error message",
+ propertyValidation.configValue().errors().isEmpty());
+
+ // Connector will throw an exception from SourceConnector::canDefineTransactionBoundaries
+ props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_FAIL);
+ validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
+ assertEquals("Preflight validation should have exactly one error", 1, validation.errorCount());
+ propertyValidation = findConfigInfo(TRANSACTION_BOUNDARY_CONFIG, validation);
+ assertFalse("Preflight validation for transaction boundary property should have at least one error message",
+ propertyValidation.configValue().errors().isEmpty());
+
+ // Connector will return SUPPORTED from SourceConnector::canDefineTransactionBoundaries
+ props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_SUPPORTED);
+ validation = connect.validateConnectorConfig(MonitorableSourceConnector.class.getSimpleName(), props);
+ assertEquals("Preflight validation should have zero errors", 0, validation.errorCount());
+ }
+
+ /**
+ * A simple green-path test that ensures the worker can start up a source task with exactly-once support enabled
+ * and write some records to Kafka that will be visible to a downstream consumer using the "READ_COMMITTED"
+ * isolation level. The "poll" transaction boundary is used.
+ */
+ @Test
+ public void testPollBoundary() throws Exception {
+ // Much slower offset commit interval; should never be triggered during this test
+ workerProps.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "600000");
+ connectBuilder.numWorkers(1);
+ startConnect();
+
+ String topic = "test-topic";
+ connect.kafka().createTopic(topic, 3);
+
+ int numTasks = 1;
+ int recordsProduced = 100;
+
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName());
+ props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks));
+ props.put(TOPIC_CONFIG, topic);
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(NAME_CONFIG, CONNECTOR_NAME);
+ props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString());
+ props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+
+ // expect all records to be consumed and committed by the connector
+ connectorHandle.expectedRecords(recordsProduced);
+ connectorHandle.expectedCommits(recordsProduced);
+
+ // start a source connector
+ connect.configureConnector(CONNECTOR_NAME, props);
+
+ log.info("Waiting for records to be provided to worker by task");
+ // wait for the connector tasks to produce enough records
+ connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS);
+
+ log.info("Waiting for records to be committed to Kafka by worker");
+ // wait for the connector tasks to commit enough records
+ connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1));
+
+ StartAndStopLatch connectorStop = connectorHandle.expectedStops(1, true);
+ connect.deleteConnector(CONNECTOR_NAME);
+ assertConnectorStopped(connectorStop);
+
+ // consume all records from the source topic or fail, to ensure that they were correctly produced
+ ConsumerRecords<byte[], byte[]> records = connect.kafka().consumeAll(
+ CONSUME_RECORDS_TIMEOUT_MS,
+ Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+ null,
+ topic
+ );
+ assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " but got " + records.count(),
+ records.count() >= recordsProduced);
+ assertExactlyOnceSeqnos(records, numTasks);
+ }
+
+ /**
+ * A simple green-path test that ensures the worker can start up a source task with exactly-once support enabled
+ * and write some records to Kafka that will be visible to a downstream consumer using the "READ_COMMITTED"
+ * isolation level. The "interval" transaction boundary is used with a connector-specific override.
+ */
+ @Test
+ public void testIntervalBoundary() throws Exception {
+ // Much slower offset commit interval; should never be triggered during this test
+ workerProps.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "600000");
+ connectBuilder.numWorkers(1);
+ startConnect();
+
+ String topic = "test-topic";
+ connect.kafka().createTopic(topic, 3);
+
+ int numTasks = 1;
+ int recordsProduced = 100;
+
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName());
+ props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks));
+ props.put(TOPIC_CONFIG, topic);
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(NAME_CONFIG, CONNECTOR_NAME);
+ props.put(TRANSACTION_BOUNDARY_CONFIG, INTERVAL.toString());
+ props.put(TRANSACTION_BOUNDARY_INTERVAL_CONFIG, "10000");
+ props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+
+ // expect all records to be consumed and committed by the connector
+ connectorHandle.expectedRecords(recordsProduced);
+ connectorHandle.expectedCommits(recordsProduced);
+
+ // start a source connector
+ connect.configureConnector(CONNECTOR_NAME, props);
+
+ log.info("Waiting for records to be provided to worker by task");
+ // wait for the connector tasks to produce enough records
+ connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS);
+
+ log.info("Waiting for records to be committed to Kafka by worker");
+ // wait for the connector tasks to commit enough records
+ connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1));
+
+ StartAndStopLatch connectorStop = connectorHandle.expectedStops(1, true);
+ connect.deleteConnector(CONNECTOR_NAME);
+ assertConnectorStopped(connectorStop);
+
+ // consume all records from the source topic or fail, to ensure that they were correctly produced
+ ConsumerRecords<byte[], byte[]> records = connect.kafka().consumeAll(
+ CONSUME_RECORDS_TIMEOUT_MS,
+ Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+ null,
+ topic
+ );
+ assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " but got " + records.count(),
+ records.count() >= recordsProduced);
+ assertExactlyOnceSeqnos(records, numTasks);
+ }
+
+ /**
+ * A simple green-path test that ensures the worker can start up a source task with exactly-once support enabled
+ * and write some records to Kafka that will be visible to a downstream consumer using the "READ_COMMITTED"
+ * isolation level. The "connector" transaction boundary is used with a connector that defines transactions whose
+ * size correspond to successive elements of the Fibonacci sequence, where transactions with an even number of
+ * records are aborted, and those with an odd number of records are committed.
+ */
+ @Test
+ public void testConnectorBoundary() throws Exception {
+ String offsetsTopic = "exactly-once-source-cluster-offsets";
+ workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, offsetsTopic);
+ connectBuilder.numWorkers(1);
+ startConnect();
+
+ String topic = "test-topic";
+ connect.kafka().createTopic(topic, 3);
+
+ int recordsProduced = 100;
+
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName());
+ props.put(TASKS_MAX_CONFIG, "1");
+ props.put(TOPIC_CONFIG, topic);
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(NAME_CONFIG, CONNECTOR_NAME);
+ props.put(TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString());
+ props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, MonitorableSourceConnector.TRANSACTION_BOUNDARIES_SUPPORTED);
+ props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+
+ // expect all records to be consumed and committed by the connector
+ connectorHandle.expectedRecords(recordsProduced);
+ connectorHandle.expectedCommits(recordsProduced);
+
+ // start a source connector
+ connect.configureConnector(CONNECTOR_NAME, props);
+
+ log.info("Waiting for records to be provided to worker by task");
+ // wait for the connector tasks to produce enough records
+ connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS);
+
+ log.info("Waiting for records to be committed to Kafka by worker");
+ // wait for the connector tasks to commit enough records
+ connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1));
+
+ Map<String, Object> consumerProps = new HashMap<>();
+ consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+ // consume all records from the source topic or fail, to ensure that they were correctly produced
+ ConsumerRecords<byte[], byte[]> sourceRecords = connect.kafka()
+ .consume(
+ recordsProduced,
+ TimeUnit.MINUTES.toMillis(1),
+ consumerProps,
+ "test-topic");
+ assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " but got " + sourceRecords.count(),
+ sourceRecords.count() >= recordsProduced);
+
+ // also consume from the cluster's offsets topic to verify that the expected offsets (which should correspond to the connector's
+ // custom transaction boundaries) were committed
+ List<Long> expectedOffsetSeqnos = new ArrayList<>();
+ long lastExpectedOffsetSeqno = 1;
+ long nextExpectedOffsetSeqno = 1;
+ while (nextExpectedOffsetSeqno <= recordsProduced) {
+ expectedOffsetSeqnos.add(nextExpectedOffsetSeqno);
+ nextExpectedOffsetSeqno += lastExpectedOffsetSeqno;
+ lastExpectedOffsetSeqno = nextExpectedOffsetSeqno - lastExpectedOffsetSeqno;
+ }
+ ConsumerRecords<byte[], byte[]> offsetRecords = connect.kafka()
+ .consume(
+ expectedOffsetSeqnos.size(),
+ TimeUnit.MINUTES.toMillis(1),
+ consumerProps,
+ offsetsTopic
+ );
+
+ List<Long> actualOffsetSeqnos = new ArrayList<>();
+ offsetRecords.forEach(record -> actualOffsetSeqnos.add(parseAndAssertOffsetForSingleTask(record)));
+
+ assertEquals("Committed offsets should match connector-defined transaction boundaries",
+ expectedOffsetSeqnos, actualOffsetSeqnos.subList(0, expectedOffsetSeqnos.size()));
+
+ List<Long> expectedRecordSeqnos = LongStream.range(1, recordsProduced + 1).boxed().collect(Collectors.toList());
+ long priorBoundary = 1;
+ long nextBoundary = 2;
+ while (priorBoundary < expectedRecordSeqnos.get(expectedRecordSeqnos.size() - 1)) {
+ if (nextBoundary % 2 == 0) {
+ for (long i = priorBoundary + 1; i < nextBoundary + 1; i++) {
+ expectedRecordSeqnos.remove(i);
+ }
+ }
+ nextBoundary += priorBoundary;
+ priorBoundary = nextBoundary - priorBoundary;
+ }
+ List<Long> actualRecordSeqnos = parseAndAssertValuesForSingleTask(sourceRecords);
+ // Have to sort the records by seqno since we produce to multiple partitions and in-order consumption isn't guaranteed
+ Collections.sort(actualRecordSeqnos);
+ assertEquals("Committed records should exclude connector-aborted transactions",
+ expectedRecordSeqnos, actualRecordSeqnos.subList(0, expectedRecordSeqnos.size()));
+ }
+
+ /**
+ * Brings up a one-node cluster, then intentionally fences out the transactional producer used by the leader
+ * for writes to the config topic to simulate a zombie leader being active in the cluster. The leader should
+ * automatically recover, verify that it is still the leader, and then succeed to create a connector when the
+ * user resends the request.
+ */
+ @Test
+ public void testFencedLeaderRecovery() throws Exception {
+ // Much slower offset commit interval; should never be triggered during this test
+ workerProps.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "600000");
+ startConnect();
+
+ String topic = "test-topic";
+ connect.kafka().createTopic(topic, 3);
+
+ int numTasks = 1;
+ int recordsProduced = 100;
+
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName());
+ props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks));
+ props.put(TOPIC_CONFIG, topic);
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(NAME_CONFIG, CONNECTOR_NAME);
+ props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString());
+ props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+
+ // expect all records to be consumed and committed by the connector
+ connectorHandle.expectedRecords(recordsProduced);
+ connectorHandle.expectedCommits(recordsProduced);
+
+ // make sure the worker is actually up (otherwise, it may fence out our simulated zombie leader, instead of the other way around)
+ assertEquals(404, connect.requestGet(connect.endpointForResource("connectors/nonexistent")).getStatus());
+
+ // fence out the leader of the cluster
+ Producer<?, ?> zombieLeader = transactionalProducer(DistributedConfig.transactionalProducerId(CLUSTER_GROUP_ID));
+ zombieLeader.initTransactions();
+ zombieLeader.close();
+
+ // start a source connector--should fail the first time
+ assertThrows(ConnectRestException.class, () -> connect.configureConnector(CONNECTOR_NAME, props));
+
+ // the second request should succeed because the leader has reclaimed write privileges for the config topic
+ connect.configureConnector(CONNECTOR_NAME, props);
+
+ log.info("Waiting for records to be provided to worker by task");
+ // wait for the connector tasks to produce enough records
+ connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS);
+
+ log.info("Waiting for records to be committed to Kafka by worker");
+ // wait for the connector tasks to commit enough records
+ connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1));
+
+ StartAndStopLatch connectorStop = connectorHandle.expectedStops(1, true);
+ connect.deleteConnector(CONNECTOR_NAME);
+ assertConnectorStopped(connectorStop);
+
+ // consume all records from the source topic or fail, to ensure that they were correctly produced
+ ConsumerRecords<byte[], byte[]> records = connect.kafka().consumeAll(
+ CONSUME_RECORDS_TIMEOUT_MS,
+ Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+ null,
+ topic
+ );
+ assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(),
+ records.count() >= recordsProduced);
+ assertExactlyOnceSeqnos(records, numTasks);
+ }
+
+ /**
+ * A moderately-complex green-path test that ensures the worker can start up and run tasks for a source
+ * connector that gets reconfigured, and will fence out potential zombie tasks for older generations before
+ * bringing up new task instances.
+ */
+ @Test
+ public void testConnectorReconfiguration() throws Exception {
+ // Much slower offset commit interval; should never be triggered during this test
+ workerProps.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "600000");
+ startConnect();
+
+ String topic = "test-topic";
+ connect.kafka().createTopic(topic, 3);
+
+ int recordsProduced = 100;
+
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName());
+ props.put(TOPIC_CONFIG, topic);
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(NAME_CONFIG, CONNECTOR_NAME);
+ props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+
+ // expect all records to be consumed and committed by the connector
+ connectorHandle.expectedRecords(recordsProduced);
+ connectorHandle.expectedCommits(recordsProduced);
+
+ StartAndStopLatch connectorStart = connectorAndTaskStart(3);
+ props.put(TASKS_MAX_CONFIG, "3");
+ // start a source connector
+ connect.configureConnector(CONNECTOR_NAME, props);
+ assertConnectorStarted(connectorStart);
+
+ assertProducersAreFencedOnReconfiguration(3, 5, topic, props);
+ assertProducersAreFencedOnReconfiguration(5, 1, topic, props);
+ assertProducersAreFencedOnReconfiguration(1, 5, topic, props);
+ assertProducersAreFencedOnReconfiguration(5, 3, topic, props);
+
+ // Do a final sanity check to make sure that the last generation of tasks is able to run
+ log.info("Waiting for records to be provided to worker by task");
+ // wait for the connector tasks to produce enough records
+ connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS);
+
+ log.info("Waiting for records to be committed to Kafka by worker");
+ // wait for the connector tasks to commit enough records
+ connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1));
+
+ StartAndStopLatch connectorStop = connectorHandle.expectedStops(1, true);
+ connect.deleteConnector(CONNECTOR_NAME);
+ assertConnectorStopped(connectorStop);
+
+ // consume all records from the source topic or fail, to ensure that they were correctly produced
+ ConsumerRecords<byte[], byte[]> records = connect.kafka().consumeAll(
+ CONSUME_RECORDS_TIMEOUT_MS,
+ Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+ null,
+ topic
+ );
+ assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(),
+ records.count() >= recordsProduced);
+ // We used at most five tasks during the tests; each of them should have been able to produce records
+ assertExactlyOnceSeqnos(records, 5);
+ }
+
+ /**
+ * This test ensures that tasks are marked failed in the status API when the round of
+ * zombie fencing that takes place before they are brought up fails. In addition, once
+ * the issue with the connector config that made fencing impossible is rectified, tasks
+ * can be successfully restarted.
+ * <p>
+ * Fencing failures are induced by bringing up an ACL-secured Kafka cluster and creating
+ * a connector whose principal is not authorized to access the transactional IDs that Connect
+ * uses for its tasks.
+ * <p>
+ * When the connector is initially brought up, no fencing is necessary. However, once it is
+ * reconfigured and generates new task configs, a round of zombie fencing is triggered,
+ * and all of its tasks fail when that round of zombie fencing fails.
+ * <p>
+ * After the connector's principal is granted access to the necessary transactional IDs,
+ * all of its tasks are restarted, and we verify that they are able to come up successfully
+ * this time.
+ */
+ @Test
+ public void testTasksFailOnInabilityToFence() throws Exception {
+ brokerProps.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer");
+ brokerProps.put("sasl.enabled.mechanisms", "PLAIN");
+ brokerProps.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
+ brokerProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
+ brokerProps.put("listeners", "SASL_PLAINTEXT://localhost:0");
+ brokerProps.put("listener.name.sasl_plaintext.plain.sasl.jaas.config",
+ "org.apache.kafka.common.security.plain.PlainLoginModule required "
+ + "username=\"super\" "
+ + "password=\"super_pwd\" "
+ + "user_connector=\"connector_pwd\" "
+ + "user_super=\"super_pwd\";");
+ brokerProps.put("super.users", "User:super");
+
+ Map<String, String> superUserClientConfig = new HashMap<>();
+ superUserClientConfig.put("sasl.mechanism", "PLAIN");
+ superUserClientConfig.put("security.protocol", "SASL_PLAINTEXT");
+ superUserClientConfig.put("sasl.jaas.config",
+ "org.apache.kafka.common.security.plain.PlainLoginModule required "
+ + "username=\"super\" "
+ + "password=\"super_pwd\";");
+ // Give the worker super-user privileges
+ workerProps.putAll(superUserClientConfig);
+
+ final String globalOffsetsTopic = "connect-worker-offsets-topic";
+ workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, globalOffsetsTopic);
+
+ startConnect();
+
+ String topic = "test-topic";
+ Admin admin = connect.kafka().createAdminClient(Utils.mkProperties(superUserClientConfig));
+ admin.createTopics(Collections.singleton(new NewTopic(topic, 3, (short) 1))).all().get();
+
+ Map<String, String> props = new HashMap<>();
+ int tasksMax = 2; // Use two tasks since single-task connectors don't require zombie fencing
+ props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName());
+ props.put(TOPIC_CONFIG, topic);
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(NAME_CONFIG, CONNECTOR_NAME);
+ props.put(TASKS_MAX_CONFIG, Integer.toString(tasksMax));
+ // Give the connector's consumer and producer super-user privileges
+ superUserClientConfig.forEach((property, value) -> {
+ props.put(CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + property, value);
+ props.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + property, value);
+ });
+ // But limit its admin client's privileges
+ props.put(CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + "sasl.mechanism", "PLAIN");
+ props.put(CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + "security.protocol", "SASL_PLAINTEXT");
+ props.put(CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + "sasl.jaas.config",
+ "org.apache.kafka.common.security.plain.PlainLoginModule required "
+ + "username=\"connector\" "
+ + "password=\"connector_pwd\";");
+ // Grant the connector's admin permissions to access the topics for its records and offsets
+ // Intentionally leave out permissions required for fencing
+ admin.createAcls(Arrays.asList(
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL),
+ new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
+ ),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TOPIC, globalOffsetsTopic, PatternType.LITERAL),
+ new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
+ )
+ )).all().get();
+
+ StartAndStopLatch connectorStart = connectorAndTaskStart(tasksMax);
+
+ log.info("Bringing up connector with fresh slate; fencing should not be necessary");
+ connect.configureConnector(CONNECTOR_NAME, props);
+ assertConnectorStarted(connectorStart);
+ // Verify that the connector and its tasks have been able to start successfully
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, tasksMax, "Connector and task should have started successfully");
+
+ log.info("Reconfiguring connector; fencing should be necessary, and tasks should fail to start");
+ props.put("message.in.a.bottle", "19e184427ac45bd34c8588a4e771aa1a");
+ connect.configureConnector(CONNECTOR_NAME, props);
+
+ // Verify that the task has failed, and that the failure is visible to users via the REST API
+ connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(CONNECTOR_NAME, tasksMax, "Task should have failed on startup");
+
+ // Now grant the necessary permissions for fencing to the connector's admin
+ admin.createAcls(Arrays.asList(
+ new AclBinding(
+ new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 0), PatternType.LITERAL),
+ new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
+ ),
+ new AclBinding(
+ new ResourcePattern(ResourceType.TRANSACTIONAL_ID, Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, 1), PatternType.LITERAL),
+ new AccessControlEntry("User:connector", "*", AclOperation.ALL, AclPermissionType.ALLOW)
+ )
+ ));
+
+ log.info("Restarting connector after tweaking its ACLs; fencing should succeed this time");
+ connect.restartConnectorAndTasks(CONNECTOR_NAME, false, true, false);
+ // Verify that the connector and its tasks have been able to restart successfully
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, tasksMax, "Connector and task should have restarted successfully");
+ }
+
+ @Test
+ public void testSeparateOffsetsTopic() throws Exception {
+ final String globalOffsetsTopic = "connect-worker-offsets-topic";
+ workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, globalOffsetsTopic);
+
+ startConnect();
+ EmbeddedKafkaCluster connectorTargetedCluster = new EmbeddedKafkaCluster(1, brokerProps);
+ try (Closeable clusterShutdown = connectorTargetedCluster::stop) {
+ connectorTargetedCluster.start();
+ String topic = "test-topic";
+ connectorTargetedCluster.createTopic(topic, 3);
+
+ int numTasks = 1;
+ int recordsProduced = 100;
+
+ Map<String, String> props = new HashMap<>();
+ props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName());
+ props.put(TASKS_MAX_CONFIG, Integer.toString(numTasks));
+ props.put(TOPIC_CONFIG, topic);
+ props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ props.put(NAME_CONFIG, CONNECTOR_NAME);
+ props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString());
+ props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+ props.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, connectorTargetedCluster.bootstrapServers());
+ props.put(CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, connectorTargetedCluster.bootstrapServers());
+ props.put(CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, connectorTargetedCluster.bootstrapServers());
+ String offsetsTopic = CONNECTOR_NAME + "-offsets";
+ props.put(OFFSETS_TOPIC_CONFIG, offsetsTopic);
+
+ // expect all records to be consumed and committed by the connector
+ connectorHandle.expectedRecords(recordsProduced);
+ connectorHandle.expectedCommits(recordsProduced);
+
+ // start a source connector
+ connect.configureConnector(CONNECTOR_NAME, props);
+
+ log.info("Waiting for records to be provided to worker by task");
+ // wait for the connector tasks to produce enough records
+ connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS);
+
+ log.info("Waiting for records to be committed to Kafka by worker");
+ // wait for the connector tasks to commit enough records
+ connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1));
+
+ // consume all records from the source topic or fail, to ensure that they were correctly produced
+ int recordNum = connectorTargetedCluster
+ .consume(
+ recordsProduced,
+ TimeUnit.MINUTES.toMillis(1),
+ Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+ "test-topic")
+ .count();
+ assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + recordNum,
+ recordNum >= recordsProduced);
+
+ // also consume from the connector's dedicated offsets topic; just need to read one offset record
+ ConsumerRecord<byte[], byte[]> offsetRecord = connectorTargetedCluster
+ .consume(
+ 1,
+ TimeUnit.MINUTES.toMillis(1),
+ Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+ offsetsTopic
+ ).iterator().next();
+ long seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
+ assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+ 0, seqno % recordsProduced);
+
+ // also consume from the cluster's global offsets topic; again, just need to read one offset record
+ offsetRecord = connect.kafka()
+ .consume(
+ 1,
+ TimeUnit.MINUTES.toMillis(1),
+ globalOffsetsTopic
+ ).iterator().next();
+ seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
+ assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+ 0, seqno % recordsProduced);
+
+ // Shut down the whole cluster
+ connect.workers().forEach(connect::removeWorker);
+ // Reconfigure the cluster with exactly-once support disabled
+ workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled");
+
+ // Establish new expectations for records+offsets
+ connectorHandle.expectedRecords(recordsProduced);
+ connectorHandle.expectedCommits(recordsProduced);
+
+ // Restart the whole cluster
+ for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
+ connect.addWorker();
+ }
+
+ // And perform a basic sanity check that the cluster is able to come back up, our connector and its task are able to resume running,
+ // and the task is still able to produce source records and commit offsets
+ connect.assertions().assertAtLeastNumWorkersAreUp(DEFAULT_NUM_WORKERS, "cluster did not restart in time");
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+ CONNECTOR_NAME,
+ 1,
+ "connector and tasks did not resume running after cluster restart in time"
+ );
+
+ log.info("Waiting for records to be provided to worker by task");
+ // wait for the connector tasks to produce enough records
+ connectorHandle.awaitRecords(SOURCE_TASK_PRODUCE_TIMEOUT_MS);
+
+ log.info("Waiting for records to be committed to Kafka by worker");
+ // wait for the connector tasks to commit enough records
+ connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1));
+
+ StartAndStopLatch connectorStop = connectorHandle.expectedStops(1, true);
+ connect.deleteConnector(CONNECTOR_NAME);
+ assertConnectorStopped(connectorStop);
+
+ // consume all records from the source topic or fail, to ensure that they were correctly produced
+ ConsumerRecords<byte[], byte[]> records = connectorTargetedCluster.consumeAll(
+ CONSUME_RECORDS_TIMEOUT_MS,
+ Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+ null,
+ topic
+ );
+ assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(),
+ records.count() >= recordsProduced);
+ assertExactlyOnceSeqnos(records, numTasks);
+ }
+ }
+
+ /**
+ * A simple test to ensure that source tasks fail when trying to produce to their own offsets topic.
+ * <p>
+ * We fail the tasks in order to prevent deadlock that occurs when:
+ * <ol>
+ * <li>
+ * A task provides a record whose topic is the task's offsets topic
+ * </li>
+ * <li>
+ * That record is dispatched to the task's producer in a transaction that remains open
+ * at least until the worker polls the task again
+ * </li>
+ * <li>
+ * In the subsequent call to SourceTask::poll, the task requests offsets from the worker
+ * (which requires a read to the end of the offsets topic, and will block until any open
+ * transactions on the topic are either committed or aborted)
+ * </li>
+ * </ol>
+ */
+ @Test
+ public void testPotentialDeadlockWhenProducingToOffsetsTopic() throws Exception {
+ connectBuilder.numWorkers(1);
+ startConnect();
+
+ String topic = "test-topic";
+ connect.kafka().createTopic(topic, 3);
+
+ int recordsProduced = 100;
+
+ Map<String, String> props = new HashMap<>();
+ // See below; this connector does nothing except request offsets from the worker in SourceTask::poll
+ // and then return a single record targeted at its offsets topic
+ props.put(CONNECTOR_CLASS_CONFIG, NaughtyConnector.class.getName());
+ props.put(TASKS_MAX_CONFIG, "1");
+ props.put(NAME_CONFIG, CONNECTOR_NAME);
+ props.put(TRANSACTION_BOUNDARY_CONFIG, INTERVAL.toString());
+ props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+ props.put(OFFSETS_TOPIC_CONFIG, "whoops");
+
+ // start a source connector
+ connect.configureConnector(CONNECTOR_NAME, props);
+
+ connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(
+ CONNECTOR_NAME, 1, "Task should have failed after trying to produce to its own offsets topic");
+ }
+
+ private ConfigInfo findConfigInfo(String property, ConfigInfos validationResult) {
+ return validationResult.values().stream()
+ .filter(info -> property.equals(info.configKey().name()))
+ .findAny()
+ .orElseThrow(() -> new AssertionError("Failed to find configuration validation result for property '" + property + "'"));
+ }
+
+ @SuppressWarnings("unchecked")
+ private long parseAndAssertOffsetForSingleTask(ConsumerRecord<byte[], byte[]> offsetRecord) {
+ JsonConverter offsetsConverter = new JsonConverter();
+ // The JSON converter behaves identically for keys and values. If that ever changes, we may need to update this test to use
+ // separate converter instances.
+
+ offsetsConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), false);
+ Object keyObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.key()).value();
+ Object valueObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.value()).value();
+
+ assertNotNull("Offset value should not be null", valueObject);
+
+ assertEquals("Serialized source partition should match expected format",
+ Arrays.asList(CONNECTOR_NAME, MonitorableSourceConnector.sourcePartition(MonitorableSourceConnector.taskId(CONNECTOR_NAME, 0))),
+ keyObject);
+
+ Map<String, Object> value = assertAndCast(valueObject, Map.class, "Value");
+
+ Object seqnoObject = value.get("saved");
+ assertNotNull("Serialized source offset should contain 'seqno' field from MonitorableSourceConnector", seqnoObject);
+ return assertAndCast(seqnoObject, Long.class, "Seqno offset field");
+ }
+
+ private List<Long> parseAndAssertValuesForSingleTask(ConsumerRecords<byte[], byte[]> sourceRecords) {
+ Map<Integer, List<Long>> parsedValues = parseValuesForTasks(sourceRecords);
+ assertEquals("Expected records to only be produced from a single task", Collections.singleton(0), parsedValues.keySet());
+ return parsedValues.get(0);
+ }
+
+ private void assertExactlyOnceSeqnos(ConsumerRecords<byte[], byte[]> sourceRecords, int numTasks) {
+ Map<Integer, List<Long>> parsedValues = parseValuesForTasks(sourceRecords);
+ Set<Integer> expectedKeys = IntStream.range(0, numTasks).boxed().collect(Collectors.toSet());
+ assertEquals("Expected records to be produced by each task", expectedKeys, parsedValues.keySet());
+
+ parsedValues.forEach((taskId, seqnos) -> {
+ // We don't check for order here because the records may have been produced to multiple topic partitions,
+ // which makes in-order consumption impossible
+ Set<Long> expectedSeqnos = LongStream.range(1, seqnos.size() + 1).boxed().collect(Collectors.toSet());
+ Set<Long> actualSeqnos = new HashSet<>(seqnos);
+ assertEquals(
+ "Seqnos for task " + taskId + " should start at 1 and increase strictly by 1 with each record",
+ expectedSeqnos,
+ actualSeqnos
+ );
+ });
+ }
+
+ private Map<Integer, List<Long>> parseValuesForTasks(ConsumerRecords<byte[], byte[]> sourceRecords) {
+ Map<Integer, List<Long>> result = new HashMap<>();
+ for (ConsumerRecord<byte[], byte[]> sourceRecord : sourceRecords) {
+ assertNotNull("Record key should not be null", sourceRecord.key());
+ assertNotNull("Record value should not be null", sourceRecord.value());
+
+ String key = new String(sourceRecord.key());
+ String value = new String(sourceRecord.value());
+
+ String keyPrefix = "key-";
+ String valuePrefix = "value-";
+
+ assertTrue("Key should start with \"" + keyPrefix + "\"", key.startsWith(keyPrefix));
+ assertTrue("Value should start with \"" + valuePrefix + "\"", value.startsWith(valuePrefix));
+ assertEquals(
+ "key and value should be identical after prefix",
+ key.substring(keyPrefix.length()),
+ value.substring(valuePrefix.length())
+ );
+
+ String[] split = key.substring(keyPrefix.length()).split("-");
+ assertEquals("Key should match pattern 'key-<connectorName>-<taskId>-<seqno>", 3, split.length);
+ assertEquals("Key should match pattern 'key-<connectorName>-<taskId>-<seqno>", CONNECTOR_NAME, split[0]);
+
+ int taskId;
+ try {
+ taskId = Integer.parseInt(split[1], 10);
+ } catch (NumberFormatException e) {
+ throw new AssertionError("Task ID in key should be an integer, was '" + split[1] + "'", e);
+ }
+
+ long seqno;
+ try {
+ seqno = Long.parseLong(split[2], 10);
+ } catch (NumberFormatException e) {
+ throw new AssertionError("Seqno in key should be a long, was '" + split[2] + "'", e);
+ }
+
+ result.computeIfAbsent(taskId, t -> new ArrayList<>()).add(seqno);
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T> T assertAndCast(Object o, Class<T> klass, String objectDescription) {
+ String className = o == null ? "null" : o.getClass().getName();
+ assertTrue(objectDescription + " should be " + klass.getName() + "; was " + className + " instead", klass.isInstance(o));
+ return (T) o;
+ }
+
+ /**
+ * Clear all existing task handles for the connector, then preemptively create {@code numTasks} task handles for it,
+ * and return a {@link StartAndStopLatch} that can be used to {@link StartAndStopLatch#await(long, TimeUnit) await}
+ * the startup of that connector and the expected number of tasks.
+ * @param numTasks the number of tasks that should be started
+ * @return a {@link StartAndStopLatch} that will block until the connector and the expected number of tasks have started
+ */
+ private StartAndStopLatch connectorAndTaskStart(int numTasks) {
+ connectorHandle.clearTasks();
+ IntStream.range(0, numTasks)
+ .mapToObj(i -> MonitorableSourceConnector.taskId(CONNECTOR_NAME, i))
+ .forEach(connectorHandle::taskHandle);
+ return connectorHandle.expectedStarts(1, true);
+ }
+
+ private void assertConnectorStarted(StartAndStopLatch connectorStart) throws InterruptedException {
+ assertTrue("Connector and tasks did not finish startup in time",
+ connectorStart.await(
+ EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS,
+ TimeUnit.MILLISECONDS
+ )
+ );
+ }
+
+ private void assertConnectorStopped(StartAndStopLatch connectorStop) throws InterruptedException {
+ assertTrue(
+ "Connector and tasks did not finish shutdown in time",
+ connectorStop.await(
+ EmbeddedConnectClusterAssertions.CONNECTOR_SHUTDOWN_DURATION_MS,
+ TimeUnit.MILLISECONDS
+ )
+ );
+ }
+
+ private void assertProducersAreFencedOnReconfiguration(
+ int currentNumTasks,
+ int newNumTasks,
+ String topic,
+ Map<String, String> baseConnectorProps) throws InterruptedException {
+
+ // create a collection of producers that simulate the producers used for the existing tasks
+ List<KafkaProducer<byte[], byte[]>> producers = IntStream.range(0, currentNumTasks)
+ .mapToObj(i -> Worker.taskTransactionalId(CLUSTER_GROUP_ID, CONNECTOR_NAME, i))
+ .map(this::transactionalProducer)
+ .collect(Collectors.toList());
+
+ producers.forEach(KafkaProducer::initTransactions);
+
+ // reconfigure the connector with a new number of tasks
+ StartAndStopLatch connectorStart = connectorAndTaskStart(newNumTasks);
+ baseConnectorProps.put(TASKS_MAX_CONFIG, Integer.toString(newNumTasks));
+ log.info("Reconfiguring connector from {} tasks to {}", currentNumTasks, newNumTasks);
+ connect.configureConnector(CONNECTOR_NAME, baseConnectorProps);
+ assertConnectorStarted(connectorStart);
+
+ // validate that the old producers were fenced out
+ producers.forEach(producer -> assertTransactionalProducerIsFenced(producer, topic));
+ }
+
+ private KafkaProducer<byte[], byte[]> transactionalProducer(String transactionalId) {
+ Map<String, Object> transactionalProducerProps = new HashMap<>();
+ transactionalProducerProps.put(ENABLE_IDEMPOTENCE_CONFIG, true);
+ transactionalProducerProps.put(TRANSACTIONAL_ID_CONFIG, transactionalId);
+ return connect.kafka().createProducer(transactionalProducerProps);
+ }
+
+ private void assertTransactionalProducerIsFenced(KafkaProducer<byte[], byte[]> producer, String topic) {
+ producer.beginTransaction();
+ assertThrows("Producer should be fenced out",
+ ProducerFencedException.class,
+ () -> {
+ producer.send(new ProducerRecord<>(topic, new byte[] {69}, new byte[] {96}));
+ producer.commitTransaction();
+ }
+ );
+ producer.close(Duration.ZERO);
+ }
+
+ public static class NaughtyConnector extends SourceConnector {
+ private Map<String, String> props;
+
+ @Override
+ public void start(Map<String, String> props) {
+ this.props = props;
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return NaughtyTask.class;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return IntStream.range(0, maxTasks).mapToObj(i -> props).collect(Collectors.toList());
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public ConfigDef config() {
+ return new ConfigDef();
+ }
+
+ @Override
+ public String version() {
+ return "none";
+ }
+ }
+
+ public static class NaughtyTask extends SourceTask {
+ private String topic;
+
+ @Override
+ public void start(Map<String, String> props) {
+ if (!props.containsKey(OFFSETS_TOPIC_CONFIG)) {
+ throw new ConnectException("No offsets topic");
+ }
+ this.topic = props.get(OFFSETS_TOPIC_CONFIG);
+ }
+
+ @Override
+ public List<SourceRecord> poll() {
+ // Request a read to the end of the offsets topic
+ context.offsetStorageReader().offset(Collections.singletonMap("", null));
+ // Produce a record to the offsets topic
+ return Collections.singletonList(new SourceRecord(null, null, topic, null, "", null, null));
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public String version() {
+ return "none";
+ }
+ }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index 4f13ad08a2d..c2820315d6b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -20,8 +20,11 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.runtime.SampleSourceConnector;
+import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
+import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.tools.ThroughputThrottler;
@@ -32,6 +35,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -47,6 +51,20 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
private static final Logger log = LoggerFactory.getLogger(MonitorableSourceConnector.class);
public static final String TOPIC_CONFIG = "topic";
+ public static final String MESSAGES_PER_POLL_CONFIG = "messages.per.poll";
+
+ public static final String CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG = "custom.exactly.once.support";
+ public static final String EXACTLY_ONCE_SUPPORTED = "supported";
+ public static final String EXACTLY_ONCE_UNSUPPORTED = "unsupported";
+ public static final String EXACTLY_ONCE_NULL = "null";
+ public static final String EXACTLY_ONCE_FAIL = "fail";
+
+ public static final String CUSTOM_TRANSACTION_BOUNDARIES_CONFIG = "custom.transaction.boundaries";
+ public static final String TRANSACTION_BOUNDARIES_SUPPORTED = "supported";
+ public static final String TRANSACTION_BOUNDARIES_UNSUPPORTED = "unsupported";
+ public static final String TRANSACTION_BOUNDARIES_NULL = "null";
+ public static final String TRANSACTION_BOUNDARIES_FAIL = "fail";
+
private String connectorName;
private ConnectorHandle connectorHandle;
private Map<String, String> commonConfigs;
@@ -74,7 +92,7 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<>(commonConfigs);
config.put("connector.name", connectorName);
- config.put("task.id", connectorName + "-" + i);
+ config.put("task.id", taskId(connectorName, i));
configs.add(config);
}
return configs;
@@ -92,18 +110,55 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
return new ConfigDef();
}
+ @Override
+ public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
+ String supportLevel = connectorConfig.getOrDefault(CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG, "null").toLowerCase(Locale.ROOT);
+ switch (supportLevel) {
+ case EXACTLY_ONCE_SUPPORTED:
+ return ExactlyOnceSupport.SUPPORTED;
+ case EXACTLY_ONCE_UNSUPPORTED:
+ return ExactlyOnceSupport.UNSUPPORTED;
+ case EXACTLY_ONCE_FAIL:
+ throw new ConnectException("oops");
+ default:
+ case EXACTLY_ONCE_NULL:
+ return null;
+ }
+ }
+
+ @Override
+ public ConnectorTransactionBoundaries canDefineTransactionBoundaries(Map<String, String> connectorConfig) {
+ String supportLevel = connectorConfig.getOrDefault(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, TRANSACTION_BOUNDARIES_UNSUPPORTED).toLowerCase(Locale.ROOT);
+ switch (supportLevel) {
+ case TRANSACTION_BOUNDARIES_SUPPORTED:
+ return ConnectorTransactionBoundaries.SUPPORTED;
+ case TRANSACTION_BOUNDARIES_FAIL:
+ throw new ConnectException("oh no :(");
+ case TRANSACTION_BOUNDARIES_NULL:
+ return null;
+ default:
+ case TRANSACTION_BOUNDARIES_UNSUPPORTED:
+ return ConnectorTransactionBoundaries.UNSUPPORTED;
+ }
+ }
+
+ public static String taskId(String connectorName, int taskId) {
+ return connectorName + "-" + taskId;
+ }
+
public static class MonitorableSourceTask extends SourceTask {
- private String connectorName;
private String taskId;
private String topicName;
private TaskHandle taskHandle;
private volatile boolean stopped;
private long startingSeqno;
private long seqno;
- private long throughput;
private int batchSize;
private ThroughputThrottler throttler;
+ private long priorTransactionBoundary;
+ private long nextTransactionBoundary;
+
@Override
public String version() {
return "unknown";
@@ -112,21 +167,24 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
@Override
public void start(Map<String, String> props) {
taskId = props.get("task.id");
- connectorName = props.get("connector.name");
+ String connectorName = props.get("connector.name");
topicName = props.getOrDefault(TOPIC_CONFIG, "sequential-topic");
- throughput = Long.parseLong(props.getOrDefault("throughput", "-1"));
- batchSize = Integer.parseInt(props.getOrDefault("messages.per.poll", "1"));
+ batchSize = Integer.parseInt(props.getOrDefault(MESSAGES_PER_POLL_CONFIG, "1"));
taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
Map<String, Object> offset = Optional.ofNullable(
context.offsetStorageReader().offset(Collections.singletonMap("task.id", taskId)))
.orElse(Collections.emptyMap());
startingSeqno = Optional.ofNullable((Long) offset.get("saved")).orElse(0L);
+ seqno = startingSeqno;
log.info("Started {} task {} with properties {}", this.getClass().getSimpleName(), taskId, props);
- throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
+ throttler = new ThroughputThrottler(Long.parseLong(props.getOrDefault("throughput", "-1")), System.currentTimeMillis());
taskHandle.recordTaskStart();
+ priorTransactionBoundary = 0;
+ nextTransactionBoundary = 1;
if (Boolean.parseBoolean(props.getOrDefault("task-" + taskId + ".start.inject.error", "false"))) {
throw new RuntimeException("Injecting errors during task start");
}
+ calculateNextBoundary();
}
@Override
@@ -136,19 +194,24 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
throttler.throttle();
}
taskHandle.record(batchSize);
- log.info("Returning batch of {} records", batchSize);
+ log.trace("Returning batch of {} records", batchSize);
return LongStream.range(0, batchSize)
- .mapToObj(i -> new SourceRecord(
- Collections.singletonMap("task.id", taskId),
- Collections.singletonMap("saved", ++seqno),
- topicName,
- null,
- Schema.STRING_SCHEMA,
- "key-" + taskId + "-" + seqno,
- Schema.STRING_SCHEMA,
- "value-" + taskId + "-" + seqno,
- null,
- new ConnectHeaders().addLong("header-" + seqno, seqno)))
+ .mapToObj(i -> {
+ seqno++;
+ SourceRecord record = new SourceRecord(
+ sourcePartition(taskId),
+ sourceOffset(seqno),
+ topicName,
+ null,
+ Schema.STRING_SCHEMA,
+ "key-" + taskId + "-" + seqno,
+ Schema.STRING_SCHEMA,
+ "value-" + taskId + "-" + seqno,
+ null,
+ new ConnectHeaders().addLong("header-" + seqno, seqno));
+ maybeDefineTransactionBoundary(record);
+ return record;
+ })
.collect(Collectors.toList());
}
return null;
@@ -172,5 +235,43 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
stopped = true;
taskHandle.recordTaskStop();
}
+
+ /**
+ * Calculate the next transaction boundary, i.e., the seqno whose corresponding source record should be used to
+ * either {@link org.apache.kafka.connect.source.TransactionContext#commitTransaction(SourceRecord) commit}
+ * or {@link org.apache.kafka.connect.source.TransactionContext#abortTransaction(SourceRecord) abort} the next transaction.
+ * <p>
+ * This connector defines transactions whose size correspond to successive elements of the Fibonacci sequence,
+ * where transactions with an even number of records are aborted, and those with an odd number of records are committed.
+ */
+ private void calculateNextBoundary() {
+ while (nextTransactionBoundary <= seqno) {
+ nextTransactionBoundary += priorTransactionBoundary;
+ priorTransactionBoundary = nextTransactionBoundary - priorTransactionBoundary;
+ }
+ }
+
+ private void maybeDefineTransactionBoundary(SourceRecord record) {
+ if (context.transactionContext() == null || seqno != nextTransactionBoundary) {
+ return;
+ }
+ // If the transaction boundary ends on an even-numbered offset, abort it
+ // Otherwise, commit
+ boolean abort = nextTransactionBoundary % 2 == 0;
+ calculateNextBoundary();
+ if (abort) {
+ context.transactionContext().abortTransaction(record);
+ } else {
+ context.transactionContext().commitTransaction(record);
+ }
+ }
+ }
+
+ public static Map<String, Object> sourcePartition(String taskId) {
+ return Collections.singletonMap("task.id", taskId);
+ }
+
+ public static Map<String, Object> sourceOffset(long seqno) {
+ return Collections.singletonMap("saved", seqno);
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
index 44b12eb6e97..c026cb72903 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -47,6 +47,9 @@ public class EmbeddedConnectClusterAssertions {
public static final long WORKER_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(5);
public static final long VALIDATION_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
public static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(2);
+ // Creating a connector requires two rounds of rebalance; destroying one only requires one
+ // Assume it'll take ~half the time to destroy a connector as it does to create one
+ public static final long CONNECTOR_SHUTDOWN_DURATION_MS = TimeUnit.MINUTES.toMillis(1);
private static final long CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
private final EmbeddedConnectCluster connect;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index f1a63a4615c..5bbbc684c2e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -26,14 +26,18 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
@@ -45,6 +49,7 @@ import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.metadata.BrokerState;
import org.slf4j.Logger;
@@ -55,9 +60,11 @@ import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -66,6 +73,8 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -75,6 +84,9 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMI
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static org.junit.Assert.assertFalse;
/**
* Setup an embedded Kafka cluster with specified number of brokers and specified broker properties. To be used for
@@ -439,9 +451,23 @@ public class EmbeddedKafkaCluster {
* @return a {@link ConsumerRecords} collection containing at least n records.
*/
public ConsumerRecords<byte[], byte[]> consume(int n, long maxDuration, String... topics) {
+ return consume(n, maxDuration, Collections.emptyMap(), topics);
+ }
+
+ /**
+ * Consume at least n records in a given duration or throw an exception.
+ *
+ * @param n the number of expected records in this topic.
+ * @param maxDuration the max duration to wait for these records (in milliseconds).
+ * @param topics the topics to subscribe and consume records from.
+ * @param consumerProps overrides to the default properties the consumer is constructed with;
+ * may not be null
+ * @return a {@link ConsumerRecords} collection containing at least n records.
+ */
+ public ConsumerRecords<byte[], byte[]> consume(int n, long maxDuration, Map<String, Object> consumerProps, String... topics) {
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
int consumedRecords = 0;
- try (KafkaConsumer<byte[], byte[]> consumer = createConsumerAndSubscribeTo(Collections.emptyMap(), topics)) {
+ try (KafkaConsumer<byte[], byte[]> consumer = createConsumerAndSubscribeTo(consumerProps, topics)) {
final long startMillis = System.currentTimeMillis();
long allowedDuration = maxDuration;
while (allowedDuration > 0) {
@@ -466,6 +492,108 @@ public class EmbeddedKafkaCluster {
throw new RuntimeException("Could not find enough records. found " + consumedRecords + ", expected " + n);
}
+ /**
+ * Consume all currently-available records for the specified topics in a given duration, or throw an exception.
+ * @param maxDurationMs the max duration to wait for these records (in milliseconds).
+ * @param consumerProps overrides to the default properties the consumer is constructed with; may be null
+ * @param adminProps overrides to the default properties the admin used to query Kafka cluster metadata is constructed with; may be null
+ * @param topics the topics to consume from
+ * @return a {@link ConsumerRecords} collection containing the records for all partitions of the given topics
+ */
+ public ConsumerRecords<byte[], byte[]> consumeAll(
+ long maxDurationMs,
+ Map<String, Object> consumerProps,
+ Map<String, Object> adminProps,
+ String... topics
+ ) throws TimeoutException, InterruptedException, ExecutionException {
+ long endTimeMs = System.currentTimeMillis() + maxDurationMs;
+
+ Consumer<byte[], byte[]> consumer = createConsumer(consumerProps != null ? consumerProps : Collections.emptyMap());
+ Admin admin = createAdminClient(Utils.mkObjectProperties(adminProps != null ? adminProps : Collections.emptyMap()));
+
+ long remainingTimeMs = endTimeMs - System.currentTimeMillis();
+ Set<TopicPartition> topicPartitions = listPartitions(remainingTimeMs, admin, Arrays.asList(topics));
+
+ remainingTimeMs = endTimeMs - System.currentTimeMillis();
+ Map<TopicPartition, Long> endOffsets = readEndOffsets(remainingTimeMs, admin, topicPartitions);
+
+ Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = topicPartitions.stream()
+ .collect(Collectors.toMap(
+ Function.identity(),
+ tp -> new ArrayList<>()
+ ));
+ consumer.assign(topicPartitions);
+
+ while (!endOffsets.isEmpty()) {
+ Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<TopicPartition, Long> entry = it.next();
+ TopicPartition topicPartition = entry.getKey();
+ long endOffset = entry.getValue();
+ long lastConsumedOffset = consumer.position(topicPartition);
+ if (lastConsumedOffset >= endOffset) {
+ // We've reached the end offset for the topic partition; can stop polling it now
+ it.remove();
+ } else {
+ remainingTimeMs = endTimeMs - System.currentTimeMillis();
+ if (remainingTimeMs <= 0) {
+ throw new AssertionError("failed to read to end of topic(s) " + Arrays.asList(topics) + " within " + maxDurationMs + "ms");
+ }
+ // We haven't reached the end offset yet; need to keep polling
+ ConsumerRecords<byte[], byte[]> recordBatch = consumer.poll(Duration.ofMillis(remainingTimeMs));
+ recordBatch.partitions().forEach(tp -> records.get(tp)
+ .addAll(recordBatch.records(tp))
+ );
+ }
+ }
+ }
+
+ return new ConsumerRecords<>(records);
+ }
+
+ /**
+ * List all the known partitions for the given {@link Collection} of topics
+ * @param maxDurationMs the max duration to wait for while fetching metadata from Kafka (in milliseconds).
+ * @param admin the admin client to use for fetching metadata from the Kafka cluster
+ * @param topics the topics whose partitions should be listed
+ * @return a {@link Set} of {@link TopicPartition topic partitions} for the given topics; never null, and never empty
+ */
+ private Set<TopicPartition> listPartitions(
+ long maxDurationMs,
+ Admin admin,
+ Collection<String> topics
+ ) throws TimeoutException, InterruptedException, ExecutionException {
+ assertFalse("collection of topics may not be empty", topics.isEmpty());
+ return admin.describeTopics(topics)
+ .allTopicNames().get(maxDurationMs, TimeUnit.MILLISECONDS)
+ .entrySet().stream()
+ .flatMap(e -> e.getValue().partitions().stream().map(p -> new TopicPartition(e.getKey(), p.partition())))
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * List the latest current offsets for the given {@link Collection} of {@link TopicPartition topic partitions}
+ * @param maxDurationMs the max duration to wait for while fetching metadata from Kafka (in milliseconds)
+ * @param admin the admin client to use for fetching metadata from the Kafka cluster
+ * @param topicPartitions the topic partitions to list end offsets for
+ * @return a {@link Map} containing the latest offset for each requested {@link TopicPartition topic partition}; never null, and never empty
+ */
+ private Map<TopicPartition, Long> readEndOffsets(
+ long maxDurationMs,
+ Admin admin,
+ Collection<TopicPartition> topicPartitions
+ ) throws TimeoutException, InterruptedException, ExecutionException {
+ assertFalse("collection of topic partitions may not be empty", topicPartitions.isEmpty());
+ Map<TopicPartition, OffsetSpec> offsetSpecMap = topicPartitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
+ return admin.listOffsets(offsetSpecMap, new ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED))
+ .all().get(maxDurationMs, TimeUnit.MILLISECONDS)
+ .entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> e.getValue().offset()
+ ));
+ }
+
public KafkaConsumer<byte[], byte[]> createConsumer(Map<String, Object> consumerProps) {
Map<String, Object> props = new HashMap<>(consumerProps);
@@ -495,6 +623,26 @@ public class EmbeddedKafkaCluster {
return consumer;
}
+ public KafkaProducer<byte[], byte[]> createProducer(Map<String, Object> producerProps) {
+ Map<String, Object> props = new HashMap<>(producerProps);
+
+ putIfAbsent(props, BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+ putIfAbsent(props, KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ putIfAbsent(props, VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ if (sslEnabled()) {
+ putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+ putIfAbsent(props, SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
+ putIfAbsent(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+ }
+ KafkaProducer<byte[], byte[]> producer;
+ try {
+ producer = new KafkaProducer<>(props);
+ } catch (Throwable t) {
+ throw new ConnectException("Failed to create producer", t);
+ }
+ return producer;
+ }
+
private static void putIfAbsent(final Map<String, Object> props, final String propertyKey, final Object propertyValue) {
if (!props.containsKey(propertyKey)) {
props.put(propertyKey, propertyValue);