Posted to commits@kafka.apache.org by mi...@apache.org on 2022/07/28 15:18:19 UTC

[kafka] branch trunk updated: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test (#12429)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9e74f91e56 KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test (#12429)
9e74f91e56 is described below

commit 9e74f91e56dbc06f17c95fe80dd3923f7b713457
Author: Chris Egerton <fe...@gmail.com>
AuthorDate: Thu Jul 28 11:18:09 2022 -0400

    KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test (#12429)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Tom Bentley <tb...@redhat.com>
---
 .../runtime/ExactlyOnceWorkerSourceTask.java       |   3 +
 .../org/apache/kafka/connect/runtime/Worker.java   |  13 ++
 .../ExactlyOnceSourceIntegrationTest.java          | 192 +++++++++++++++------
 .../integration/MonitorableSourceConnector.java    |   2 +-
 4 files changed, 155 insertions(+), 55 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index de78e592aa..931917b9e1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -217,6 +217,9 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
         if (failed) {
             log.debug("Skipping final offset commit as task has failed");
             return;
+        } else if (isCancelled()) {
+            log.debug("Skipping final offset commit as task has been cancelled");
+            return;
         }
 
         // It should be safe to commit here even if we were in the middle of retrying on RetriableExceptions in the
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 16e48d8f17..5bc67693d0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -756,6 +756,19 @@ public class Worker {
                                                               ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
                                                               String clusterId) {
         Map<String, Object> result = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, clusterId);
+        // The base producer properties forcibly disable idempotence; remove it from those properties
+        // if not explicitly requested by the user
+        boolean connectorProducerIdempotenceConfigured = connConfig.originals().containsKey(
+                ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
+        );
+        if (!connectorProducerIdempotenceConfigured) {
+            boolean workerProducerIdempotenceConfigured = config.originals().containsKey(
+                    "producer." + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
+            );
+            if (!workerProducerIdempotenceConfigured) {
+                result.remove(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
+            }
+        }
         ConnectUtils.ensureProperty(
                 result, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
                 "for connectors when exactly-once source support is enabled",
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 90fcaa8a21..bd9bceba06 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -407,12 +407,12 @@ public class ExactlyOnceSourceIntegrationTest {
         Map<String, Object> consumerProps = new HashMap<>();
         consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
         // consume all records from the source topic or fail, to ensure that they were correctly produced
-        ConsumerRecords<byte[], byte[]> sourceRecords = connect.kafka()
-                .consume(
-                        recordsProduced,
-                        TimeUnit.MINUTES.toMillis(1),
-                        consumerProps,
-                        "test-topic");
+        ConsumerRecords<byte[], byte[]> sourceRecords = connect.kafka().consumeAll(
+                CONSUME_RECORDS_TIMEOUT_MS,
+                Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+                null,
+                topic
+        );
         assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + sourceRecords.count(),
                 sourceRecords.count() >= recordsProduced);
 
@@ -434,8 +434,7 @@ public class ExactlyOnceSourceIntegrationTest {
                         offsetsTopic
                 );
 
-        List<Long> actualOffsetSeqnos = new ArrayList<>();
-        offsetRecords.forEach(record -> actualOffsetSeqnos.add(parseAndAssertOffsetForSingleTask(record)));
+        List<Long> actualOffsetSeqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
 
         assertEquals("Committed offsets should match connector-defined transaction boundaries",
                 expectedOffsetSeqnos, actualOffsetSeqnos.subList(0, expectedOffsetSeqnos.size()));
@@ -716,6 +715,20 @@ public class ExactlyOnceSourceIntegrationTest {
         connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, tasksMax, "Connector and task should have restarted successfully");
     }
 
+    /**
+     * This test focuses extensively on the per-connector offsets feature.
+     * <p>
+     * First, a connector is brought up whose producer is configured to write to a different Kafka cluster
+     * than the one the Connect cluster uses for its internal topics, then the contents of the connector's
+     * dedicated offsets topic and the worker's internal offsets topic are inspected to ensure that offsets
+     * have been backed up from the dedicated topic to the global topic.
+     * <p>
+     * Then, a "soft downgrade" is simulated: the Connect cluster is shut down and reconfigured to disable
+     * exactly-once support. The cluster is brought up again, the connector is allowed to produce some data,
+     * the connector is shut down, and this time, the records the connector has produced are inspected for
+     * accuracy. Because of the downgrade, exactly-once guarantees are lost, but we check to make sure that
+     * the task has maintained exactly-once delivery <i>up to the last-committed record</i>.
+     */
     @Test
     public void testSeparateOffsetsTopic() throws Exception {
         final String globalOffsetsTopic = "connect-worker-offsets-topic";
@@ -761,7 +774,7 @@ public class ExactlyOnceSourceIntegrationTest {
             // wait for the connector tasks to commit enough records
             connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1));
 
-            // consume all records from the source topic or fail, to ensure that they were correctly produced
+            // consume at least the expected number of records from the source topic or fail, to ensure that they were correctly produced
             int recordNum = connectorTargetedCluster
                     .consume(
                             recordsProduced,
@@ -772,28 +785,33 @@ public class ExactlyOnceSourceIntegrationTest {
             assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + recordNum,
                     recordNum >= recordsProduced);
 
-            // also consume from the connector's dedicated offsets topic; just need to read one offset record
-            ConsumerRecord<byte[], byte[]> offsetRecord = connectorTargetedCluster
-                    .consume(
-                            1,
+            // also consume from the connector's dedicated offsets topic
+            ConsumerRecords<byte[], byte[]> offsetRecords = connectorTargetedCluster
+                    .consumeAll(
                             TimeUnit.MINUTES.toMillis(1),
                             Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+                            null,
                             offsetsTopic
-                    ).iterator().next();
-            long seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
-            assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
-                    0, seqno % recordsProduced);
+                    );
+            List<Long> seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
+            seqnos.forEach(seqno ->
+                assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+                        0, seqno % recordsProduced)
+            );
 
-            // also consume from the cluster's global offsets topic; again, just need to read one offset record
-            offsetRecord = connect.kafka()
-                    .consume(
-                            1,
+            // also consume from the cluster's global offsets topic
+            offsetRecords = connect.kafka()
+                    .consumeAll(
                             TimeUnit.MINUTES.toMillis(1),
+                            null,
+                            null,
                             globalOffsetsTopic
-                    ).iterator().next();
-            seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
-            assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
-                    0, seqno % recordsProduced);
+                    );
+            seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
+            seqnos.forEach(seqno ->
+                assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+                        0, seqno % recordsProduced)
+            );
 
             // Shut down the whole cluster
             connect.workers().forEach(connect::removeWorker);
@@ -831,15 +849,22 @@ public class ExactlyOnceSourceIntegrationTest {
             assertConnectorStopped(connectorStop);
 
             // consume all records from the source topic or fail, to ensure that they were correctly produced
-            ConsumerRecords<byte[], byte[]> records = connectorTargetedCluster.consumeAll(
+            ConsumerRecords<byte[], byte[]> sourceRecords = connectorTargetedCluster.consumeAll(
                     CONSUME_RECORDS_TIMEOUT_MS,
                     Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
                     null,
                     topic
             );
-            assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(),
-                    records.count() >= recordsProduced);
-            assertExactlyOnceSeqnos(records, numTasks);
+            assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + sourceRecords.count(),
+                    sourceRecords.count() >= recordsProduced);
+            // also have to check which offsets have actually been committed, since we no longer have exactly-once guarantees
+            offsetRecords = connectorTargetedCluster.consumeAll(
+                    CONSUME_RECORDS_TIMEOUT_MS,
+                    Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+                    null,
+                    offsetsTopic
+            );
+            assertAtLeastOnceSeqnos(sourceRecords, offsetRecords, numTasks);
         }
     }
 
@@ -896,27 +921,10 @@ public class ExactlyOnceSourceIntegrationTest {
                 .orElseThrow(() -> new AssertionError("Failed to find configuration validation result for property '" + property + "'"));
     }
 
-    @SuppressWarnings("unchecked")
-    private long parseAndAssertOffsetForSingleTask(ConsumerRecord<byte[], byte[]> offsetRecord) {
-        JsonConverter offsetsConverter = new JsonConverter();
-        // The JSON converter behaves identically for keys and values. If that ever changes, we may need to update this test to use
-        // separate converter instances.
-
-        offsetsConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), false);
-        Object keyObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.key()).value();
-        Object valueObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.value()).value();
-
-        assertNotNull("Offset value should not be null", valueObject);
-
-        assertEquals("Serialized source partition should match expected format",
-                Arrays.asList(CONNECTOR_NAME, MonitorableSourceConnector.sourcePartition(MonitorableSourceConnector.taskId(CONNECTOR_NAME, 0))),
-                keyObject);
-
-        Map<String, Object> value = assertAndCast(valueObject, Map.class, "Value");
-
-        Object seqnoObject = value.get("saved");
-        assertNotNull("Serialized source offset should contain 'seqno' field from MonitorableSourceConnector", seqnoObject);
-        return assertAndCast(seqnoObject, Long.class, "Seqno offset field");
+    private List<Long> parseAndAssertOffsetsForSingleTask(ConsumerRecords<byte[], byte[]> offsetRecords) {
+        Map<Integer, List<Long>> parsedOffsets = parseOffsetForTasks(offsetRecords);
+        assertEquals("Expected records to only be produced from a single task", Collections.singleton(0), parsedOffsets.keySet());
+        return parsedOffsets.get(0);
     }
 
     private List<Long> parseAndAssertValuesForSingleTask(ConsumerRecords<byte[], byte[]> sourceRecords) {
@@ -927,6 +935,25 @@ public class ExactlyOnceSourceIntegrationTest {
 
     private void assertExactlyOnceSeqnos(ConsumerRecords<byte[], byte[]> sourceRecords, int numTasks) {
         Map<Integer, List<Long>> parsedValues = parseValuesForTasks(sourceRecords);
+        assertSeqnos(parsedValues, numTasks);
+    }
+
+    private void assertAtLeastOnceSeqnos(ConsumerRecords<byte[], byte[]> sourceRecords, ConsumerRecords<byte[], byte[]> offsetRecords, int numTasks) {
+        Map<Integer, List<Long>> parsedValues = parseValuesForTasks(sourceRecords);
+        Map<Integer, Long> lastCommittedValues = parseOffsetForTasks(offsetRecords)
+                .entrySet().stream().collect(Collectors.toMap(
+                        Map.Entry::getKey,
+                        e -> Collections.max(e.getValue())
+                ));
+        parsedValues.replaceAll((task, values) -> {
+            Long committedValue = lastCommittedValues.get(task);
+            assertNotNull("No committed offset found for task " + task, committedValue);
+            return values.stream().filter(v -> v <= committedValue).collect(Collectors.toList());
+        });
+        assertSeqnos(parsedValues, numTasks);
+    }
+
+    private void assertSeqnos(Map<Integer, List<Long>> parsedValues, int numTasks) {
         Set<Integer> expectedKeys = IntStream.range(0, numTasks).boxed().collect(Collectors.toSet());
         assertEquals("Expected records to be produced by each task", expectedKeys, parsedValues.keySet());
 
@@ -935,10 +962,19 @@ public class ExactlyOnceSourceIntegrationTest {
             // which makes in-order consumption impossible
             Set<Long> expectedSeqnos = LongStream.range(1, seqnos.size() + 1).boxed().collect(Collectors.toSet());
             Set<Long> actualSeqnos = new HashSet<>(seqnos);
-            assertEquals(
-                    "Seqnos for task " + taskId + " should start at 1 and increase strictly by 1 with each record",
-                    expectedSeqnos,
-                    actualSeqnos
+
+            Set<Long> missingSeqnos = new HashSet<>(expectedSeqnos);
+            missingSeqnos.removeAll(actualSeqnos);
+            Set<Long> extraSeqnos = new HashSet<>(actualSeqnos);
+            extraSeqnos.removeAll(expectedSeqnos);
+
+            // Try to provide the most friendly error message possible if this test fails
+            assertTrue(
+                    "Seqnos for task " + taskId + " should start at 1 and increase strictly by 1 with each record, " +
+                            "but the actual seqnos did not.\n" +
+                            "Seqnos that should have been emitted but were not: " + missingSeqnos + "\n" +
+                            "seqnos that should not have been emitted but were: " + extraSeqnos,
+                    missingSeqnos.isEmpty() && extraSeqnos.isEmpty()
             );
         });
     }
@@ -986,6 +1022,54 @@ public class ExactlyOnceSourceIntegrationTest {
         return result;
     }
 
+    private Map<Integer, List<Long>> parseOffsetForTasks(ConsumerRecords<byte[], byte[]> offsetRecords) {
+        JsonConverter offsetsConverter = new JsonConverter();
+        // The JSON converter behaves identically for keys and values. If that ever changes, we may need to update this test to use
+        // separate converter instances.
+        offsetsConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), false);
+
+        Map<Integer, List<Long>> result = new HashMap<>();
+        for (ConsumerRecord<byte[], byte[]> offsetRecord : offsetRecords) {
+            Object keyObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.key()).value();
+            Object valueObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.value()).value();
+
+            assertNotNull("Offset key should not be null", keyObject);
+            assertNotNull("Offset value should not be null", valueObject);
+
+            @SuppressWarnings("unchecked")
+            List<Object> key = assertAndCast(keyObject, List.class, "Key");
+            assertEquals(
+                    "Offset topic key should be a list containing two elements: the name of the connector, and the connector-provided source partition",
+                    2,
+                    key.size()
+            );
+            assertEquals(CONNECTOR_NAME, key.get(0));
+            @SuppressWarnings("unchecked")
+            Map<String, Object> partition = assertAndCast(key.get(1), Map.class, "Key[1]");
+            Object taskIdObject = partition.get("task.id");
+            assertNotNull("Serialized source partition should contain 'task.id' field from MonitorableSourceConnector", taskIdObject);
+            String taskId = assertAndCast(taskIdObject, String.class, "task ID");
+            assertTrue("task ID should match pattern '<connectorName>-<taskId>", taskId.startsWith(CONNECTOR_NAME + "-"));
+            String taskIdRemainder = taskId.substring(CONNECTOR_NAME.length() + 1);
+            int taskNum;
+            try {
+                taskNum = Integer.parseInt(taskIdRemainder);
+            } catch (NumberFormatException e) {
+                throw new AssertionError("task ID should match pattern '<connectorName>-<taskId>', where <taskId> is an integer", e);
+            }
+
+            @SuppressWarnings("unchecked")
+            Map<String, Object> value = assertAndCast(valueObject, Map.class, "Value");
+
+            Object seqnoObject = value.get("saved");
+            assertNotNull("Serialized source offset should contain 'seqno' field from MonitorableSourceConnector", seqnoObject);
+            long seqno = assertAndCast(seqnoObject, Long.class, "Seqno offset field");
+
+            result.computeIfAbsent(taskNum, t -> new ArrayList<>()).add(seqno);
+        }
+        return result;
+    }
+
     @SuppressWarnings("unchecked")
     private static <T> T assertAndCast(Object o, Class<T> klass, String objectDescription) {
         String className = o == null ? "null" : o.getClass().getName();
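
The new assertAtLeastOnceSeqnos path above trims each task's emitted seqnos to those at or below the task's last committed offset before running the usual gap-free check, since records produced after the final commit may be re-delivered once exactly-once support is disabled. A minimal standalone illustration of that filtering step, using made-up data rather than the test's fixtures:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class AtLeastOnceSeqnoSketch {
        public static void main(String[] args) {
            // Seqnos observed in the source topic per task; task 0 has records past its last commit
            Map<Integer, List<Long>> emitted = new HashMap<>();
            emitted.put(0, Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L));

            // Highest committed seqno per task, taken from the offsets topic
            Map<Integer, Long> lastCommitted = new HashMap<>();
            lastCommitted.put(0, 5L);

            // Only records up to the last committed offset are held to the exactly-once check;
            // anything after it may legitimately show up again after a restart
            emitted.replaceAll((task, seqnos) -> seqnos.stream()
                    .filter(seqno -> seqno <= lastCommitted.get(task))
                    .collect(Collectors.toList()));

            System.out.println(emitted); // {0=[1, 2, 3, 4, 5]}
        }
    }
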
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index c2820315d6..33ba1588a7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -172,7 +172,7 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
             batchSize = Integer.parseInt(props.getOrDefault(MESSAGES_PER_POLL_CONFIG, "1"));
             taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
             Map<String, Object> offset = Optional.ofNullable(
-                    context.offsetStorageReader().offset(Collections.singletonMap("task.id", taskId)))
+                    context.offsetStorageReader().offset(sourcePartition(taskId)))
                     .orElse(Collections.emptyMap());
             startingSeqno = Optional.ofNullable((Long) offset.get("saved")).orElse(0L);
             seqno = startingSeqno;
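
The MonitorableSourceConnector change is small but relevant to the offset assertions above: the stored offset is now read back with the same sourcePartition(taskId) helper used when records are emitted, and an offset can only be found if the lookup map matches the map it was written under. A short sketch of that round trip against a plain in-memory map; sourcePartitionFor, the connector name, and the "saved" field are illustrative stand-ins rather than the framework's OffsetStorageReader API:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class SourcePartitionRoundTripSketch {

        // Mirrors the partition shape the test asserts on: {"task.id": "<connectorName>-<taskId>"}
        static Map<String, Object> sourcePartitionFor(String taskId) {
            return Collections.singletonMap("task.id", taskId);
        }

        public static void main(String[] args) {
            Map<Map<String, Object>, Map<String, Object>> offsetStore = new HashMap<>();

            // "Commit" an offset under the partition map, as the task would via its source records
            offsetStore.put(sourcePartitionFor("my-connector-0"),
                    Collections.singletonMap("saved", 42L));

            // Reading back succeeds only because the lookup uses an equal partition map
            Map<String, Object> restored = offsetStore.get(sourcePartitionFor("my-connector-0"));
            System.out.println(restored.get("saved")); // 42
        }
    }
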