You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/01/30 20:15:02 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

C0urante commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091084063


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -476,14 +480,34 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                if (object instanceof AutoCloseable) {
+                    try {
+                        ((AutoCloseable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }
+                } else if (object instanceof Closeable) {
+                    try {
+                        ((Closeable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }

Review Comment:
   This can be simplified a bit:
   
   ```suggestion
                       Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances");
   
   ```
   
   1. `Utils::closeQuietly` handles failures for us
   2. `Closeable` is a subinterface of `AutoCloseable`, so we only need to check for the latter



##########
clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java:
##########
@@ -55,6 +58,11 @@ public void configure(Map<String, ?> configs) {
         Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG);
         if (clientIdValue == null)
             throw new ConfigException("Mock consumer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+
+        CONFIG_COUNT.incrementAndGet();
+        if (CONFIG_COUNT.get() == THROW_CONFIG_EXCEPTION_THRESHOLD.get()) {
+            throw new ConfigException("Kafka producer creation failed. Failure may not have cleaned up listener thread resource.");

Review Comment:
   It seems like the failure message here is hinting that we try to create a Kafka producer in this interceptor, but there isn't much else in the class to go along with that.
   
   Could we use a more generic message like "Failed to instantiate interceptor (reached throw-on-config threshold)"?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -503,6 +505,30 @@ public void testInterceptorConstructorClose() {
         }
     }
 
+    @Test
+    public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances() {

Review Comment:
   I like this test. Could we also get a matching one for producer interceptors, and something analogous for the `AbstractConfig` class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org