You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2021/02/22 16:15:20 UTC

[kafka] branch 2.8 updated (b7f765f -> 09bba27)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a change to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from b7f765f  MINOR: Remove redundant log close in `KafkaRaftClient` (#10168)
     new 3fc5404  KAFKA-12340: Fix potential resource leak in Kafka*BackingStore (#10153)
     new 09bba27  KAFKA-12339: Add retry to admin client's listOffsets (#10152)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../admin/internals/MetadataOperationContext.java  |  1 +
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 54 ++++++++++++++++++----
 .../connect/storage/KafkaConfigBackingStore.java   | 19 +++++++-
 .../connect/storage/KafkaOffsetBackingStore.java   | 19 +++++++-
 .../connect/storage/KafkaStatusBackingStore.java   | 19 +++++++-
 5 files changed, 98 insertions(+), 14 deletions(-)


[kafka] 02/02: KAFKA-12339: Add retry to admin client's listOffsets (#10152)

Posted by rh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 09bba2755914e03ee4217811975732ac6564d649
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Sat Feb 20 07:02:09 2021 +0800

    KAFKA-12339: Add retry to admin client's listOffsets (#10152)
    
    `KafkaAdminClient.listOffsets` did not handle topic-level errors, so a topic-level UnknownTopicOrPartitionException could prevent a Connect worker from running when the new internal topic is NOT yet synced to all brokers. The method already handled partition-level retriable errors by retrying, so this change handles topic-level retriable errors in the same way.
    
    This allows a Connect worker to start up: while the worker is trying to read to the end of the newly-created internal topics, the admin client retries until the internal topic metadata is synced to all brokers.
    
    Author: Chia-Ping Tsai <ch...@gmail.com>
    Reviewers: Randall Hauch <rh...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../admin/internals/MetadataOperationContext.java  |  1 +
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 54 ++++++++++++++++++----
 2 files changed, 47 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
index c05e5cf..e7f2c07 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -82,6 +82,7 @@ public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
 
     public static void handleMetadataErrors(MetadataResponse response) {
         for (TopicMetadata tm : response.topicMetadata()) {
+            if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();
             for (PartitionMetadata pm : tm.partitionMetadata()) {
                 if (shouldRefreshMetadata(pm.error)) {
                     throw pm.error.exception();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index ce8f056..6eb972b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -445,12 +445,16 @@ public class KafkaAdminClientTest {
     }
 
     private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
+        return prepareMetadataResponse(cluster, error, error);
+    }
+
+    private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
         List<MetadataResponseTopic> metadata = new ArrayList<>();
         for (String topic : cluster.topics()) {
             List<MetadataResponsePartition> pms = new ArrayList<>();
             for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
                 MetadataResponsePartition pm  = new MetadataResponsePartition()
-                    .setErrorCode(error.code())
+                    .setErrorCode(partitionError.code())
                     .setPartitionIndex(pInfo.partition())
                     .setLeaderId(pInfo.leader().id())
                     .setLeaderEpoch(234)
@@ -460,19 +464,19 @@ public class KafkaAdminClientTest {
                 pms.add(pm);
             }
             MetadataResponseTopic tm = new MetadataResponseTopic()
-                .setErrorCode(error.code())
+                .setErrorCode(topicError.code())
                 .setName(topic)
                 .setIsInternal(false)
                 .setPartitions(pms);
             metadata.add(tm);
         }
         return MetadataResponse.prepareResponse(true,
-            0,
-            cluster.nodes(),
-            cluster.clusterResource().clusterId(),
-            cluster.controller().id(),
-            metadata,
-            MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
+                0,
+                cluster.nodes(),
+                cluster.clusterResource().clusterId(),
+                cluster.controller().id(),
+                metadata,
+                MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
     }
 
     private static DescribeGroupsResponseData prepareDescribeGroupsResponseData(String groupId,
@@ -3909,6 +3913,40 @@ public class KafkaAdminClientTest {
     }
 
     @Test
+    public void testListOffsetsRetriableErrorOnMetadata() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
+            // metadata refresh because of UNKNOWN_TOPIC_OR_PARTITION
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            // listoffsets response from broker 0
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Collections.singletonList(ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 123L, 321)));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latest()));
+
+            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            assertEquals(1, offsets.size());
+            assertEquals(123L, offsets.get(tp0).offset());
+            assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue());
+            assertEquals(-1L, offsets.get(tp0).timestamp());
+        }
+    }
+
+    @Test
     public void testListOffsetsRetriableErrors() throws Exception {
 
         Node node0 = new Node(0, "localhost", 8120);


[kafka] 01/02: KAFKA-12340: Fix potential resource leak in Kafka*BackingStore (#10153)

Posted by rh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 3fc54043bf1cf197965e67d75e7f37250f406966
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Fri Feb 19 11:49:56 2021 -0600

    KAFKA-12340: Fix potential resource leak in Kafka*BackingStore (#10153)
    
    These Kafka*BackingStore classes used in Connect have a recently-added deprecated constructor, which is not used within AK. However, this commit corrects an AdminClient resource leak if those deprecated constructors are used outside of AK. The fix simply ensures that the AdminClient created by the “default” supplier is always closed when the Kafka*BackingStore is stopped.
    
    Author: Randall Hauch <rh...@gmail.com>
    Reviewers: Konstantine Karantasis <ko...@confluent.io>, Chia-Ping Tsai <ch...@gmail.com>
---
 .../connect/storage/KafkaConfigBackingStore.java      | 19 +++++++++++++++++--
 .../connect/storage/KafkaOffsetBackingStore.java      | 19 +++++++++++++++++--
 .../connect/storage/KafkaStatusBackingStore.java      | 19 +++++++++++++++++--
 3 files changed, 51 insertions(+), 6 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index d4e6358..dcfc28c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -44,6 +44,7 @@ import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -226,6 +227,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     private final Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
     private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
     private final Supplier<TopicAdmin> topicAdminSupplier;
+    private SharedTopicAdmin ownTopicAdmin;
 
     // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
     // is in an inconsistent state and we cannot safely use them until they have been refreshed.
@@ -291,7 +293,13 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     @Override
     public void stop() {
         log.info("Closing KafkaConfigBackingStore");
-        configLog.stop();
+        try {
+            configLog.stop();
+        } finally {
+            if (ownTopicAdmin != null) {
+                ownTopicAdmin.close();
+            }
+        }
         log.info("Closed KafkaConfigBackingStore");
     }
 
@@ -479,7 +487,14 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
-        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
+        Supplier<TopicAdmin> adminSupplier;
+        if (topicAdminSupplier != null) {
+            adminSupplier = topicAdminSupplier;
+        } else {
+            // Create our own topic admin supplier that we'll close when we're stopped
+            ownTopicAdmin = new SharedTopicAdmin(adminProps);
+            adminSupplier = ownTopicAdmin;
+        }
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) config).configStorageTopicSettings()
                                             : Collections.emptyMap();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 26b47f9..313baf7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -32,6 +32,7 @@ import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConvertingFutureCallback;
 import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,6 +66,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
     private KafkaBasedLog<byte[], byte[]> offsetLog;
     private HashMap<ByteBuffer, ByteBuffer> data;
     private final Supplier<TopicAdmin> topicAdminSupplier;
+    private SharedTopicAdmin ownTopicAdmin;
 
     @Deprecated
     public KafkaOffsetBackingStore() {
@@ -98,7 +100,14 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
-        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
+        Supplier<TopicAdmin> adminSupplier;
+        if (topicAdminSupplier != null) {
+            adminSupplier = topicAdminSupplier;
+        } else {
+            // Create our own topic admin supplier that we'll close when we're stopped
+            ownTopicAdmin = new SharedTopicAdmin(adminProps);
+            adminSupplier = ownTopicAdmin;
+        }
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) config).offsetStorageTopicSettings()
                                             : Collections.emptyMap();
