Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/23 15:52:03 UTC

[GitHub] [kafka] divijvaidya commented on a diff in pull request #12228: KAFKA-13950: Fix resource leaks

divijvaidya commented on code in PR #12228:
URL: https://github.com/apache/kafka/pull/12228#discussion_r952807299


##########
clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java:
##########
@@ -54,12 +54,18 @@ public void configure(Map<String, ?> configs) throws KafkaException {
     @Override
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize,
                                      MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException {
+        PlaintextTransportLayer transportLayer = null;
         try {
-            PlaintextTransportLayer transportLayer = buildTransportLayer(key);
-            Supplier<Authenticator> authenticatorCreator = () -> new PlaintextAuthenticator(configs, transportLayer, listenerName);
+            transportLayer = buildTransportLayer(key);
+            final PlaintextTransportLayer finalTransportLayer = transportLayer;
+            Supplier<Authenticator> authenticatorCreator = () -> new PlaintextAuthenticator(configs, finalTransportLayer, listenerName);
             return buildChannel(id, transportLayer, authenticatorCreator, maxReceiveSize,
                     memoryPool != null ? memoryPool : MemoryPool.NONE, metadataRegistry);
         } catch (Exception e) {
+            // Ideally these resources are closed by the KafkaChannel but this builder should close the resources instead
+            // if an error occurs due to which KafkaChannel is not created.
+            Utils.closeQuietly(transportLayer, "transport layer for channel Id: " + id);
+            Utils.closeQuietly(metadataRegistry, "metadataRegistry");

Review Comment:
   > Shouldn't that take care of closing the metadata registry since it actually creates it?
   
   I agree; I missed that earlier. I have moved the responsibility for closing the metadata registry on error to `Selector.buildAndAttachKafkaChannel`, and I have added a unit test that fails before this fix and passes after it.
   
   > With regards to the transport layer, I wonder if we could move the cleanup logic to `KafkaChannel` instead
   
   I am afraid that might not be possible, because we also need to handle errors that occur while creating the `KafkaChannel` itself (e.g. when `authenticatorCreator` throws inside the constructor). In that case no `KafkaChannel` exists to perform the cleanup. We could close the transport layer uniformly in `ChannelBuilder`, but that would require some refactoring, including converting the `ChannelBuilder` interface into a class. Please let me know if you have any ideas.
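
To make the constructor-failure scenario concrete, here is a minimal sketch. It uses hypothetical stand-in classes (`FakeTransportLayer`, `FakeChannel`, and a local `closeQuietly` helper), not Kafka's real `KafkaChannel`, `PlaintextTransportLayer`, or `Utils.closeQuietly`. Unlike the real builder, the transport layer is passed in rather than created inside the method, purely so the result can be observed. It is meant only to illustrate why the builder has to clean up when the channel's constructor throws.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins for illustration only; these are NOT Kafka's classes.
final class ChannelCleanupSketch {

    /** Stand-in for a transport layer that records whether it was closed. */
    static final class FakeTransportLayer implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Stand-in for a channel that owns its transport layer once constructed. */
    static final class FakeChannel implements AutoCloseable {
        final FakeTransportLayer transportLayer;
        final Object authenticator;

        FakeChannel(FakeTransportLayer transportLayer, Supplier<Object> authenticatorCreator) {
            this.transportLayer = transportLayer;
            // If the supplier throws, the constructor never completes, so no
            // channel object exists that could close the transport layer later.
            this.authenticator = authenticatorCreator.get();
        }

        @Override
        public void close() {
            transportLayer.close();
        }
    }

    /** Builds a channel; closes the transport layer if construction fails. */
    static FakeChannel buildChannel(FakeTransportLayer transportLayer,
                                    Supplier<Object> authenticatorCreator) {
        try {
            return new FakeChannel(transportLayer, authenticatorCreator);
        } catch (RuntimeException e) {
            // The channel was never created, so the builder must clean up.
            closeQuietly(transportLayer);
            throw e;
        }
    }

    /** Local helper (an assumption of this sketch): close and swallow errors. */
    static void closeQuietly(AutoCloseable closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception ignored) {
            // Intentionally ignored: cleanup is best effort.
        }
    }

    public static void main(String[] args) {
        FakeTransportLayer layer = new FakeTransportLayer();
        try {
            buildChannel(layer, () -> {
                throw new IllegalStateException("authenticator creation failed");
            });
        } catch (IllegalStateException e) {
            System.out.println("transport layer closed: " + layer.closed);
        }
    }
}
```

On the success path the transport layer stays open and is closed later through the channel, which matches the split of responsibilities discussed above.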


