You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/16 17:48:23 UTC

[kafka] branch 3.4 updated: Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 2da64826fbc Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168)
2da64826fbc is described below

commit 2da64826fbc39567c424a1114e5777d6b84d184f
Author: Terry <be...@hotmail.com>
AuthorDate: Thu Feb 16 12:39:24 2023 -0500

    Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168)
    
    Reviewers: Chris Egerton <ch...@aiven.io>
---
 .../apache/kafka/common/config/AbstractConfig.java | 101 +++++++++++++--------
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  27 ++++++
 .../kafka/clients/producer/KafkaProducerTest.java  |  24 +++++
 .../kafka/common/config/AbstractConfigTest.java    |  26 +++++-
 .../apache/kafka/test/MockConsumerInterceptor.java |  14 +++
 .../apache/kafka/test/MockProducerInterceptor.java |  14 +++
 6 files changed, 166 insertions(+), 40 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index e3fda4d9f54..13637163311 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -66,44 +66,43 @@ public class AbstractConfig {
      * Construct a configuration with a ConfigDef and the configuration properties, which can include properties
      * for zero or more {@link ConfigProvider} that will be used to resolve variables in configuration property
      * values.
-     *
+     * <p>
      * The originals is a name-value pair configuration properties and optional config provider configs. The
      * value of the configuration can be a variable as defined below or the actual value. This constructor will
      * first instantiate the ConfigProviders using the config provider configs, then it will find all the
      * variables in the values of the originals configurations, attempt to resolve the variables using the named
      * ConfigProviders, and then parse and validate the configurations.
-     *
+     * <p>
      * ConfigProvider configs can be passed either as configs in the originals map or in the separate
      * configProviderProps map. If config providers properties are passed in the configProviderProps any config
      * provider properties in originals map will be ignored. If ConfigProvider properties are not provided, the
      * constructor will skip the variable substitution step and will simply validate and parse the supplied
      * configuration.
-     *
+     * <p>
      * The "{@code config.providers}" configuration property and all configuration properties that begin with the
      * "{@code config.providers.}" prefix are reserved. The "{@code config.providers}" configuration property
      * specifies the names of the config providers, and properties that begin with the "{@code config.providers.{name}.}"
      * prefix correspond to the properties for that named provider. For example, the "{@code config.providers.{name}.class}"
      * property specifies the name of the {@link ConfigProvider} implementation class that should be used for
      * the provider.
-     *
+     * <p>
      * The keys for ConfigProvider configs in both originals and configProviderProps will start with the above
      * mentioned "{@code config.providers.}" prefix.
-     *
+     * <p>
      * Variables have the form "${providerName:[path:]key}", where "providerName" is the name of a ConfigProvider,
      * "path" is an optional string, and "key" is a required string. This variable is resolved by passing the "key"
      * and optional "path" to a ConfigProvider with the specified name, and the result from the ConfigProvider is
      * then used in place of the variable. Variables that cannot be resolved by the AbstractConfig constructor will
      * be left unchanged in the configuration.
      *
-     *
-     * @param definition the definition of the configurations; may not be null
-     * @param originals the configuration properties plus any optional config provider properties;
+     * @param definition          the definition of the configurations; may not be null
+     * @param originals           the configuration properties plus any optional config provider properties;
      * @param configProviderProps the map of properties of config providers which will be instantiated by
-     *        the constructor to resolve any variables in {@code originals}; may be null or empty
-     * @param doLog whether the configurations should be logged
+     *                            the constructor to resolve any variables in {@code originals}; may be null or empty
+     * @param doLog               whether the configurations should be logged
      */
     @SuppressWarnings("unchecked")
-    public AbstractConfig(ConfigDef definition, Map<?, ?> originals,  Map<String, ?> configProviderProps, boolean doLog) {
+    public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
         /* check that all the keys are really strings */
         for (Map.Entry<?, ?> entry : originals.entrySet())
             if (!(entry.getKey() instanceof String))
@@ -127,7 +126,7 @@ public class AbstractConfig {
      * that will be used to resolve variables in configuration property values.
      *
      * @param definition the definition of the configurations; may not be null
-     * @param originals the configuration properties plus any optional config provider properties; may not be null
+     * @param originals  the configuration properties plus any optional config provider properties; may not be null
      */
     public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
         this(definition, originals, Collections.emptyMap(), true);
@@ -139,8 +138,8 @@ public class AbstractConfig {
      * that will be used to resolve variables in configuration property values.
      *
      * @param definition the definition of the configurations; may not be null
-     * @param originals the configuration properties plus any optional config provider properties; may not be null
-     * @param doLog whether the configurations should be logged
+     * @param originals  the configuration properties plus any optional config provider properties; may not be null
+     * @param doLog      whether the configurations should be logged
      */
     public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
         this(definition, originals, Collections.emptyMap(), doLog);
@@ -241,6 +240,7 @@ public class AbstractConfig {
 
     /**
      * Get all the original settings, ensuring that all values are of type String.
+     *
      * @return the original settings
      * @throws ClassCastException if any of the values are not strings
      */
@@ -269,7 +269,7 @@ public class AbstractConfig {
      * Gets all original settings with the given prefix.
      *
      * @param prefix the prefix to use as a filter
-     * @param strip strip the prefix before adding to the output if set true
+     * @param strip  strip the prefix before adding to the output if set true
      * @return a Map containing the settings with the prefix
      */
     public Map<String, Object> originalsWithPrefix(String prefix, boolean strip) {
@@ -288,7 +288,7 @@ public class AbstractConfig {
     /**
      * Put all keys that do not start with {@code prefix} and their parsed values in the result map and then
      * put all the remaining keys with the prefix stripped and their parsed values in the result map.
-     *
+     * <p>
      * This is useful if one wants to allow prefixed configs to override default ones.
      * <p>
      * Two forms of prefixes are supported:
@@ -323,7 +323,7 @@ public class AbstractConfig {
     /**
      * If at least one key with {@code prefix} exists, all prefixed values will be parsed and put into map.
      * If no value with {@code prefix} exists all unprefixed values will be returned.
-     *
+     * <p>
      * This is useful if one wants to allow prefixed configs to override default ones, but wants to use either
      * only prefixed configs or only regular configs, but not mix them.
      */
@@ -389,8 +389,8 @@ public class AbstractConfig {
     private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs) {
         if (klass == null)
             return null;
-
         Object o;
+
         if (klass instanceof String) {
             try {
                 o = Utils.newInstance((String) klass, t);
@@ -401,11 +401,15 @@ public class AbstractConfig {
             o = Utils.newInstance((Class<?>) klass);
         } else
             throw new KafkaException("Unexpected element of type " + klass.getClass().getName() + ", expected String or Class");
-        if (!t.isInstance(o))
-            throw new KafkaException(klass + " is not an instance of " + t.getName());
-        if (o instanceof Configurable)
-            ((Configurable) o).configure(configPairs);
-
+        try {
+            if (!t.isInstance(o))
+                throw new KafkaException(klass + " is not an instance of " + t.getName());
+            if (o instanceof Configurable)
+                ((Configurable) o).configure(configPairs);
+        } catch (Exception e) {
+            maybeClose(o, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
         return t.cast(o);
     }
 
@@ -414,7 +418,7 @@ public class AbstractConfig {
      * Configurable configure it using the configuration.
      *
      * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param t   The interface the class should implement
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t) {
@@ -425,8 +429,8 @@ public class AbstractConfig {
      * Get a configured instance of the given class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.
      *
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param key             The configuration key for the class
+     * @param t               The interface the class should implement
      * @param configOverrides override origin configs
      * @return A configured instance of the class
      */
@@ -440,8 +444,9 @@ public class AbstractConfig {
      * Get a list of configured instances of the given class specified by the given configuration key. The configuration
      * may specify either null or an empty string to indicate no configured instances. In both cases, this method
      * returns an empty list to indicate no configured instances.
+     *
      * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param t   The interface the class should implement
      * @return The list of configured instances
      */
     public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
@@ -452,8 +457,9 @@ public class AbstractConfig {
      * Get a list of configured instances of the given class specified by the given configuration key. The configuration
      * may specify either null or an empty string to indicate no configured instances. In both cases, this method
      * returns an empty list to indicate no configured instances.
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     *
+     * @param key             The configuration key for the class
+     * @param t               The interface the class should implement
      * @param configOverrides Configuration overrides to use.
      * @return The list of configured instances
      */
@@ -465,8 +471,9 @@ public class AbstractConfig {
      * Get a list of configured instances of the given class specified by the given configuration key. The configuration
      * may specify either null or an empty string to indicate no configured instances. In both cases, this method
      * returns an empty list to indicate no configured instances.
-     * @param classNames The list of class names of the instances to create
-     * @param t The interface the class should implement
+     *
+     * @param classNames      The list of class names of the instances to create
+     * @param t               The interface the class should implement
      * @param configOverrides Configuration overrides to use.
      * @return The list of configured instances
      */
@@ -476,14 +483,28 @@ public class AbstractConfig {
             return objects;
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
-        for (Object klass : classNames) {
-            Object o = getConfiguredInstance(klass, t, configPairs);
-            objects.add(t.cast(o));
+
+        try {
+            for (Object klass : classNames) {
+                Object o = getConfiguredInstance(klass, t, configPairs);
+                objects.add(t.cast(o));
+            }
+        } catch (Exception e) {
+            for (Object object : objects) {
+                maybeClose(object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances");
+            }
+            throw e;
         }
         return objects;
     }
 
-    private Map<String, String> extractPotentialVariables(Map<?, ?>  configMap) {
+    private static void maybeClose(Object object, String name) {
+        if (object instanceof AutoCloseable) {
+            Utils.closeQuietly((AutoCloseable) object, name);
+        }
+    }
+
+    private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) {
         // Variables are tuples of the form "${providerName:[path:]key}". From the configMap we extract the subset of configs with string
         // values as potential variables.
         Map<String, String> configMapAsString = new HashMap<>();
@@ -498,12 +519,13 @@ public class AbstractConfig {
     /**
      * Instantiates given list of config providers and fetches the actual values of config variables from the config providers.
      * returns a map of config key and resolved values.
+     *
      * @param configProviderProps The map of config provider configs
-     * @param originals The map of raw configs.
+     * @param originals           The map of raw configs.
      * @return map of resolved config variable.
      */
     @SuppressWarnings("unchecked")
-    private  Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) {
+    private Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) {
         Map<String, String> providerConfigString;
         Map<String, ?> configProperties;
         Map<String, Object> resolvedOriginals = new HashMap<>();
@@ -549,7 +571,8 @@ public class AbstractConfig {
      * config.providers.{name}.class : The Java class name for a provider.
      * config.providers.{name}.param.{param-name} : A parameter to be passed to the above Java class on initialization.
      * returns a map of config provider name and its instance.
-     * @param indirectConfigs The map of potential variable configs
+     *
+     * @param indirectConfigs          The map of potential variable configs
      * @param providerConfigProperties The map of config provider configs
      * @return map of config provider name and its instance.
      */
@@ -562,7 +585,7 @@ public class AbstractConfig {
 
         Map<String, String> providerMap = new HashMap<>();
 
-        for (String provider: configProviders.split(",")) {
+        for (String provider : configProviders.split(",")) {
             String providerClass = providerClassProperty(provider);
             if (indirectConfigs.containsKey(providerClass))
                 providerMap.put(provider, indirectConfigs.get(providerClass));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index f08ac45ddaf..7729892a64f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -151,6 +151,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public class KafkaConsumerTest {
+
     private final String topic = "test";
     private final Uuid topicId = Uuid.randomUuid();
     private final TopicPartition tp0 = new TopicPartition(topic, 0);
@@ -503,6 +504,32 @@ public class KafkaConsumerTest {
         }
     }
 
+    @Test
+    public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances() {
+        final int targetInterceptor = 3;
+
+        try {
+            Properties props = new Properties();
+            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,  MockConsumerInterceptor.class.getName() + ", "
+                    + MockConsumerInterceptor.class.getName() + ", "
+                    + MockConsumerInterceptor.class.getName());
+
+            MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(targetInterceptor);
+
+            assertThrows(KafkaException.class, () -> {
+                new KafkaConsumer<>(
+                        props, new StringDeserializer(), new StringDeserializer());
+            });
+
+            assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
+            assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
+
+        } finally {
+            MockConsumerInterceptor.resetCounters();
+        }
+    }
+
     @Test
     public void testPause() {
         KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index a1a854fc671..8023c41ac36 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -569,7 +569,31 @@ public class KafkaProducerTest {
             MockProducerInterceptor.resetCounters();
         }
     }
+    @Test
+    public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances() {
+        final int targetInterceptor = 3;
+        try {
+            Properties props = new Properties();
+            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, org.apache.kafka.test.MockProducerInterceptor.class.getName() + ", "
+                    +  org.apache.kafka.test.MockProducerInterceptor.class.getName() + ", "
+                    +  org.apache.kafka.test.MockProducerInterceptor.class.getName());
+            props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something");
+
+            MockProducerInterceptor.setThrowOnConfigExceptionThreshold(targetInterceptor);
+
+            assertThrows(KafkaException.class, () -> {
+                new KafkaProducer<>(
+                        props, new StringSerializer(), new StringSerializer());
+            });
+
+            assertEquals(3, MockProducerInterceptor.CONFIG_COUNT.get());
+            assertEquals(3, MockProducerInterceptor.CLOSE_COUNT.get());
 
+        } finally {
+            MockProducerInterceptor.resetCounters();
+        }
+    }
     @Test
     public void testPartitionerClose() {
         try {
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index e07cdb8de16..c0c6f8cee37 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.config.provider.MockVaultConfigProvider;
 import org.apache.kafka.common.config.provider.MockFileConfigProvider;
+import org.apache.kafka.test.MockConsumerInterceptor;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -259,6 +260,30 @@ public class AbstractConfigTest {
         }
     }
 
+    @Test
+    public void testConfiguredInstancesClosedOnFailure() {
+
+        try {
+            Map<String, String> props = new HashMap<>();
+            String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", "
+                    + MockConsumerInterceptor.class.getName() + ", "
+                    + MockConsumerInterceptor.class.getName();
+            props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, threeConsumerInterceptors);
+            props.put("client.id", "test");
+            TestConfig testConfig = new TestConfig(props);
+
+            MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(3);
+            assertThrows(
+                    Exception.class,
+                    () -> testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, Object.class)
+            );
+            assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
+            assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
+        } finally {
+            MockConsumerInterceptor.resetCounters();
+        }
+    }
+
     @Test
     public void testClassConfigs() {
         class RestrictedClassLoader extends ClassLoader {
@@ -585,7 +610,6 @@ public class AbstractConfigTest {
 
         public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
         private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters.";
-
         static {
             CONFIG = new ConfigDef().define(METRIC_REPORTER_CLASSES_CONFIG,
                                             Type.LIST,
diff --git a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
index b01584b62a2..ccc93c9accb 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
@@ -41,6 +41,9 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
     public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
     public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ON_COMMIT_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger CONFIG_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger THROW_CONFIG_EXCEPTION = new AtomicInteger(0);
+    public static final AtomicInteger THROW_ON_CONFIG_EXCEPTION_THRESHOLD = new AtomicInteger(0);
     public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>();
     public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id");
     public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_ON_CONSUME = new AtomicReference<>(NO_CLUSTER_ID);
@@ -55,6 +58,11 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
         Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG);
         if (clientIdValue == null)
             throw new ConfigException("Mock consumer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+
+        CONFIG_COUNT.incrementAndGet();
+        if (CONFIG_COUNT.get() == THROW_ON_CONFIG_EXCEPTION_THRESHOLD.get()) {
+            throw new ConfigException("Failed to instantiate interceptor. Reached configuration exception threshold.");
+        }
     }
 
     @Override
@@ -90,10 +98,16 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
         CLOSE_COUNT.incrementAndGet();
     }
 
+    public static void setThrowOnConfigExceptionThreshold(int value) {
+        THROW_ON_CONFIG_EXCEPTION_THRESHOLD.set(value);
+    }
+
     public static void resetCounters() {
         INIT_COUNT.set(0);
         CLOSE_COUNT.set(0);
         ON_COMMIT_COUNT.set(0);
+        CONFIG_COUNT.set(0);
+        THROW_CONFIG_EXCEPTION.set(0);
         CLUSTER_META.set(null);
         CLUSTER_ID_BEFORE_ON_CONSUME.set(NO_CLUSTER_ID);
     }
diff --git a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
index eedc3bdaecd..6ef4f50893e 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
@@ -32,6 +32,9 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce
     public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
     public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ONSEND_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger CONFIG_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger THROW_CONFIG_EXCEPTION = new AtomicInteger(0);
+    public static final AtomicInteger THROW_ON_CONFIG_EXCEPTION_THRESHOLD = new AtomicInteger(0);
     public static final AtomicInteger ON_SUCCESS_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ON_ERROR_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ON_ERROR_WITH_METADATA_COUNT = new AtomicInteger(0);
@@ -59,6 +62,11 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce
         Object clientIdValue = configs.get(ProducerConfig.CLIENT_ID_CONFIG);
         if (clientIdValue == null)
             throw new ConfigException("Mock producer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+
+        CONFIG_COUNT.incrementAndGet();
+        if (CONFIG_COUNT.get() == THROW_ON_CONFIG_EXCEPTION_THRESHOLD.get()) {
+            throw new ConfigException("Failed to instantiate interceptor. Reached configuration exception threshold.");
+        }
     }
 
     @Override
@@ -89,10 +97,16 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce
         CLOSE_COUNT.incrementAndGet();
     }
 
+    public static void setThrowOnConfigExceptionThreshold(int value) {
+        THROW_ON_CONFIG_EXCEPTION_THRESHOLD.set(value);
+    }
+
     public static void resetCounters() {
         INIT_COUNT.set(0);
         CLOSE_COUNT.set(0);
         ONSEND_COUNT.set(0);
+        CONFIG_COUNT.set(0);
+        THROW_CONFIG_EXCEPTION.set(0);
         ON_SUCCESS_COUNT.set(0);
         ON_ERROR_COUNT.set(0);
         ON_ERROR_WITH_METADATA_COUNT.set(0);