You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/16 17:48:23 UTC
[kafka] branch 3.4 updated: Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 2da64826fbc Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168)
2da64826fbc is described below
commit 2da64826fbc39567c424a1114e5777d6b84d184f
Author: Terry <be...@hotmail.com>
AuthorDate: Thu Feb 16 12:39:24 2023 -0500
Kafka 14565: On failure, close AutoCloseable objects instantiated and configured by AbstractConfig (#13168)
Reviewers: Chris Egerton <ch...@aiven.io>
---
.../apache/kafka/common/config/AbstractConfig.java | 101 +++++++++++++--------
.../kafka/clients/consumer/KafkaConsumerTest.java | 27 ++++++
.../kafka/clients/producer/KafkaProducerTest.java | 24 +++++
.../kafka/common/config/AbstractConfigTest.java | 26 +++++-
.../apache/kafka/test/MockConsumerInterceptor.java | 14 +++
.../apache/kafka/test/MockProducerInterceptor.java | 14 +++
6 files changed, 166 insertions(+), 40 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index e3fda4d9f54..13637163311 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -66,44 +66,43 @@ public class AbstractConfig {
* Construct a configuration with a ConfigDef and the configuration properties, which can include properties
* for zero or more {@link ConfigProvider} that will be used to resolve variables in configuration property
* values.
- *
+ * <p>
* The originals is a name-value pair configuration properties and optional config provider configs. The
* value of the configuration can be a variable as defined below or the actual value. This constructor will
* first instantiate the ConfigProviders using the config provider configs, then it will find all the
* variables in the values of the originals configurations, attempt to resolve the variables using the named
* ConfigProviders, and then parse and validate the configurations.
- *
+ * <p>
* ConfigProvider configs can be passed either as configs in the originals map or in the separate
* configProviderProps map. If config providers properties are passed in the configProviderProps any config
* provider properties in originals map will be ignored. If ConfigProvider properties are not provided, the
* constructor will skip the variable substitution step and will simply validate and parse the supplied
* configuration.
- *
+ * <p>
* The "{@code config.providers}" configuration property and all configuration properties that begin with the
* "{@code config.providers.}" prefix are reserved. The "{@code config.providers}" configuration property
* specifies the names of the config providers, and properties that begin with the "{@code config.providers..}"
* prefix correspond to the properties for that named provider. For example, the "{@code config.providers..class}"
* property specifies the name of the {@link ConfigProvider} implementation class that should be used for
* the provider.
- *
+ * <p>
* The keys for ConfigProvider configs in both originals and configProviderProps will start with the above
* mentioned "{@code config.providers.}" prefix.
- *
+ * <p>
* Variables have the form "${providerName:[path:]key}", where "providerName" is the name of a ConfigProvider,
* "path" is an optional string, and "key" is a required string. This variable is resolved by passing the "key"
* and optional "path" to a ConfigProvider with the specified name, and the result from the ConfigProvider is
* then used in place of the variable. Variables that cannot be resolved by the AbstractConfig constructor will
* be left unchanged in the configuration.
*
- *
- * @param definition the definition of the configurations; may not be null
- * @param originals the configuration properties plus any optional config provider properties;
+ * @param definition the definition of the configurations; may not be null
+ * @param originals the configuration properties plus any optional config provider properties;
* @param configProviderProps the map of properties of config providers which will be instantiated by
- * the constructor to resolve any variables in {@code originals}; may be null or empty
- * @param doLog whether the configurations should be logged
+ * the constructor to resolve any variables in {@code originals}; may be null or empty
+ * @param doLog whether the configurations should be logged
*/
@SuppressWarnings("unchecked")
- public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
+ public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
/* check that all the keys are really strings */
for (Map.Entry<?, ?> entry : originals.entrySet())
if (!(entry.getKey() instanceof String))
@@ -127,7 +126,7 @@ public class AbstractConfig {
* that will be used to resolve variables in configuration property values.
*
* @param definition the definition of the configurations; may not be null
- * @param originals the configuration properties plus any optional config provider properties; may not be null
+ * @param originals the configuration properties plus any optional config provider properties; may not be null
*/
public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
this(definition, originals, Collections.emptyMap(), true);
@@ -139,8 +138,8 @@ public class AbstractConfig {
* that will be used to resolve variables in configuration property values.
*
* @param definition the definition of the configurations; may not be null
- * @param originals the configuration properties plus any optional config provider properties; may not be null
- * @param doLog whether the configurations should be logged
+ * @param originals the configuration properties plus any optional config provider properties; may not be null
+ * @param doLog whether the configurations should be logged
*/
public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
this(definition, originals, Collections.emptyMap(), doLog);
@@ -241,6 +240,7 @@ public class AbstractConfig {
/**
* Get all the original settings, ensuring that all values are of type String.
+ *
* @return the original settings
* @throws ClassCastException if any of the values are not strings
*/
@@ -269,7 +269,7 @@ public class AbstractConfig {
* Gets all original settings with the given prefix.
*
* @param prefix the prefix to use as a filter
- * @param strip strip the prefix before adding to the output if set true
+ * @param strip strip the prefix before adding to the output if set true
* @return a Map containing the settings with the prefix
*/
public Map<String, Object> originalsWithPrefix(String prefix, boolean strip) {
@@ -288,7 +288,7 @@ public class AbstractConfig {
/**
* Put all keys that do not start with {@code prefix} and their parsed values in the result map and then
* put all the remaining keys with the prefix stripped and their parsed values in the result map.
- *
+ * <p>
* This is useful if one wants to allow prefixed configs to override default ones.
* <p>
* Two forms of prefixes are supported:
@@ -323,7 +323,7 @@ public class AbstractConfig {
/**
* If at least one key with {@code prefix} exists, all prefixed values will be parsed and put into map.
* If no value with {@code prefix} exists all unprefixed values will be returned.
- *
+ * <p>
* This is useful if one wants to allow prefixed configs to override default ones, but wants to use either
* only prefixed configs or only regular configs, but not mix them.
*/
@@ -389,8 +389,8 @@ public class AbstractConfig {
private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs) {
if (klass == null)
return null;
-
Object o;
+
if (klass instanceof String) {
try {
o = Utils.newInstance((String) klass, t);
@@ -401,11 +401,15 @@ public class AbstractConfig {
o = Utils.newInstance((Class<?>) klass);
} else
throw new KafkaException("Unexpected element of type " + klass.getClass().getName() + ", expected String or Class");
- if (!t.isInstance(o))
- throw new KafkaException(klass + " is not an instance of " + t.getName());
- if (o instanceof Configurable)
- ((Configurable) o).configure(configPairs);
-
+ try {
+ if (!t.isInstance(o))
+ throw new KafkaException(klass + " is not an instance of " + t.getName());
+ if (o instanceof Configurable)
+ ((Configurable) o).configure(configPairs);
+ } catch (Exception e) {
+ maybeClose(o, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+ throw e;
+ }
return t.cast(o);
}
@@ -414,7 +418,7 @@ public class AbstractConfig {
* Configurable configure it using the configuration.
*
* @param key The configuration key for the class
- * @param t The interface the class should implement
+ * @param t The interface the class should implement
* @return A configured instance of the class
*/
public <T> T getConfiguredInstance(String key, Class<T> t) {
@@ -425,8 +429,8 @@ public class AbstractConfig {
* Get a configured instance of the give class specified by the given configuration key. If the object implements
* Configurable configure it using the configuration.
*
- * @param key The configuration key for the class
- * @param t The interface the class should implement
+ * @param key The configuration key for the class
+ * @param t The interface the class should implement
* @param configOverrides override origin configs
* @return A configured instance of the class
*/
@@ -440,8 +444,9 @@ public class AbstractConfig {
* Get a list of configured instances of the given class specified by the given configuration key. The configuration
* may specify either null or an empty string to indicate no configured instances. In both cases, this method
* returns an empty list to indicate no configured instances.
+ *
* @param key The configuration key for the class
- * @param t The interface the class should implement
+ * @param t The interface the class should implement
* @return The list of configured instances
*/
public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
@@ -452,8 +457,9 @@ public class AbstractConfig {
* Get a list of configured instances of the given class specified by the given configuration key. The configuration
* may specify either null or an empty string to indicate no configured instances. In both cases, this method
* returns an empty list to indicate no configured instances.
- * @param key The configuration key for the class
- * @param t The interface the class should implement
+ *
+ * @param key The configuration key for the class
+ * @param t The interface the class should implement
* @param configOverrides Configuration overrides to use.
* @return The list of configured instances
*/
@@ -465,8 +471,9 @@ public class AbstractConfig {
* Get a list of configured instances of the given class specified by the given configuration key. The configuration
* may specify either null or an empty string to indicate no configured instances. In both cases, this method
* returns an empty list to indicate no configured instances.
- * @param classNames The list of class names of the instances to create
- * @param t The interface the class should implement
+ *
+ * @param classNames The list of class names of the instances to create
+ * @param t The interface the class should implement
* @param configOverrides Configuration overrides to use.
* @return The list of configured instances
*/
@@ -476,14 +483,28 @@ public class AbstractConfig {
return objects;
Map<String, Object> configPairs = originals();
configPairs.putAll(configOverrides);
- for (Object klass : classNames) {
- Object o = getConfiguredInstance(klass, t, configPairs);
- objects.add(t.cast(o));
+
+ try {
+ for (Object klass : classNames) {
+ Object o = getConfiguredInstance(klass, t, configPairs);
+ objects.add(t.cast(o));
+ }
+ } catch (Exception e) {
+ for (Object object : objects) {
+ maybeClose(object, "AutoCloseable object constructed and configured during failed call to getConfiguredInstances");
+ }
+ throw e;
}
return objects;
}
- private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) {
+ private static void maybeClose(Object object, String name) {
+ if (object instanceof AutoCloseable) {
+ Utils.closeQuietly((AutoCloseable) object, name);
+ }
+ }
+
+ private Map<String, String> extractPotentialVariables(Map<?, ?> configMap) {
// Variables are tuples of the form "${providerName:[path:]key}". From the configMap we extract the subset of configs with string
// values as potential variables.
Map<String, String> configMapAsString = new HashMap<>();
@@ -498,12 +519,13 @@ public class AbstractConfig {
/**
* Instantiates given list of config providers and fetches the actual values of config variables from the config providers.
* returns a map of config key and resolved values.
+ *
* @param configProviderProps The map of config provider configs
- * @param originals The map of raw configs.
+ * @param originals The map of raw configs.
* @return map of resolved config variable.
*/
@SuppressWarnings("unchecked")
- private Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) {
+ private Map<String, ?> resolveConfigVariables(Map<String, ?> configProviderProps, Map<String, Object> originals) {
Map<String, String> providerConfigString;
Map<String, ?> configProperties;
Map<String, Object> resolvedOriginals = new HashMap<>();
@@ -549,7 +571,8 @@ public class AbstractConfig {
* config.providers.{name}.class : The Java class name for a provider.
* config.providers.{name}.param.{param-name} : A parameter to be passed to the above Java class on initialization.
* returns a map of config provider name and its instance.
- * @param indirectConfigs The map of potential variable configs
+ *
+ * @param indirectConfigs The map of potential variable configs
* @param providerConfigProperties The map of config provider configs
* @return map map of config provider name and its instance.
*/
@@ -562,7 +585,7 @@ public class AbstractConfig {
Map<String, String> providerMap = new HashMap<>();
- for (String provider: configProviders.split(",")) {
+ for (String provider : configProviders.split(",")) {
String providerClass = providerClassProperty(provider);
if (indirectConfigs.containsKey(providerClass))
providerMap.put(provider, indirectConfigs.get(providerClass));
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index f08ac45ddaf..7729892a64f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -151,6 +151,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class KafkaConsumerTest {
+
private final String topic = "test";
private final Uuid topicId = Uuid.randomUuid();
private final TopicPartition tp0 = new TopicPartition(topic, 0);
@@ -503,6 +504,32 @@ public class KafkaConsumerTest {
}
}
+ @Test
+ public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances() {
+ final int targetInterceptor = 3;
+
+ try {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName() + ", "
+ + MockConsumerInterceptor.class.getName() + ", "
+ + MockConsumerInterceptor.class.getName());
+
+ MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(targetInterceptor);
+
+ assertThrows(KafkaException.class, () -> {
+ new KafkaConsumer<>(
+ props, new StringDeserializer(), new StringDeserializer());
+ });
+
+ assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
+ assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
+
+ } finally {
+ MockConsumerInterceptor.resetCounters();
+ }
+ }
+
@Test
public void testPause() {
KafkaConsumer<byte[], byte[]> consumer = newConsumer(groupId);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index a1a854fc671..8023c41ac36 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -569,7 +569,31 @@ public class KafkaProducerTest {
MockProducerInterceptor.resetCounters();
}
}
+ @Test
+ public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances() {
+ final int targetInterceptor = 3;
+ try {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+ props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, org.apache.kafka.test.MockProducerInterceptor.class.getName() + ", "
+ + org.apache.kafka.test.MockProducerInterceptor.class.getName() + ", "
+ + org.apache.kafka.test.MockProducerInterceptor.class.getName());
+ props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something");
+
+ MockProducerInterceptor.setThrowOnConfigExceptionThreshold(targetInterceptor);
+
+ assertThrows(KafkaException.class, () -> {
+ new KafkaProducer<>(
+ props, new StringSerializer(), new StringSerializer());
+ });
+
+ assertEquals(3, MockProducerInterceptor.CONFIG_COUNT.get());
+ assertEquals(3, MockProducerInterceptor.CLOSE_COUNT.get());
+ } finally {
+ MockProducerInterceptor.resetCounters();
+ }
+ }
@Test
public void testPartitionerClose() {
try {
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index e07cdb8de16..c0c6f8cee37 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.config.provider.MockVaultConfigProvider;
import org.apache.kafka.common.config.provider.MockFileConfigProvider;
+import org.apache.kafka.test.MockConsumerInterceptor;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
@@ -259,6 +260,30 @@ public class AbstractConfigTest {
}
}
+ @Test
+ public void testConfiguredInstancesClosedOnFailure() {
+
+ try {
+ Map<String, String> props = new HashMap<>();
+ String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", "
+ + MockConsumerInterceptor.class.getName() + ", "
+ + MockConsumerInterceptor.class.getName();
+ props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, threeConsumerInterceptors);
+ props.put("client.id", "test");
+ TestConfig testConfig = new TestConfig(props);
+
+ MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(3);
+ assertThrows(
+ Exception.class,
+ () -> testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, Object.class)
+ );
+ assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
+ assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
+ } finally {
+ MockConsumerInterceptor.resetCounters();
+ }
+ }
+
@Test
public void testClassConfigs() {
class RestrictedClassLoader extends ClassLoader {
@@ -585,7 +610,6 @@ public class AbstractConfigTest {
public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
private static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters.";
-
static {
CONFIG = new ConfigDef().define(METRIC_REPORTER_CLASSES_CONFIG,
Type.LIST,
diff --git a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
index b01584b62a2..ccc93c9accb 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
@@ -41,6 +41,9 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
public static final AtomicInteger ON_COMMIT_COUNT = new AtomicInteger(0);
+ public static final AtomicInteger CONFIG_COUNT = new AtomicInteger(0);
+ public static final AtomicInteger THROW_CONFIG_EXCEPTION = new AtomicInteger(0);
+ public static final AtomicInteger THROW_ON_CONFIG_EXCEPTION_THRESHOLD = new AtomicInteger(0);
public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>();
public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id");
public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_ON_CONSUME = new AtomicReference<>(NO_CLUSTER_ID);
@@ -55,6 +58,11 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG);
if (clientIdValue == null)
throw new ConfigException("Mock consumer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+
+ CONFIG_COUNT.incrementAndGet();
+ if (CONFIG_COUNT.get() == THROW_ON_CONFIG_EXCEPTION_THRESHOLD.get()) {
+ throw new ConfigException("Failed to instantiate interceptor. Reached configuration exception threshold.");
+ }
}
@Override
@@ -90,10 +98,16 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume
CLOSE_COUNT.incrementAndGet();
}
+ public static void setThrowOnConfigExceptionThreshold(int value) {
+ THROW_ON_CONFIG_EXCEPTION_THRESHOLD.set(value);
+ }
+
public static void resetCounters() {
INIT_COUNT.set(0);
CLOSE_COUNT.set(0);
ON_COMMIT_COUNT.set(0);
+ CONFIG_COUNT.set(0);
+ THROW_CONFIG_EXCEPTION.set(0);
CLUSTER_META.set(null);
CLUSTER_ID_BEFORE_ON_CONSUME.set(NO_CLUSTER_ID);
}
diff --git a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
index eedc3bdaecd..6ef4f50893e 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
@@ -32,6 +32,9 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce
public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
public static final AtomicInteger ONSEND_COUNT = new AtomicInteger(0);
+ public static final AtomicInteger CONFIG_COUNT = new AtomicInteger(0);
+ public static final AtomicInteger THROW_CONFIG_EXCEPTION = new AtomicInteger(0);
+ public static final AtomicInteger THROW_ON_CONFIG_EXCEPTION_THRESHOLD = new AtomicInteger(0);
public static final AtomicInteger ON_SUCCESS_COUNT = new AtomicInteger(0);
public static final AtomicInteger ON_ERROR_COUNT = new AtomicInteger(0);
public static final AtomicInteger ON_ERROR_WITH_METADATA_COUNT = new AtomicInteger(0);
@@ -59,6 +62,11 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce
Object clientIdValue = configs.get(ProducerConfig.CLIENT_ID_CONFIG);
if (clientIdValue == null)
throw new ConfigException("Mock producer interceptor expects configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+
+ CONFIG_COUNT.incrementAndGet();
+ if (CONFIG_COUNT.get() == THROW_ON_CONFIG_EXCEPTION_THRESHOLD.get()) {
+ throw new ConfigException("Failed to instantiate interceptor. Reached configuration exception threshold.");
+ }
}
@Override
@@ -89,10 +97,16 @@ public class MockProducerInterceptor implements ClusterResourceListener, Produce
CLOSE_COUNT.incrementAndGet();
}
+ public static void setThrowOnConfigExceptionThreshold(int value) {
+ THROW_ON_CONFIG_EXCEPTION_THRESHOLD.set(value);
+ }
+
public static void resetCounters() {
INIT_COUNT.set(0);
CLOSE_COUNT.set(0);
ONSEND_COUNT.set(0);
+ CONFIG_COUNT.set(0);
+ THROW_CONFIG_EXCEPTION.set(0);
ON_SUCCESS_COUNT.set(0);
ON_ERROR_COUNT.set(0);
ON_ERROR_WITH_METADATA_COUNT.set(0);