You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/07/28 15:18:19 UTC
[kafka] branch trunk updated: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test (#12429)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9e74f91e56 KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test (#12429)
9e74f91e56 is described below
commit 9e74f91e56dbc06f17c95fe80dd3923f7b713457
Author: Chris Egerton <fe...@gmail.com>
AuthorDate: Thu Jul 28 11:18:09 2022 -0400
KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test (#12429)
Reviewers: Mickael Maison <mi...@gmail.com>, Tom Bentley <tb...@redhat.com>
---
.../runtime/ExactlyOnceWorkerSourceTask.java | 3 +
.../org/apache/kafka/connect/runtime/Worker.java | 13 ++
.../ExactlyOnceSourceIntegrationTest.java | 192 +++++++++++++++------
.../integration/MonitorableSourceConnector.java | 2 +-
4 files changed, 155 insertions(+), 55 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index de78e592aa..931917b9e1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -217,6 +217,9 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
if (failed) {
log.debug("Skipping final offset commit as task has failed");
return;
+ } else if (isCancelled()) {
+ log.debug("Skipping final offset commit as task has been cancelled");
+ return;
}
// It should be safe to commit here even if we were in the middle of retrying on RetriableExceptions in the
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 16e48d8f17..5bc67693d0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -756,6 +756,19 @@ public class Worker {
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
String clusterId) {
Map<String, Object> result = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, clusterId);
+ // The base producer properties forcibly disable idempotence; remove it from those properties
+ // if not explicitly requested by the user
+ boolean connectorProducerIdempotenceConfigured = connConfig.originals().containsKey(
+ ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
+ );
+ if (!connectorProducerIdempotenceConfigured) {
+ boolean workerProducerIdempotenceConfigured = config.originals().containsKey(
+ "producer." + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
+ );
+ if (!workerProducerIdempotenceConfigured) {
+ result.remove(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
+ }
+ }
ConnectUtils.ensureProperty(
result, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
"for connectors when exactly-once source support is enabled",
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 90fcaa8a21..bd9bceba06 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -407,12 +407,12 @@ public class ExactlyOnceSourceIntegrationTest {
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// consume all records from the source topic or fail, to ensure that they were correctly produced
- ConsumerRecords<byte[], byte[]> sourceRecords = connect.kafka()
- .consume(
- recordsProduced,
- TimeUnit.MINUTES.toMillis(1),
- consumerProps,
- "test-topic");
+ ConsumerRecords<byte[], byte[]> sourceRecords = connect.kafka().consumeAll(
+ CONSUME_RECORDS_TIMEOUT_MS,
+ Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+ null,
+ topic
+ );
assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + sourceRecords.count(),
sourceRecords.count() >= recordsProduced);
@@ -434,8 +434,7 @@ public class ExactlyOnceSourceIntegrationTest {
offsetsTopic
);
- List<Long> actualOffsetSeqnos = new ArrayList<>();
- offsetRecords.forEach(record -> actualOffsetSeqnos.add(parseAndAssertOffsetForSingleTask(record)));
+ List<Long> actualOffsetSeqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
assertEquals("Committed offsets should match connector-defined transaction boundaries",
expectedOffsetSeqnos, actualOffsetSeqnos.subList(0, expectedOffsetSeqnos.size()));
@@ -716,6 +715,20 @@ public class ExactlyOnceSourceIntegrationTest {
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, tasksMax, "Connector and task should have restarted successfully");
}
+ /**
+ * This test focuses extensively on the per-connector offsets feature.
+ * <p>
+ * First, a connector is brought up whose producer is configured to write to a different Kafka cluster
+ * than the one the Connect cluster uses for its internal topics, then the contents of the connector's
+ * dedicated offsets topic and the worker's internal offsets topic are inspected to ensure that offsets
+ * have been backed up from the dedicated topic to the global topic.
+ * <p>
+ * Then, a "soft downgrade" is simulated: the Connect cluster is shut down and reconfigured to disable
+ * exactly-once support. The cluster is brought up again, the connector is allowed to produce some data,
+ * the connector is shut down, and this time, the records the connector has produced are inspected for
+ * accuracy. Because of the downgrade, exactly-once guarantees are lost, but we check to make sure that
+ * the task has maintained exactly-once delivery <i>up to the last-committed record</i>.
+ */
@Test
public void testSeparateOffsetsTopic() throws Exception {
final String globalOffsetsTopic = "connect-worker-offsets-topic";
@@ -761,7 +774,7 @@ public class ExactlyOnceSourceIntegrationTest {
// wait for the connector tasks to commit enough records
connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1));
- // consume all records from the source topic or fail, to ensure that they were correctly produced
+ // consume at least the expected number of records from the source topic or fail, to ensure that they were correctly produced
int recordNum = connectorTargetedCluster
.consume(
recordsProduced,
@@ -772,28 +785,33 @@ public class ExactlyOnceSourceIntegrationTest {
assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + recordNum,
recordNum >= recordsProduced);
- // also consume from the connector's dedicated offsets topic; just need to read one offset record
- ConsumerRecord<byte[], byte[]> offsetRecord = connectorTargetedCluster
- .consume(
- 1,
+ // also consume from the connector's dedicated offsets topic
+ ConsumerRecords<byte[], byte[]> offsetRecords = connectorTargetedCluster
+ .consumeAll(
TimeUnit.MINUTES.toMillis(1),
Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+ null,
offsetsTopic
- ).iterator().next();
- long seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
- assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
- 0, seqno % recordsProduced);
+ );
+ List<Long> seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
+ seqnos.forEach(seqno ->
+ assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+ 0, seqno % recordsProduced)
+ );
- // also consume from the cluster's global offsets topic; again, just need to read one offset record
- offsetRecord = connect.kafka()
- .consume(
- 1,
+ // also consume from the cluster's global offsets topic
+ offsetRecords = connect.kafka()
+ .consumeAll(
TimeUnit.MINUTES.toMillis(1),
+ null,
+ null,
globalOffsetsTopic
- ).iterator().next();
- seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
- assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
- 0, seqno % recordsProduced);
+ );
+ seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
+ seqnos.forEach(seqno ->
+ assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+ 0, seqno % recordsProduced)
+ );
// Shut down the whole cluster
connect.workers().forEach(connect::removeWorker);
@@ -831,15 +849,22 @@ public class ExactlyOnceSourceIntegrationTest {
assertConnectorStopped(connectorStop);
// consume all records from the source topic or fail, to ensure that they were correctly produced
- ConsumerRecords<byte[], byte[]> records = connectorTargetedCluster.consumeAll(
+ ConsumerRecords<byte[], byte[]> sourceRecords = connectorTargetedCluster.consumeAll(
CONSUME_RECORDS_TIMEOUT_MS,
Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
null,
topic
);
- assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(),
- records.count() >= recordsProduced);
- assertExactlyOnceSeqnos(records, numTasks);
+ assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + sourceRecords.count(),
+ sourceRecords.count() >= recordsProduced);
+ // also have to check which offsets have actually been committed, since we no longer have exactly-once guarantees
+ offsetRecords = connectorTargetedCluster.consumeAll(
+ CONSUME_RECORDS_TIMEOUT_MS,
+ Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+ null,
+ offsetsTopic
+ );
+ assertAtLeastOnceSeqnos(sourceRecords, offsetRecords, numTasks);
}
}
@@ -896,27 +921,10 @@ public class ExactlyOnceSourceIntegrationTest {
.orElseThrow(() -> new AssertionError("Failed to find configuration validation result for property '" + property + "'"));
}
- @SuppressWarnings("unchecked")
- private long parseAndAssertOffsetForSingleTask(ConsumerRecord<byte[], byte[]> offsetRecord) {
- JsonConverter offsetsConverter = new JsonConverter();
- // The JSON converter behaves identically for keys and values. If that ever changes, we may need to update this test to use
- // separate converter instances.
-
- offsetsConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), false);
- Object keyObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.key()).value();
- Object valueObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.value()).value();
-
- assertNotNull("Offset value should not be null", valueObject);
-
- assertEquals("Serialized source partition should match expected format",
- Arrays.asList(CONNECTOR_NAME, MonitorableSourceConnector.sourcePartition(MonitorableSourceConnector.taskId(CONNECTOR_NAME, 0))),
- keyObject);
-
- Map<String, Object> value = assertAndCast(valueObject, Map.class, "Value");
-
- Object seqnoObject = value.get("saved");
- assertNotNull("Serialized source offset should contain 'seqno' field from MonitorableSourceConnector", seqnoObject);
- return assertAndCast(seqnoObject, Long.class, "Seqno offset field");
+ private List<Long> parseAndAssertOffsetsForSingleTask(ConsumerRecords<byte[], byte[]> offsetRecords) {
+ Map<Integer, List<Long>> parsedOffsets = parseOffsetForTasks(offsetRecords);
+ assertEquals("Expected records to only be produced from a single task", Collections.singleton(0), parsedOffsets.keySet());
+ return parsedOffsets.get(0);
}
private List<Long> parseAndAssertValuesForSingleTask(ConsumerRecords<byte[], byte[]> sourceRecords) {
@@ -927,6 +935,25 @@ public class ExactlyOnceSourceIntegrationTest {
private void assertExactlyOnceSeqnos(ConsumerRecords<byte[], byte[]> sourceRecords, int numTasks) {
Map<Integer, List<Long>> parsedValues = parseValuesForTasks(sourceRecords);
+ assertSeqnos(parsedValues, numTasks);
+ }
+
+ private void assertAtLeastOnceSeqnos(ConsumerRecords<byte[], byte[]> sourceRecords, ConsumerRecords<byte[], byte[]> offsetRecords, int numTasks) {
+ Map<Integer, List<Long>> parsedValues = parseValuesForTasks(sourceRecords);
+ Map<Integer, Long> lastCommittedValues = parseOffsetForTasks(offsetRecords)
+ .entrySet().stream().collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> Collections.max(e.getValue())
+ ));
+ parsedValues.replaceAll((task, values) -> {
+ Long committedValue = lastCommittedValues.get(task);
+ assertNotNull("No committed offset found for task " + task, committedValue);
+ return values.stream().filter(v -> v <= committedValue).collect(Collectors.toList());
+ });
+ assertSeqnos(parsedValues, numTasks);
+ }
+
+ private void assertSeqnos(Map<Integer, List<Long>> parsedValues, int numTasks) {
Set<Integer> expectedKeys = IntStream.range(0, numTasks).boxed().collect(Collectors.toSet());
assertEquals("Expected records to be produced by each task", expectedKeys, parsedValues.keySet());
@@ -935,10 +962,19 @@ public class ExactlyOnceSourceIntegrationTest {
// which makes in-order consumption impossible
Set<Long> expectedSeqnos = LongStream.range(1, seqnos.size() + 1).boxed().collect(Collectors.toSet());
Set<Long> actualSeqnos = new HashSet<>(seqnos);
- assertEquals(
- "Seqnos for task " + taskId + " should start at 1 and increase strictly by 1 with each record",
- expectedSeqnos,
- actualSeqnos
+
+ Set<Long> missingSeqnos = new HashSet<>(expectedSeqnos);
+ missingSeqnos.removeAll(actualSeqnos);
+ Set<Long> extraSeqnos = new HashSet<>(actualSeqnos);
+ extraSeqnos.removeAll(expectedSeqnos);
+
+ // Try to provide the most friendly error message possible if this test fails
+ assertTrue(
+ "Seqnos for task " + taskId + " should start at 1 and increase strictly by 1 with each record, " +
+ "but the actual seqnos did not.\n" +
+ "Seqnos that should have been emitted but were not: " + missingSeqnos + "\n" +
+ "seqnos that should not have been emitted but were: " + extraSeqnos,
+ missingSeqnos.isEmpty() && extraSeqnos.isEmpty()
);
});
}
@@ -986,6 +1022,54 @@ public class ExactlyOnceSourceIntegrationTest {
return result;
}
+ private Map<Integer, List<Long>> parseOffsetForTasks(ConsumerRecords<byte[], byte[]> offsetRecords) {
+ JsonConverter offsetsConverter = new JsonConverter();
+ // The JSON converter behaves identically for keys and values. If that ever changes, we may need to update this test to use
+ // separate converter instances.
+ offsetsConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), false);
+
+ Map<Integer, List<Long>> result = new HashMap<>();
+ for (ConsumerRecord<byte[], byte[]> offsetRecord : offsetRecords) {
+ Object keyObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.key()).value();
+ Object valueObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.value()).value();
+
+ assertNotNull("Offset key should not be null", keyObject);
+ assertNotNull("Offset value should not be null", valueObject);
+
+ @SuppressWarnings("unchecked")
+ List<Object> key = assertAndCast(keyObject, List.class, "Key");
+ assertEquals(
+ "Offset topic key should be a list containing two elements: the name of the connector, and the connector-provided source partition",
+ 2,
+ key.size()
+ );
+ assertEquals(CONNECTOR_NAME, key.get(0));
+ @SuppressWarnings("unchecked")
+ Map<String, Object> partition = assertAndCast(key.get(1), Map.class, "Key[1]");
+ Object taskIdObject = partition.get("task.id");
+ assertNotNull("Serialized source partition should contain 'task.id' field from MonitorableSourceConnector", taskIdObject);
+ String taskId = assertAndCast(taskIdObject, String.class, "task ID");
+ assertTrue("task ID should match pattern '<connectorName>-<taskId>", taskId.startsWith(CONNECTOR_NAME + "-"));
+ String taskIdRemainder = taskId.substring(CONNECTOR_NAME.length() + 1);
+ int taskNum;
+ try {
+ taskNum = Integer.parseInt(taskIdRemainder);
+ } catch (NumberFormatException e) {
+ throw new AssertionError("task ID should match pattern '<connectorName>-<taskId>', where <taskId> is an integer", e);
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> value = assertAndCast(valueObject, Map.class, "Value");
+
+ Object seqnoObject = value.get("saved");
+ assertNotNull("Serialized source offset should contain 'seqno' field from MonitorableSourceConnector", seqnoObject);
+ long seqno = assertAndCast(seqnoObject, Long.class, "Seqno offset field");
+
+ result.computeIfAbsent(taskNum, t -> new ArrayList<>()).add(seqno);
+ }
+ return result;
+ }
+
@SuppressWarnings("unchecked")
private static <T> T assertAndCast(Object o, Class<T> klass, String objectDescription) {
String className = o == null ? "null" : o.getClass().getName();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index c2820315d6..33ba1588a7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -172,7 +172,7 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
batchSize = Integer.parseInt(props.getOrDefault(MESSAGES_PER_POLL_CONFIG, "1"));
taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
Map<String, Object> offset = Optional.ofNullable(
- context.offsetStorageReader().offset(Collections.singletonMap("task.id", taskId)))
+ context.offsetStorageReader().offset(sourcePartition(taskId)))
.orElse(Collections.emptyMap());
startingSeqno = Optional.ofNullable((Long) offset.get("saved")).orElse(0L);
seqno = startingSeqno;