@@ -140,7 +149,13 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
     @Override
     public void stop() {
         log.info("Stopping KafkaOffsetBackingStore");
-        offsetLog.stop();
+        try {
+            offsetLog.stop();
+        } finally {
+            if (ownTopicAdmin != null) {
+                ownTopicAdmin.close();
+            }
+        }
         log.info("Stopped KafkaOffsetBackingStore");
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index efa405f..eadbe18 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -44,6 +44,7 @@ import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.SharedTopicAdmin;
 import org.apache.kafka.connect.util.Table;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.slf4j.Logger;
@@ -134,6 +135,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
     private String statusTopic;
     private KafkaBasedLog<String, byte[]> kafkaLog;
     private int generation;
+    private SharedTopicAdmin ownTopicAdmin;
 
     @Deprecated
     public KafkaStatusBackingStore(Time time, Converter converter) {
@@ -177,7 +179,14 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
 
         Map<String, Object> adminProps = new HashMap<>(originals);
         ConnectUtils.addMetricsContextProperties(adminProps, config, clusterId);
-        Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ? topicAdminSupplier : () -> new TopicAdmin(adminProps);
+        Supplier<TopicAdmin> adminSupplier;
+        if (topicAdminSupplier != null) {
+            adminSupplier = topicAdminSupplier;
+        } else {
+            // Create our own topic admin supplier that we'll close when we're stopped
+            ownTopicAdmin = new SharedTopicAdmin(adminProps);
+            adminSupplier = ownTopicAdmin;
+        }
 
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) config).statusStorageTopicSettings()
@@ -221,7 +230,13 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
 
     @Override
     public void stop() {
-        kafkaLog.stop();
+        try {
+            kafkaLog.stop();
+        } finally {
+            if (ownTopicAdmin != null) {
+                ownTopicAdmin.close();
+            }
+        }
     }
 
     @Override