You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2022/03/25 04:30:51 UTC
[kafka] branch 3.2 updated: KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 3f59718 KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946)
3f59718 is described below
commit 3f59718fb9c7246dadfbf70bf042b386d7e9ee4a
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Thu Mar 24 21:40:10 2022 -0500
KAFKA-13770: Restore compatibility with KafkaBasedLog using older Kafka brokers (#11946)
The `retryEndOffsets(…)` method in `TopicAdmin` was recently added (KAFKA-12879, #11797) to allow the `KafkaBasedLog.start()` method to retry any failures reading the last offsets for a topic. However, this introduced a regression when talking to older brokers (0.10.x or earlier).
The `KafkaBasedLog` already had logic that expected an `UnsupportedVersionException` thrown by the admin client when a Kafka API is not available on an older broker, but the new retry logic in `TopicAdmin` did not account for this and wrapped the exception, thereby breaking the `KafkaBasedLog` logic and preventing startup.
The fix is to propagate this `UnsupportedVersionException` from the `TopicAdmin.retryEndOffsets(…)` method without wrapping it. Added a new unit test that reproduced the problem before the fix and verifies that the fix corrects it.
---
.../org/apache/kafka/connect/util/TopicAdmin.java | 4 ++++
.../apache/kafka/connect/util/TopicAdminTest.java | 25 +++++++++++++++++++++-
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index faf7b37..d97533a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -714,6 +714,7 @@ public class TopicAdmin implements AutoCloseable {
* must be 0 or more
* @return the map of offset for each topic partition, or an empty map if the supplied partitions
* are null or empty
+ * @throws UnsupportedVersionException if the broker is too old to support the admin client API to read end offsets
* @throws ConnectException if {@code timeoutDuration} is exhausted
* @see TopicAdmin#endOffsets(Set)
*/
@@ -725,6 +726,9 @@ public class TopicAdmin implements AutoCloseable {
() -> "list offsets for topic partitions",
timeoutDuration,
retryBackoffMs);
+ } catch (UnsupportedVersionException e) {
+ // Older brokers don't support this admin method, so rethrow it without wrapping it
+ throw e;
} catch (Exception e) {
throw new ConnectException("Failed to list offsets for topic partitions.", e);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index deea050..cf611db 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -467,8 +467,31 @@ public class TopicAdminTest {
}
}
+ /**
+ * TopicAdmin can be used to read the end offsets, but the admin client API used to do this was
+ * added to the broker in 0.11.0.0. This means that if Connect talks to older brokers,
+ * the admin client cannot be used to read end offsets, and will throw an UnsupportedVersionException.
+ */
+ @Test
+ public void retryEndOffsetsShouldRethrowUnknownVersionException() {
+ String topicName = "myTopic";
+ TopicPartition tp1 = new TopicPartition(topicName, 0);
+ Set<TopicPartition> tps = Collections.singleton(tp1);
+ Long offset = null; // response should use error
+ Cluster cluster = createCluster(1, topicName, 1);
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+ // Expect the admin client list offsets will throw unsupported version, simulating older brokers
+ env.kafkaClient().prepareResponse(listOffsetsResultWithUnsupportedVersion(tp1, offset));
+ TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+ // The retryEndOffsets should catch and rethrow an unsupported version exception
+ assertThrows(UnsupportedVersionException.class, () -> admin.retryEndOffsets(tps, Duration.ofMillis(100), 1));
+ }
+ }
+
@Test
- public void retryEndOffsetsShouldThrowConnectException() {
+ public void retryEndOffsetsShouldWrapNonRetriableExceptionsWithConnectException() {
String topicName = "myTopic";
TopicPartition tp1 = new TopicPartition(topicName, 0);
Set<TopicPartition> tps = Collections.singleton(tp1);