You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "beardt (via GitHub)" <gi...@apache.org> on 2023/01/26 21:20:45 UTC

[GitHub] [kafka] beardt opened a new pull request, #13168: Kafka 14565: Interceptor Resource Leak

beardt opened a new pull request, #13168:
URL: https://github.com/apache/kafka/pull/13168

   Added try/catch within Abstract::getConfigInstances to enable clean up any available  instances (interceptors in this case) resources.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092017609


##########
clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java:
##########
@@ -55,6 +58,11 @@ public void configure(Map<String, ?> configs) {
         Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG);
         if (clientIdValue == null)
             throw new ConfigException("Mock consumer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+
+        CONFIG_COUNT.incrementAndGet();
+        if (CONFIG_COUNT.get() == THROW_CONFIG_EXCEPTION_THRESHOLD.get()) {
+            throw new ConfigException("Kafka producer creation failed. Failure may not have cleaned up listener thread resource.");

Review Comment:
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1093403718


##########
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##########
@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
         testInvalidInputs("test1,test2");
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR,  org.apache.kafka.test.MockConsumerInterceptor.class);
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR,  org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
   As a follow up on your earlier comments, another value of this test is in understanding the intent of the coded feature.  In this case (pun intended), a developer looking at this test case would rightfully wonder what does `TestConfig.METRIC_REPORT_CLASSES_CONFIG` have to do with interceptors?  
   
   Furthermore, using `TestConfig` in your suggested fashion and goal adds confusion to the test case as `METRIC_REPORT_CLASSES_CONFIG` would never be used with interceptors in the real world.  
   
   **NOTE:**  Although not in scope,  `TestConfig` really should be renamed to `MetricReporterTestConfig` as this more accurately aligns with the implementation.  
   
   Therefore, I recommend retaining the slightly renamed `InterceptorTestConfig` class as this is consistent with the collaborating test subject e.g. `MockConsumerInterceptor` and  reduces noise when reasoning out the test case.   Additionally, I've modified your suggested test case below which includes a slightly updated version of the `InterceptorTestConfig` class your review:   
   
    ```
   @Test
       public void testConfiguredInstancesClosedOnFailure() {
                  try {
               Map<String, String> props = new HashMap<>();
               String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", "
                       + MockConsumerInterceptor.class.getName() + ", "
                       + MockConsumerInterceptor.class.getName();
               props.put(InterceptorTestConfig.INTERCEPTOR_CLASSES_CONFIG, threeConsumerInterceptors);
               props.put(InterceptorTestConfig.CLIENT_ID_CONFIG, "test");
           
               InterceptorTestConfig interceptorTestConfig = new InterceptorTestConfig(props);
               MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(interceptorTestConfig.getTargetInterceptor());
               assertThrows(
                       Exception.class,
                       () -> interceptorTestConfig.getConfiguredInstances(InterceptorTestConfig.INTERCEPTOR_CLASSES_CONFIG, Object.class)
               );
               assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
               assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
           } finally {
               MockConsumerInterceptor.resetCounters();
           }
       }
   ```
   
   ```
   private static class InterceptorTestConfig extends AbstractConfig {
           private final int targetInterceptor = 3;
           private static final ConfigDef CONFIG;
           private static final String INTERCEPTOR_CLASSES_CONFIG_DOC = "A list of classes to use as interceptors.";
   
           public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
           public static final String CLIENT_ID_CONFIG = "client.id";
           public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
   
           static {
               CONFIG = new ConfigDef().define(INTERCEPTOR_CLASSES_CONFIG,
                       Type.LIST,
                       "",
                       Importance.LOW,
                       INTERCEPTOR_CLASSES_CONFIG_DOC);
           }
           public InterceptorTestConfig(Map<?, ?> props) {
               super(CONFIG, props);
           }
   
           public int getTargetInterceptor() {
               return targetInterceptor;
           }
       }
   ```
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1093403718


##########
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##########
@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
         testInvalidInputs("test1,test2");
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR,  org.apache.kafka.test.MockConsumerInterceptor.class);
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR,  org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
   As a follow up on your earlier comments, another value of this test is in understanding the intent of the coded feature.  In this case (pun intended), a developer looking at this test case would rightfully wonder what does `TestConfig.METRIC_REPORT_CLASSES_CONFIG` have to do with interceptors?  
   
   Furthermore, using `TestConfig` in your suggested fashion and goal adds confusion to the test case as `METRIC_REPORT_CLASSES_CONFIG` would never be used with interceptors in the real world.  
   
   **NOTE:**  Although not in scope,  `TestConfig` really should be renamed to `MetricReporterTestConfig` as this more accurately aligns with the implementation.  
   
   Therefore, I recommend retaining the slightly renamed `InterceptorTestConfig` class as this is consistent with the collaborating test subject e.g. `MockConsumerInterceptor` and  reduces noise when reasoning out the test case.   Additionally, I've modified your suggested test case below which includes an updated version of the `InterceptorTestConfig` class your review:   
   
    ```
   @Test
       public void testConfiguredInstancesClosedOnFailure() {
           try {
               Map<String, String> props = new HashMap<>();
               String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", "
                       + MockConsumerInterceptor.class.getName() + ", "
                       + MockConsumerInterceptor.class.getName();
               props.put(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, threeConsumerInterceptors);
               props.put(TestInterceptorConfig.CLIENT_ID_CONFIG, "test");
               TestConfig testConfig = new TestConfig(props);
   
               MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(TestInterceptorConfig.getTargetInterceptor());
               assertThrows(
                       Exception.class,
                       () -> testConfig.getConfiguredInstances(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, Object.class)
               );
               assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
               assertEquals(2, MockConsumerInterceptor.CLOSE_COUNT.get());
           } finally {
               MockConsumerInterceptor.resetCounters();
           }
       }
   ```
   
   ```
   private static class InterceptorTestConfig extends AbstractConfig {
           private final int targetInterceptor = 3;
           private static final ConfigDef CONFIG;
           private static final String INTERCEPTOR_CLASSES_CONFIG_DOC = "A list of classes to use as interceptors.";
   
           public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
           public static final String CLIENT_ID_CONFIG = "client.id";
           public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
   
           static {
               CONFIG = new ConfigDef().define(INTERCEPTOR_CLASSES_CONFIG,
                       Type.LIST,
                       "",
                       Importance.LOW,
                       INTERCEPTOR_CLASSES_CONFIG_DOC);
           }
           public InterceptorTestConfig(Map<?, ?> props) {
               super(CONFIG, props);
           }
   
           public int getTargetInterceptor() {
               return targetInterceptor;
           }
       }
   ```
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091360138


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -476,14 +480,34 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                if (object instanceof AutoCloseable) {
+                    try {
+                        ((AutoCloseable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }
+                } else if (object instanceof Closeable) {
+                    try {
+                        ((Closeable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }

Review Comment:
   @C0urante  I'm glad to hear that you are feeling better.  
   Yes, given the KIP avoidance constraint, I really wanted to avoid changing getConfiguredInstances, but  it seemed the only way to address this was to change it.   However,  I believe I've implemented each of your comments and athanks for the excellent feedback. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1093403718


##########
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##########
@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
         testInvalidInputs("test1,test2");
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR,  org.apache.kafka.test.MockConsumerInterceptor.class);
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR,  org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
   As a follow up on your earlier comments, another value of this test is in understanding the intent of the coded feature.  In this case (pun intended), a developer looking at this test case would rightfully wonder what does `TestConfig.METRIC_REPORT_CLASSES_CONFIG` have to do with interceptors?  
   
   Furthermore, using `TestConfig` in your suggested fashion and goal adds confusion to the test case as `METRIC_REPORT_CLASSES_CONFIG` would never be used with interceptors in the real world.  
   
   **NOTE:**  Although not in scope,  `TestConfig` really should be renamed to `MetricReporterTestConfig` as this more accurately aligns with the implementation.  
   
   Therefore, I recommend retaining the slightly renamed `InterceptorTestConfig` class as this is consistent with the collaborating test subject e.g. `MockConsumerInterceptor` and  reduces noise when reasoning out the test case.   Additionally, I've modified your suggested test case below with an updated version of the `InterceptorTestConfig` class your review:   
   
    ```
   @Test
       public void testConfiguredInstancesClosedOnFailure() {
           try {
               Map<String, String> props = new HashMap<>();
               String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", "
                       + MockConsumerInterceptor.class.getName() + ", "
                       + MockConsumerInterceptor.class.getName();
               props.put(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, threeConsumerInterceptors);
               props.put(TestInterceptorConfig.CLIENT_ID_CONFIG, "test");
               TestConfig testConfig = new TestConfig(props);
   
               MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(TestInterceptorConfig.getTargetInterceptor());
               assertThrows(
                       Exception.class,
                       () -> testConfig.getConfiguredInstances(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, Object.class)
               );
               assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
               assertEquals(2, MockConsumerInterceptor.CLOSE_COUNT.get());
           } finally {
               MockConsumerInterceptor.resetCounters();
           }
       }
   ```
   
   ```
   private static class InterceptorTestConfig extends AbstractConfig {
           private final int targetInterceptor = 3;
           private static final ConfigDef CONFIG;
           private static final String INTERCEPTOR_CLASSES_CONFIG_DOC = "A list of classes to use as interceptors.";
   
           public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
           public static final String CLIENT_ID_CONFIG = "client.id";
           public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
   
           static {
               CONFIG = new ConfigDef().define(INTERCEPTOR_CLASSES_CONFIG,
                       Type.LIST,
                       "",
                       Importance.LOW,
                       INTERCEPTOR_CLASSES_CONFIG_DOC);
           }
           public InterceptorTestConfig(Map<?, ?> props) {
               super(CONFIG, props);
           }
   
           public int getTargetInterceptor() {
               return targetInterceptor;
           }
       }
   ```
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092632585


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -414,34 +414,51 @@ private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object
      * Configurable configure it using the configuration.
      *
      * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param t   The interface the class should implement
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t) {
-        return getConfiguredInstance(key, t, Collections.emptyMap());
+        T configuredInstance = null;
+
+        try {
+            configuredInstance = getConfiguredInstance(key, t, Collections.emptyMap());
+        } catch (Exception e) {
+            maybeClose(configuredInstance, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
+
+        return configuredInstance;
     }
 
     /**
      * Get a configured instance of the give class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.
      *
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param key             The configuration key for the class
+     * @param t               The interface the class should implement
      * @param configOverrides override origin configs
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t, Map<String, Object> configOverrides) {
         Class<?> c = getClass(key);
+        T configuredInstance = null;
 
-        return getConfiguredInstance(c, t, originals(configOverrides));
+        try {
+            configuredInstance = getConfiguredInstance(c, t, originals(configOverrides));
+        } catch (Exception e) {

Review Comment:
   If we do the try/catch here, it doesn't work; `configuredInstance` is guaranteed to be null in the catch block.
   
   I was thinking of something like this:
   ```java
           if (o instanceof Configurable) {
               try {
                   ((Configurable) o).configure(configPairs);
               } catch (Exception e) {
                   maybeClose(o, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
                   throw e;
               }
           }
   ```
   
   being added to [these lines](https://github.com/apache/kafka/blob/7d61d4505a16f09b85f5eb37adeff9c3534ec02c/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L406-L407) in the other variant of `getConfiguredInstance`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1107747315


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -389,22 +389,27 @@ public void logUnused() {
     private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs) {
         if (klass == null)
             return null;
+        Object o = null;
+        try {
+            if (klass instanceof String) {
+                try {
+                    o = Utils.newInstance((String) klass, t);
+                } catch (ClassNotFoundException e) {
+                    throw new KafkaException("Class " + klass + " cannot be found", e);
+                }
+            } else if (klass instanceof Class<?>) {
+                o = Utils.newInstance((Class<?>) klass);
+            } else
+                throw new KafkaException("Unexpected element of type " + klass.getClass().getName() + ", expected String or Class");
+            if (!t.isInstance(o))
+                throw new KafkaException(klass + " is not an instance of " + t.getName());
+            if (o instanceof Configurable)
+                ((Configurable) o).configure(configPairs);
+        } catch (Exception e) {
+            maybeClose(o, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
 

Review Comment:
   Nit: remove this extra blank line?



##########
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##########
@@ -599,6 +624,31 @@ public TestConfig(Map<?, ?> props) {
         }
     }
 
+    private static class InterceptorTestConfig extends AbstractConfig {
+        private final int targetInterceptor = 3;
+        private static final ConfigDef CONFIG;
+        private static final String INTERCEPTOR_CLASSES_CONFIG_DOC = "A list of classes to use as interceptors.";
+
+        public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
+        public static final String CLIENT_ID_CONFIG = "client.id";
+        public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+
+        static {
+            CONFIG = new ConfigDef().define(INTERCEPTOR_CLASSES_CONFIG,
+                    Type.LIST,
+                    Collections.emptyList(),
+                    Importance.LOW,
+                    INTERCEPTOR_CLASSES_CONFIG_DOC);
+        }
+        public InterceptorTestConfig(Map<?, ?> props) {
+            super(CONFIG, props);
+        }
+
+        public int getTargetInterceptor() {
+            return targetInterceptor;
+        }
+    }

Review Comment:
   Sorry, I'm still not in favor of this additional class. The name of the config property isn't super critical and it's not necessary to try to instantiate metrics reporters when using the `metric.reporters` property in the test config; this is only constructed for testing, after all.
   
   Please remove this class and simplify the testing logic by using the existing `TestConfig` class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091360138


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -476,14 +480,34 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                if (object instanceof AutoCloseable) {
+                    try {
+                        ((AutoCloseable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }
+                } else if (object instanceof Closeable) {
+                    try {
+                        ((Closeable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }

Review Comment:
   @C0urante  I'm glad to hear that you are feeling better.  
   Yes, given the KIP avoidance constraint, it seemed the only way to address this was to change the getConfiguredInstances.   But,  I believe I've implemented each of your comments and thanks for the excellent feedback. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1107962390


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -389,22 +389,27 @@ public void logUnused() {
     private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs) {
         if (klass == null)
             return null;
+        Object o = null;
+        try {
+            if (klass instanceof String) {
+                try {
+                    o = Utils.newInstance((String) klass, t);
+                } catch (ClassNotFoundException e) {
+                    throw new KafkaException("Class " + klass + " cannot be found", e);
+                }
+            } else if (klass instanceof Class<?>) {
+                o = Utils.newInstance((Class<?>) klass);
+            } else
+                throw new KafkaException("Unexpected element of type " + klass.getClass().getName() + ", expected String or Class");
+            if (!t.isInstance(o))
+                throw new KafkaException(klass + " is not an instance of " + t.getName());
+            if (o instanceof Configurable)
+                ((Configurable) o).configure(configPairs);
+        } catch (Exception e) {
+            maybeClose(o, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
 

Review Comment:
   @C0urante: I removed all extra blank lines `getConfiguredInstance`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1107961671


##########
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##########
@@ -599,6 +624,31 @@ public TestConfig(Map<?, ?> props) {
         }
     }
 
+    private static class InterceptorTestConfig extends AbstractConfig {
+        private final int targetInterceptor = 3;
+        private static final ConfigDef CONFIG;
+        private static final String INTERCEPTOR_CLASSES_CONFIG_DOC = "A list of classes to use as interceptors.";
+
+        public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
+        public static final String CLIENT_ID_CONFIG = "client.id";
+        public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+
+        static {
+            CONFIG = new ConfigDef().define(INTERCEPTOR_CLASSES_CONFIG,
+                    Type.LIST,
+                    Collections.emptyList(),
+                    Importance.LOW,
+                    INTERCEPTOR_CLASSES_CONFIG_DOC);
+        }
+        public InterceptorTestConfig(Map<?, ?> props) {
+            super(CONFIG, props);
+        }
+
+        public int getTargetInterceptor() {
+            return targetInterceptor;
+        }
+    }

Review Comment:
   @C0urante No apologies necessary.  I've reverted the code back to your preference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on PR #13168:
URL: https://github.com/apache/kafka/pull/13168#issuecomment-1413758244

   I went ahead and reintroduced  the InterceptorTestConfig as I do not think it adds anymore complexity than `TestConfig` and `ClassTestConfig` while also making  `testConfiguredInstancesClosedOnFailure` less confusing due to the usage of `MetricReporters` configs with consumer interceptors.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091360138


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -476,14 +480,34 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                if (object instanceof AutoCloseable) {
+                    try {
+                        ((AutoCloseable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }
+                } else if (object instanceof Closeable) {
+                    try {
+                        ((Closeable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }

Review Comment:
   @C0urante  I'm glad to hear that you are feeling better.  
   Yes, given the KIP avoidance constraint, I really did not want to change getConfiguredInstances, but  it seemed the only way to address this was to change it very carefully.   However,  I believe I've implemented each of your comments and athanks for the excellent feedback. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091360138


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -476,14 +480,34 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                if (object instanceof AutoCloseable) {
+                    try {
+                        ((AutoCloseable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }
+                } else if (object instanceof Closeable) {
+                    try {
+                        ((Closeable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }

Review Comment:
   @C0urante  I'm glad to hear that you are feeling better.  
   Yes, given the KIP avoidance constraint, I really did not want to change getConfiguredInstances, but  it seemed the only way to address this was to change it very carefully.   However,  I believe I've implemented each of your comments and thanks for the excellent feedback. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092513735


##########
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##########
@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
         testInvalidInputs("test1,test2");
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR,  org.apache.kafka.test.MockConsumerInterceptor.class);
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR,  org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
   @C0urante  Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092516181


##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -168,6 +169,7 @@ public class KafkaConsumerTest {
     private final int defaultApiTimeoutMs = 60000;
     private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
     private final int heartbeatIntervalMs = 1000;
+    private final int targetInterceptor = 3;

Review Comment:
   Just an observation, I see the following variable is used in only one test case  `private final int throttleMs = 10;` as well.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092133488


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -476,14 +479,22 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances");

Review Comment:
   We still need an `instanceof` check here:
   ```suggestion
                   if (object instanceof AutoCloseable) {
                       Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances");
                   }
   ```



##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -137,6 +137,7 @@
 import static org.mockito.Mockito.when;
 
 public class KafkaProducerTest {
+    private final int targetInterceptor = 3;

Review Comment:
   Nit: If we're not using this field in any other test, probably makes more sense to inline directly into the test case it's used in.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -168,6 +169,7 @@ public class KafkaConsumerTest {
     private final int defaultApiTimeoutMs = 60000;
     private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
     private final int heartbeatIntervalMs = 1000;
+    private final int targetInterceptor = 3;

Review Comment:
   Nit: If we're not using this field in any other test, probably makes more sense to inline directly into the test case it's used in.



##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -476,14 +479,22 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances");

Review Comment:
   Actually, come to think of it, we might also want to invoke `close` on objects created in `getConfiguredInstance` if they throw an exception from `configure` [here](https://github.com/apache/kafka/blob/6c98544a964b40ede6bbe1b3440f8e5db96a4ad6/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L407).
   
   Perhaps we could pull this out into a reusable method and use it both here and there? Thinking something like:
   
   ```java
   private static void maybeClose(Object object, String name) {
       if (object instanceof AutoCloseable)
           Utils.closeQuietly(object, name);
   }
   ```



##########
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##########
@@ -45,6 +47,8 @@
 
 public class AbstractConfigTest {
 
+
+

Review Comment:
   We don't need this change; it's fine as-is.



##########
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##########
@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
         testInvalidInputs("test1,test2");
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR,  org.apache.kafka.test.MockConsumerInterceptor.class);
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR,  org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
   We don't have to test both the producer and consumer interceptor classes here; we just have to make sure that after an invocation of `AbstractConfig::getConfiguredInstances`, the right objects are cleaned up. (The value in this test is that we ensure that the cleanup logic is handled directly by the `AbstractConfig` class, instead of being performed out-of-band by, e.g., the `KafkaConsumer` or `KafkaProducer` classes).
   
   In addition, we don't have to use this test case or the pattern of `testValidInputs`/`testInvalidInputs` helper methods.
   
   We can save a lot of complexity by pulling this out into its own test case, and using the existing `TestConfig` class instead of creating a new one:
   
   ```java
       @Test
       public void testConfiguredInstancesClosedOnFailure() {
           try {
               Map<String, String> props = new HashMap<>();
               String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", "
                       + MockConsumerInterceptor.class.getName() + ", "
                       + MockConsumerInterceptor.class.getName();
               props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, threeConsumerInterceptors);
               props.put("client.id", "test");
               TestConfig testConfig = new TestConfig(props);
   
               MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(3);
               assertThrows(
                       Exception.class,
                       () -> testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, Object.class)
               );
               assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
               assertEquals(2, MockConsumerInterceptor.CLOSE_COUNT.get());
           } finally {
               MockConsumerInterceptor.resetCounters();
           }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092652543


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -414,34 +414,51 @@ private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object
      * Configurable configure it using the configuration.
      *
      * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param t   The interface the class should implement
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t) {
-        return getConfiguredInstance(key, t, Collections.emptyMap());
+        T configuredInstance = null;
+
+        try {
+            configuredInstance = getConfiguredInstance(key, t, Collections.emptyMap());
+        } catch (Exception e) {
+            maybeClose(configuredInstance, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
+
+        return configuredInstance;
     }
 
     /**
      * Get a configured instance of the give class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.
      *
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param key             The configuration key for the class
+     * @param t               The interface the class should implement
      * @param configOverrides override origin configs
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t, Map<String, Object> configOverrides) {
         Class<?> c = getClass(key);
+        T configuredInstance = null;
 
-        return getConfiguredInstance(c, t, originals(configOverrides));
+        try {
+            configuredInstance = getConfiguredInstance(c, t, originals(configOverrides));
+        } catch (Exception e) {

Review Comment:
   I went ahead and extended the try/catch to enclose the  `o = Utils.newInstance((String) klass, t);` as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1093403718


##########
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##########
@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
         testInvalidInputs("test1,test2");
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR,  org.apache.kafka.test.MockConsumerInterceptor.class);
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR,  org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
   As a follow up on your earlier comments, another value of this test is in understanding the intent of the coded feature.  In this case (pun intended), a developer looking at this test case would rightfully wonder what does `TestConfig.METRIC_REPORT_CLASSES_CONFIG` have to do with interceptors?  
   
   Furthermore, using `TestConfig` in your suggested fashion and goal adds confusion to the test case as `METRIC_REPORT_CLASSES_CONFIG` would never be used with interceptors in the real world.  
   
   **NOTE:**  Although not in scope,  `TestConfig` really should be renamed to `MetricReporterTestConfig` as this more accurately aligns with the implementation.  
   
   Therefore, I recommend retaining the slightly renamed `InterceptorTestConfig` class as this is consistent with the collaborating test subject e.g. `MockConsumerInterceptor` and  reduces noise when reasoning out the test case.   Additionally, I've modified your suggested test case below which includes a slightly updated version of the `InterceptorTestConfig` class your review:   
   
    ```
   @Test
       public void testConfiguredInstancesClosedOnFailure() {
           try {
               Map<String, String> props = new HashMap<>();
               String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", "
                       + MockConsumerInterceptor.class.getName() + ", "
                       + MockConsumerInterceptor.class.getName();
               props.put(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, threeConsumerInterceptors);
               props.put(TestInterceptorConfig.CLIENT_ID_CONFIG, "test");
               TestConfig testConfig = new TestConfig(props);
   
               MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(TestInterceptorConfig.getTargetInterceptor());
               assertThrows(
                       Exception.class,
                       () -> testConfig.getConfiguredInstances(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, Object.class)
               );
               assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
               assertEquals(2, MockConsumerInterceptor.CLOSE_COUNT.get());
           } finally {
               MockConsumerInterceptor.resetCounters();
           }
       }
   ```
   
   ```
   private static class InterceptorTestConfig extends AbstractConfig {
           private final int targetInterceptor = 3;
           private static final ConfigDef CONFIG;
           private static final String INTERCEPTOR_CLASSES_CONFIG_DOC = "A list of classes to use as interceptors.";
   
           public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
           public static final String CLIENT_ID_CONFIG = "client.id";
           public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
   
           static {
               CONFIG = new ConfigDef().define(INTERCEPTOR_CLASSES_CONFIG,
                       Type.LIST,
                       "",
                       Importance.LOW,
                       INTERCEPTOR_CLASSES_CONFIG_DOC);
           }
           public InterceptorTestConfig(Map<?, ?> props) {
               super(CONFIG, props);
           }
   
           public int getTargetInterceptor() {
               return targetInterceptor;
           }
       }
   ```
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1091084063


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -476,14 +480,34 @@ public <T> List<T> getConfiguredInstances(List<String> classNames, Class<T> t, M
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                if (object instanceof AutoCloseable) {
+                    try {
+                        ((AutoCloseable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }
+                } else if (object instanceof Closeable) {
+                    try {
+                        ((Closeable) object).close();
+                    } catch (Exception ex) {
+                        log.error(String.format("Close exception on %s", object.getClass().getName()), ex);
+                    }

Review Comment:
   This can be simplified a bit:
   
   ```suggestion
                       Utils.closeQuietly((AutoCloseable) object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances");
   
   ```
   
   1. `Utils::closeQuietly` handles failures for us
   2. `Closeable` is a subinterface of `AutoCloseable`, so we only need to check for the latter



##########
clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java:
##########
@@ -55,6 +58,11 @@ public void configure(Map<String, ?> configs) {
         Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG);
         if (clientIdValue == null)
             throw new ConfigException("Mock consumer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+
+        CONFIG_COUNT.incrementAndGet();
+        if (CONFIG_COUNT.get() == THROW_CONFIG_EXCEPTION_THRESHOLD.get()) {
+            throw new ConfigException("Kafka producer creation failed. Failure may not have cleaned up listener thread resource.");

Review Comment:
   It seems like the failure message here is hinting that we try to create a Kafka producer in this interceptor, but there isn't much else in the class to go along with that.
   
   Could we use a more generic message like "Failed to instantiate interceptor (reached throw-on-config threshold)"?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -503,6 +505,30 @@ public void testInterceptorConstructorClose() {
         }
     }
 
+    @Test
+    public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances() {

Review Comment:
   I like this test. Could we also get a matching one for producer interceptors, and something analogous for the `AbstractConfig` class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092652543


##########
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##########
@@ -414,34 +414,51 @@ private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object
      * Configurable configure it using the configuration.
      *
      * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param t   The interface the class should implement
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t) {
-        return getConfiguredInstance(key, t, Collections.emptyMap());
+        T configuredInstance = null;
+
+        try {
+            configuredInstance = getConfiguredInstance(key, t, Collections.emptyMap());
+        } catch (Exception e) {
+            maybeClose(configuredInstance, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
+
+        return configuredInstance;
     }
 
     /**
      * Get a configured instance of the give class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.
      *
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param key             The configuration key for the class
+     * @param t               The interface the class should implement
      * @param configOverrides override origin configs
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t, Map<String, Object> configOverrides) {
         Class<?> c = getClass(key);
+        T configuredInstance = null;
 
-        return getConfiguredInstance(c, t, originals(configOverrides));
+        try {
+            configuredInstance = getConfiguredInstance(c, t, originals(configOverrides));
+        } catch (Exception e) {

Review Comment:
   I've updated accordingly.  However, this assumes no resource leakage within the constructor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] beardt commented on pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "beardt (via GitHub)" <gi...@apache.org>.
beardt commented on PR #13168:
URL: https://github.com/apache/kafka/pull/13168#issuecomment-1428692304

   @C0urante  Can we move forward with this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante merged pull request #13168: Kafka 14565: Interceptor Resource Leak

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante merged PR #13168:
URL: https://github.com/apache/kafka/pull/13168


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org