You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/02/09 23:47:58 UTC

[kafka] branch trunk updated: KAFKA-6513: Corrected how Converters and HeaderConverters are instantiated and configured

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 976a3b0  KAFKA-6513: Corrected how Converters and HeaderConverters are instantiated and configured
976a3b0 is described below

commit 976a3b0cc858d4a13cf8b34b325aefc8d706be9e
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Fri Feb 9 15:44:55 2018 -0800

    KAFKA-6513: Corrected how Converters and HeaderConverters are instantiated and configured
    
    The commits for KIP-145 (KAFKA-5142) changed how the Connect workers instantiate and configure the Converters, and also added the ability to do the same for the new HeaderConverters. However, the last few commits removed the default value for the `converter.type` property for Converters and HeaderConverters, and this broke how the internal converters were being created.
    
    This change corrects the behavior so that the `converter.type` property is always set by the worker (or by the Plugins class), which means the existing Converter implementations will not have to do this. The built-in JsonConverter, ByteArrayConverter, and StringConverter also implement HeaderConverter which implements Configurable, but the Worker and Plugins methods do not yet use the `Configurable.configure(Map)` method and instead still use the `Converter.configure(Map,boolean)`.
    
    Several tests were modified, and a new PluginsTest was added to verify the new behavior in Plugins for instantiating and configuring the Converter and HeaderConverter instances.
    
    Author: Randall Hauch <rh...@gmail.com>
    
    Reviewers: Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #4512 from rhauch/kafka-6513
---
 .../kafka/connect/storage/HeaderConverter.java     |   2 +-
 .../org/apache/kafka/connect/runtime/Worker.java   |  59 +++---
 .../runtime/isolation/DelegatingClassLoader.java   |  11 +
 .../runtime/isolation/PluginScanResult.java        |  13 +-
 .../kafka/connect/runtime/isolation/Plugins.java   | 171 ++++++++++++----
 .../apache/kafka/connect/runtime/WorkerTest.java   | 227 ++++++++++++++-------
 .../connect/runtime/isolation/PluginsTest.java     | 191 +++++++++++++++++
 7 files changed, 520 insertions(+), 154 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
index b8455e1..3f9d504 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
@@ -36,7 +36,7 @@ public interface HeaderConverter extends Configurable, Closeable {
     SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value);
 
     /**
-     * Convert the {@link Header}'s {@link Header#valueAsBytes() value} into its byte array representation.
+     * Convert the {@link Header}'s {@link Header#value() value} into its byte array representation.
      * @param topic the name of the topic for the record containing the header
      * @param headerKey the header's key; may not be null
      * @param schema the schema for the header's value; may be null
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 834eb39..e3d9cf4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -31,13 +31,12 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.ConnectMetrics.LiteralSupplier;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.storage.ConverterConfig;
-import org.apache.kafka.connect.storage.ConverterType;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
@@ -105,19 +104,14 @@ public class Worker {
 
         // Internal converters are required properties, thus getClass won't return null.
         this.internalKeyConverter = plugins.newConverter(
-                config.getClass(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG).getName(),
-                config
+                config,
+                WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
+                ClassLoaderUsage.PLUGINS
         );
-        this.internalKeyConverter.configure(
-                config.originalsWithPrefix("internal.key.converter."),
-                true);
         this.internalValueConverter = plugins.newConverter(
-                config.getClass(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG).getName(),
-                config
-        );
-        this.internalValueConverter.configure(
-                config.originalsWithPrefix("internal.value.converter."),
-                false
+                config,
+                WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
+                ClassLoaderUsage.PLUGINS
         );
 
         this.offsetBackingStore = offsetBackingStore;
@@ -383,30 +377,33 @@ public class Worker {
             log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
 
             // By maintaining connector's specific class loader for this thread here, we first
-            // search for converters within the connector dependencies, and if not found the
-            // plugin class loader delegates loading to the delegating classloader.
-            Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+            // search for converters within the connector dependencies.
+            // If any of these aren't found, that means the connector didn't configure specific converters,
+            // so we should instantiate based upon the worker configuration
+            Converter keyConverter = plugins.newConverter(
+                    connConfig,
+                    WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+                    ClassLoaderUsage.CURRENT_CLASSLOADER
+            );
+            Converter valueConverter = plugins.newConverter(
+                    connConfig,
+                    WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
+                    ClassLoaderUsage.CURRENT_CLASSLOADER
+            );
+            HeaderConverter headerConverter = plugins.newHeaderConverter(
+                    connConfig,
+                    WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                    ClassLoaderUsage.CURRENT_CLASSLOADER
+            );
             if (keyConverter == null) {
-                String className = config.getClass(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG).getName();
-                keyConverter = plugins.newConverter(className, config);
+                keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
             }
-            keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
-
-            Converter valueConverter = connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
             if (valueConverter == null) {
-                String className = config.getClass(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG).getName();
-                valueConverter = plugins.newConverter(className, config);
+                valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
             }
-            valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false);
-
-            HeaderConverter headerConverter = connConfig.getConfiguredInstance(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, HeaderConverter.class);
             if (headerConverter == null) {
-                String className = config.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG).getName();
-                headerConverter = plugins.newHeaderConverter(className, config);
+                headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
             }
-            Map<String, Object> converterConfig = connConfig.originalsWithPrefix("header.converter.");
-            converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName());
-            headerConverter.configure(converterConfig);
 
             workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
             workerTask.initialize(taskConfig);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 8a44d4d..345d7ef 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.reflections.Reflections;
 import org.reflections.util.ClasspathHelper;
@@ -58,6 +59,7 @@ public class DelegatingClassLoader extends URLClassLoader {
     private final Map<String, String> aliases;
     private final SortedSet<PluginDesc<Connector>> connectors;
     private final SortedSet<PluginDesc<Converter>> converters;
+    private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
     private final SortedSet<PluginDesc<Transformation>> transformations;
     private final List<String> pluginPaths;
     private final Map<Path, PluginClassLoader> activePaths;
@@ -70,6 +72,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         this.activePaths = new HashMap<>();
         this.connectors = new TreeSet<>();
         this.converters = new TreeSet<>();
+        this.headerConverters = new TreeSet<>();
         this.transformations = new TreeSet<>();
     }
 
@@ -85,6 +88,10 @@ public class DelegatingClassLoader extends URLClassLoader {
         return converters;
     }
 
+    public Set<PluginDesc<HeaderConverter>> headerConverters() {
+        return headerConverters;
+    }
+
     public Set<PluginDesc<Transformation>> transformations() {
         return transformations;
     }
@@ -214,6 +221,8 @@ public class DelegatingClassLoader extends URLClassLoader {
             connectors.addAll(plugins.connectors());
             addPlugins(plugins.converters(), loader);
             converters.addAll(plugins.converters());
+            addPlugins(plugins.headerConverters(), loader);
+            headerConverters.addAll(plugins.headerConverters());
             addPlugins(plugins.transformations(), loader);
             transformations.addAll(plugins.transformations());
         }
@@ -265,6 +274,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         return new PluginScanResult(
                 getPluginDesc(reflections, Connector.class, loader),
                 getPluginDesc(reflections, Converter.class, loader),
+                getPluginDesc(reflections, HeaderConverter.class, loader),
                 getPluginDesc(reflections, Transformation.class, loader)
         );
     }
@@ -319,6 +329,7 @@ public class DelegatingClassLoader extends URLClassLoader {
     private void addAllAliases() {
         addAliases(connectors);
         addAliases(converters);
+        addAliases(headerConverters);
         addAliases(transformations);
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index f3d2f21..c680f08 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 
 import java.util.Collection;
@@ -25,15 +26,18 @@ import java.util.Collection;
 public class PluginScanResult {
     private final Collection<PluginDesc<Connector>> connectors;
     private final Collection<PluginDesc<Converter>> converters;
+    private final Collection<PluginDesc<HeaderConverter>> headerConverters;
     private final Collection<PluginDesc<Transformation>> transformations;
 
     public PluginScanResult(
             Collection<PluginDesc<Connector>> connectors,
             Collection<PluginDesc<Converter>> converters,
+            Collection<PluginDesc<HeaderConverter>> headerConverters,
             Collection<PluginDesc<Transformation>> transformations
     ) {
         this.connectors = connectors;
         this.converters = converters;
+        this.headerConverters = headerConverters;
         this.transformations = transformations;
     }
 
@@ -45,11 +49,18 @@ public class PluginScanResult {
         return converters;
     }
 
+    public Collection<PluginDesc<HeaderConverter>> headerConverters() {
+        return headerConverters;
+    }
+
     public Collection<PluginDesc<Transformation>> transformations() {
         return transformations;
     }
 
     public boolean isEmpty() {
-        return connectors().isEmpty() && converters().isEmpty() && transformations().isEmpty();
+        return connectors().isEmpty()
+               && converters().isEmpty()
+               && headerConverters().isEmpty()
+               && transformations().isEmpty();
     }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index bcf2afb..94f2771 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
-import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
@@ -25,6 +25,8 @@ import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.slf4j.Logger;
@@ -39,6 +41,12 @@ import java.util.Map;
 import java.util.Set;
 
 public class Plugins {
+
+    public enum ClassLoaderUsage {
+        CURRENT_CLASSLOADER,
+        PLUGINS
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Plugins.class);
     private final DelegatingClassLoader delegatingLoader;
 
@@ -71,14 +79,6 @@ public class Plugins {
         }
     }
 
-    protected static <T> T newConfiguredPlugin(AbstractConfig config, Class<T> klass) {
-        T plugin = Utils.newInstance(klass);
-        if (plugin instanceof Configurable) {
-            ((Configurable) plugin).configure(config.originals());
-        }
-        return plugin;
-    }
-
     @SuppressWarnings("unchecked")
     protected static <U> Class<? extends U> pluginClass(
             DelegatingClassLoader loader,
@@ -185,46 +185,131 @@ public class Plugins {
         return newPlugin(taskClass);
     }
 
-    public Converter newConverter(String converterClassOrAlias) {
-        return newConverter(converterClassOrAlias, null);
+    /**
+     * If the given configuration defines a {@link Converter} using the named configuration property, return a new configured instance.
+     *
+     * @param config             the configuration containing the {@link Converter}'s configuration; may not be null
+     * @param classPropertyName  the name of the property that contains the name of the {@link Converter} class; may not be null
+     * @param classLoaderUsage   which classloader should be used
+     * @return the instantiated and configured {@link Converter}; null if the configuration did not define the specified property
+     * @throws ConnectException if the {@link Converter} implementation class could not be found
+     */
+    public Converter newConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) {
+        if (!config.originals().containsKey(classPropertyName)) {
+            // This configuration does not define the converter via the specified property name
+            return null;
+        }
+        Converter plugin = null;
+        switch (classLoaderUsage) {
+            case CURRENT_CLASSLOADER:
+                // Attempt to load first with the current classloader, and plugins as a fallback.
+                // Note: we can't use config.getConfiguredInstance because Converter doesn't implement Configurable, and even if it did
+                // we have to remove the property prefixes before calling configure(...) and we still always want to call Converter.configure.
+                plugin = getInstance(config, classPropertyName, Converter.class);
+                break;
+            case PLUGINS:
+                // Attempt to load with the plugin class loader, which uses the current classloader as a fallback
+                String converterClassOrAlias = config.getClass(classPropertyName).getName();
+                Class<? extends Converter> klass;
+                try {
+                    klass = pluginClass(delegatingLoader, converterClassOrAlias, Converter.class);
+                } catch (ClassNotFoundException e) {
+                    throw new ConnectException(
+                            "Failed to find any class that implements Converter and which name matches "
+                            + converterClassOrAlias + ", available converters are: "
+                            + pluginNames(delegatingLoader.converters())
+                    );
+                }
+                plugin = newPlugin(klass);
+                break;
+        }
+        if (plugin == null) {
+            throw new ConnectException("Unable to instantiate the Converter specified in '" + classPropertyName + "'");
+        }
+
+        // Determine whether this is a key or value converter based upon the supplied property name ...
+        final boolean isKeyConverter = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName)
+                                     || WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG.equals(classPropertyName);
+
+        // Configure the Converter using only the old configuration mechanism ...
+        String configPrefix = classPropertyName + ".";
+        Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix);
+        plugin.configure(converterConfig, isKeyConverter);
+        return plugin;
     }
 
-    public Converter newConverter(String converterClassOrAlias, AbstractConfig config) {
-        Class<? extends Converter> klass;
-        try {
-            klass = pluginClass(
-                    delegatingLoader,
-                    converterClassOrAlias,
-                    Converter.class
-            );
-        } catch (ClassNotFoundException e) {
-            throw new ConnectException(
-                    "Failed to find any class that implements Converter and which name matches "
-                            + converterClassOrAlias
-                            + ", available converters are: "
-                            + pluginNames(delegatingLoader.converters())
-            );
+    /**
+     * If the given configuration defines a {@link HeaderConverter} using the named configuration property, return a new configured
+     * instance.
+     *
+     * @param config             the configuration containing the {@link HeaderConverter}'s configuration; may not be null
+     * @param classPropertyName  the name of the property that contains the name of the {@link HeaderConverter} class; may not be null
+     * @param classLoaderUsage   which classloader should be used
+     * @return the instantiated and configured {@link HeaderConverter}; null if the configuration did not define the specified property
+     * @throws ConnectException if the {@link HeaderConverter} implementation class could not be found
+     */
+    public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) {
+        if (!config.originals().containsKey(classPropertyName)) {
+            // This configuration does not define the header converter via the specified property name
+            return null;
         }
-        return config != null ? newConfiguredPlugin(config, klass) : newPlugin(klass);
+        HeaderConverter plugin = null;
+        switch (classLoaderUsage) {
+            case CURRENT_CLASSLOADER:
+                // Attempt to load first with the current classloader, and plugins as a fallback.
+                // Note: we can't use config.getConfiguredInstance because we have to remove the property prefixes
+                // before calling configure(...)
+                plugin = getInstance(config, classPropertyName, HeaderConverter.class);
+                break;
+            case PLUGINS:
+                // Attempt to load with the plugin class loader, which uses the current classloader as a fallback
+                String converterClassOrAlias = config.getClass(classPropertyName).getName();
+                Class<? extends HeaderConverter> klass;
+                try {
+                    klass = pluginClass(
+                            delegatingLoader,
+                            converterClassOrAlias,
+                            HeaderConverter.class
+                    );
+                } catch (ClassNotFoundException e) {
+                    throw new ConnectException(
+                            "Failed to find any class that implements HeaderConverter and which name matches "
+                                    + converterClassOrAlias
+                                    + ", available header converters are: "
+                                    + pluginNames(delegatingLoader.headerConverters())
+                    );
+                }
+                plugin = newPlugin(klass);
+        }
+        if (plugin == null) {
+            throw new ConnectException("Unable to instantiate the Converter specified in '" + classPropertyName + "'");
+        }
+
+        String configPrefix = classPropertyName + ".";
+        Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix);
+        converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName());
+        plugin.configure(converterConfig);
+        return plugin;
     }
 
-    public HeaderConverter newHeaderConverter(String converterClassOrAlias, AbstractConfig config) {
-        Class<? extends HeaderConverter> klass;
-        try {
-            klass = pluginClass(
-                    delegatingLoader,
-                    converterClassOrAlias,
-                    HeaderConverter.class
-            );
-        } catch (ClassNotFoundException e) {
-            throw new ConnectException(
-                    "Failed to find any class that implements HeaderConverter and which name matches "
-                            + converterClassOrAlias
-                            + ", available header converters are: "
-                            + pluginNames(delegatingLoader.converters())
-            );
+    /**
+     * Get an instance of the given class specified by the given configuration key.
+     *
+     * @param key The configuration key for the class
+     * @param t The interface the class should implement
+     * @return An instance of the class
+     */
+    private <T> T getInstance(AbstractConfig config, String key, Class<T> t) {
+        Class<?> c = config.getClass(key);
+        if (c == null) {
+            return null;
+        }
+        // Instantiate the class, but we don't know if the class extends the supplied type
+        Object o = Utils.newInstance(c);
+        if (!t.isInstance(o)) {
+            throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
         }
-        return config != null ? newConfiguredPlugin(config, klass) : newPlugin(klass);
+        return t.cast(o);
     }
 
     public <R extends ConnectRecord<R>> Transformation<R> newTranformations(
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 2c04b88..f062436 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -18,6 +18,8 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.Time;
@@ -28,11 +30,13 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.runtime.MockConnectMetrics.MockMetricsReporter;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -64,8 +68,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
@@ -99,6 +106,9 @@ public class WorkerTest extends ThreadedTest {
     @Mock private WorkerSourceTask workerTask;
     @Mock private Converter keyConverter;
     @Mock private Converter valueConverter;
+    @Mock private Converter taskKeyConverter;
+    @Mock private Converter taskValueConverter;
+    @Mock private HeaderConverter taskHeaderConverter;
 
     @Before
     public void setup() {
@@ -140,7 +150,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(2);
-        connector.initialize(EasyMock.anyObject(ConnectorContext.class));
+        connector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
@@ -250,7 +260,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(2);
-        connector.initialize(EasyMock.anyObject(ConnectorContext.class));
+        connector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
@@ -314,7 +324,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(2);
-        connector.initialize(EasyMock.anyObject(ConnectorContext.class));
+        connector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
@@ -391,7 +401,7 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(3);
-        connector.initialize(EasyMock.anyObject(ConnectorContext.class));
+        connector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
@@ -461,7 +471,7 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddRemoveTask() throws Exception {
-        expectConverters(true);
+        expectConverters();
         expectStartStorage();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
@@ -470,19 +480,19 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
-                EasyMock.anyObject(TaskStatus.Listener.class),
+                anyObject(TaskStatus.Listener.class),
                 EasyMock.eq(TargetState.STARTED),
-                EasyMock.anyObject(JsonConverter.class),
-                EasyMock.anyObject(JsonConverter.class),
-                EasyMock.anyObject(JsonConverter.class),
+                anyObject(JsonConverter.class),
+                anyObject(JsonConverter.class),
+                anyObject(JsonConverter.class),
                 EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
-                EasyMock.anyObject(KafkaProducer.class),
-                EasyMock.anyObject(OffsetStorageReader.class),
-                EasyMock.anyObject(OffsetStorageWriter.class),
+                anyObject(KafkaProducer.class),
+                anyObject(OffsetStorageReader.class),
+                anyObject(OffsetStorageWriter.class),
                 EasyMock.eq(config),
-                EasyMock.anyObject(ConnectMetrics.class),
-                EasyMock.anyObject(ClassLoader.class),
-                EasyMock.anyObject(Time.class))
+                anyObject(ConnectMetrics.class),
+                anyObject(ClassLoader.class),
+                anyObject(Time.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
@@ -496,11 +506,14 @@ public class WorkerTest extends ThreadedTest {
 
         workerTask.initialize(taskConfig);
         EasyMock.expectLastCall();
-        // We should expect this call, but the pluginLoader being swapped in is only mocked.
-        // Serializers for the Producer that the task generates. These are loaded while the PluginClassLoader is active
-        // and then delegated to the system classloader. This is only called once due to caching
-        // EasyMock.expect(pluginLoader.loadClass(ByteArraySerializer.class.getName()))
-        //        .andReturn((Class) ByteArraySerializer.class);
+
+        // Expect that the worker will create converters and will find them using the current classloader ...
+        assertNotNull(taskKeyConverter);
+        assertNotNull(taskValueConverter);
+        assertNotNull(taskHeaderConverter);
+        expectTaskKeyConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, taskKeyConverter);
+        expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, taskValueConverter);
+        expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
 
         workerTask.run();
         EasyMock.expectLastCall();
@@ -595,7 +608,7 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testCleanupTasksOnStop() throws Exception {
-        expectConverters(true);
+        expectConverters();
         expectStartStorage();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
@@ -604,19 +617,19 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
-                EasyMock.anyObject(TaskStatus.Listener.class),
+                anyObject(TaskStatus.Listener.class),
                 EasyMock.eq(TargetState.STARTED),
-                EasyMock.anyObject(JsonConverter.class),
-                EasyMock.anyObject(JsonConverter.class),
-                EasyMock.anyObject(JsonConverter.class),
+                anyObject(JsonConverter.class),
+                anyObject(JsonConverter.class),
+                anyObject(JsonConverter.class),
                 EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
-                EasyMock.anyObject(KafkaProducer.class),
-                EasyMock.anyObject(OffsetStorageReader.class),
-                EasyMock.anyObject(OffsetStorageWriter.class),
-                EasyMock.anyObject(WorkerConfig.class),
-                EasyMock.anyObject(ConnectMetrics.class),
+                anyObject(KafkaProducer.class),
+                anyObject(OffsetStorageReader.class),
+                anyObject(OffsetStorageWriter.class),
+                anyObject(WorkerConfig.class),
+                anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
-                EasyMock.anyObject(Time.class))
+                anyObject(Time.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
@@ -630,11 +643,17 @@ public class WorkerTest extends ThreadedTest {
 
         workerTask.initialize(taskConfig);
         EasyMock.expectLastCall();
-        // We should expect this call, but the pluginLoader being swapped in is only mocked.
-        // Serializers for the Producer that the task generates. These are loaded while the PluginClassLoader is active
-        // and then delegated to the system classloader. This is only called once due to caching
-        // EasyMock.expect(pluginLoader.loadClass(ByteArraySerializer.class.getName()))
-        //        .andReturn((Class) ByteArraySerializer.class);
+
+        // Expect that the worker will create converters and will not initially find them using the current classloader ...
+        assertNotNull(taskKeyConverter);
+        assertNotNull(taskValueConverter);
+        assertNotNull(taskHeaderConverter);
+        expectTaskKeyConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
+        expectTaskKeyConverters(ClassLoaderUsage.PLUGINS, taskKeyConverter);
+        expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
+        expectTaskValueConverters(ClassLoaderUsage.PLUGINS, taskValueConverter);
+        expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
+        expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
 
         workerTask.run();
         EasyMock.expectLastCall();
@@ -682,26 +701,26 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
         Capture<TestConverter> keyConverter = EasyMock.newCapture();
-        Capture<TestConverter> valueConverter = EasyMock.newCapture();
+        Capture<TestConfigurableConverter> valueConverter = EasyMock.newCapture();
         Capture<HeaderConverter> headerConverter = EasyMock.newCapture();
 
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
-                EasyMock.anyObject(TaskStatus.Listener.class),
+                anyObject(TaskStatus.Listener.class),
                 EasyMock.eq(TargetState.STARTED),
                 EasyMock.capture(keyConverter),
                 EasyMock.capture(valueConverter),
                 EasyMock.capture(headerConverter),
                 EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
-                EasyMock.anyObject(KafkaProducer.class),
-                EasyMock.anyObject(OffsetStorageReader.class),
-                EasyMock.anyObject(OffsetStorageWriter.class),
-                EasyMock.anyObject(WorkerConfig.class),
-                EasyMock.anyObject(ConnectMetrics.class),
+                anyObject(KafkaProducer.class),
+                anyObject(OffsetStorageReader.class),
+                anyObject(OffsetStorageWriter.class),
+                anyObject(WorkerConfig.class),
+                anyObject(ConnectMetrics.class),
                 EasyMock.eq(pluginLoader),
-                EasyMock.anyObject(Time.class))
+                anyObject(Time.class))
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
@@ -715,11 +734,17 @@ public class WorkerTest extends ThreadedTest {
 
         workerTask.initialize(taskConfig);
         EasyMock.expectLastCall();
-        // We should expect this call, but the pluginLoader being swapped in is only mocked.
-        // Serializers for the Producer that the task generates. These are loaded while the PluginClassLoader is active
-        // and then delegated to the system classloader. This is only called once due to caching
-        // EasyMock.expect(pluginLoader.loadClass(ByteArraySerializer.class.getName()))
-        //        .andReturn((Class) ByteArraySerializer.class);
+
+        // Expect that the worker will create converters and will not initially find them using the current classloader ...
+        assertNotNull(taskKeyConverter);
+        assertNotNull(taskValueConverter);
+        assertNotNull(taskHeaderConverter);
+        expectTaskKeyConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
+        expectTaskKeyConverters(ClassLoaderUsage.PLUGINS, taskKeyConverter);
+        expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
+        expectTaskValueConverters(ClassLoaderUsage.PLUGINS, taskValueConverter);
+        expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
+        expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
 
         workerTask.run();
         EasyMock.expectLastCall();
@@ -753,7 +778,7 @@ public class WorkerTest extends ThreadedTest {
         Map<String, String> connProps = anyConnectorConfigMap();
         connProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
         connProps.put("key.converter.extra.config", "foo");
-        connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
+        connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConfigurableConverter.class.getName());
         connProps.put("value.converter.extra.config", "bar");
         worker.startTask(TASK_ID, connProps, origProps, taskStatusListener, TargetState.STARTED);
         assertStatistics(worker, 0, 1);
@@ -765,9 +790,7 @@ public class WorkerTest extends ThreadedTest {
         worker.stop();
         assertStatistics(worker, 0, 0);
 
-        // Validate extra configs got passed through to overridden converters
-        assertEquals("foo", keyConverter.getValue().configs.get("extra.config"));
-        assertEquals("bar", valueConverter.getValue().configs.get("extra.config"));
+        // We've mocked the Plugins.newConverter method, so we don't currently configure the converters
 
         PowerMock.verifyAll();
     }
@@ -808,7 +831,7 @@ public class WorkerTest extends ThreadedTest {
     }
 
     private void expectStartStorage() {
-        offsetBackingStore.configure(EasyMock.anyObject(WorkerConfig.class));
+        offsetBackingStore.configure(anyObject(WorkerConfig.class));
         EasyMock.expectLastCall();
         offsetBackingStore.start();
         EasyMock.expectLastCall();
@@ -832,19 +855,10 @@ public class WorkerTest extends ThreadedTest {
         if (expectDefaultConverters) {
 
             // Instantiate and configure default
-            EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+            EasyMock.expect(plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS))
                     .andReturn(keyConverter);
-            keyConverter.configure(
-                    EasyMock.<Map<String, ?>>anyObject(),
-                    EasyMock.anyBoolean()
-            );
-            EasyMock.expectLastCall();
-            EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
+            EasyMock.expect(plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS))
                     .andReturn(valueConverter);
-            valueConverter.configure(
-                    EasyMock.<Map<String, ?>>anyObject(),
-                    EasyMock.anyBoolean()
-            );
             EasyMock.expectLastCall();
         }
 
@@ -853,22 +867,50 @@ public class WorkerTest extends ThreadedTest {
         Converter internalValueConverter = PowerMock.createMock(converterClass);
 
         // Instantiate and configure internal
-        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
-                .andReturn(internalKeyConverter);
-        internalKeyConverter.configure(
-                EasyMock.<Map<String, ?>>anyObject(),
-                EasyMock.anyBoolean()
-        );
-        EasyMock.expectLastCall();
-        EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
-                .andReturn(internalValueConverter);
-        internalValueConverter.configure(
-                EasyMock.<Map<String, ?>>anyObject(),
-                EasyMock.anyBoolean()
-        );
+        EasyMock.expect(
+                plugins.newConverter(
+                        config,
+                        WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
+                        ClassLoaderUsage.PLUGINS
+                )
+        ).andReturn(internalKeyConverter);
+        EasyMock.expect(
+                plugins.newConverter(
+                        config,
+                        WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
+                        ClassLoaderUsage.PLUGINS
+                )
+        ).andReturn(internalValueConverter);
         EasyMock.expectLastCall();
     }
 
+    private void expectTaskKeyConverters(ClassLoaderUsage classLoaderUsage, Converter returning) {
+        EasyMock.expect(
+                plugins.newConverter(
+                        anyObject(AbstractConfig.class),
+                        eq(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG),
+                        eq(classLoaderUsage)))
+                .andReturn(returning);
+    }
+
+    private void expectTaskValueConverters(ClassLoaderUsage classLoaderUsage, Converter returning) {
+        EasyMock.expect(
+                plugins.newConverter(
+                        anyObject(AbstractConfig.class),
+                        eq(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG),
+                        eq(classLoaderUsage)))
+                .andReturn(returning);
+    }
+
+    private void expectTaskHeaderConverter(ClassLoaderUsage classLoaderUsage, HeaderConverter returning) {
+        EasyMock.expect(
+                plugins.newHeaderConverter(
+                        anyObject(AbstractConfig.class),
+                        eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG),
+                        eq(classLoaderUsage)))
+                .andReturn(returning);
+    }
+
     private Map<String, String> anyConnectorConfigMap() {
         Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
@@ -955,4 +997,33 @@ public class WorkerTest extends ThreadedTest {
             return null;
         }
     }
+
+    public static class TestConfigurableConverter implements Converter, Configurable {
+        public Map<String, ?> configs;
+
+        public ConfigDef config() {
+            return JsonConverterConfig.configDef();
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            this.configs = configs;
+            new JsonConverterConfig(configs); // requires the `converter.type` config be set
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+            this.configs = configs;
+        }
+
+        @Override
+        public byte[] fromConnectData(String topic, Schema schema, Object value) {
+            return new byte[0];
+        }
+
+        @Override
+        public SchemaAndValue toConnectData(String topic, byte[] value) {
+            return null;
+        }
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
new file mode 100644
index 0000000..6de92ee
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class PluginsTest {
+
+    private static Map<String, String> props;
+    private static Plugins plugins;
+    private AbstractConfig config;
+    private TestConverter converter;
+    private TestHeaderConverter headerConverter;
+
+    @BeforeClass
+    public static void beforeAll() {
+        props = new HashMap<>();
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
+        props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
+        props.put("key.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true");
+        props.put("value.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true");
+        props.put("key.converter.extra.config", "foo1");
+        props.put("value.converter.extra.config", "foo2");
+        props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
+        props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
+        props.put("internal.key.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
+        props.put("internal.value.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
+        props.put("internal.key.converter.extra.config", "bar1");
+        props.put("internal.value.converter.extra.config", "bar2");
+        props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestHeaderConverter.class.getName());
+        props.put("header.converter.extra.config", "baz");
+
+        // Set up the plugins to have no additional plugin directories.
+        // This won't allow us to test classpath isolation, but it will allow us to test some of the utility methods.
+        props.put(WorkerConfig.PLUGIN_PATH_CONFIG, "");
+        plugins = new Plugins(props);
+    }
+
+    @Before
+    public void setup() {
+        this.config = new TestableWorkerConfig(props);
+    }
+
+    @Test
+    public void shouldInstantiateAndConfigureConverters() {
+        instantiateAndConfigureConverter(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER);
+        // Validate extra configs got passed through to overridden converters
+        assertEquals("true", converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
+        assertEquals("foo1", converter.configs.get("extra.config"));
+
+        instantiateAndConfigureConverter(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+        // Validate extra configs got passed through to overridden converters
+        assertEquals("true", converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
+        assertEquals("foo2", converter.configs.get("extra.config"));
+    }
+
+    @Test
+    public void shouldInstantiateAndConfigureInternalConverters() {
+        instantiateAndConfigureConverter(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.CURRENT_CLASSLOADER);
+        // Validate extra configs got passed through to overridden converters
+        assertEquals("false", converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
+        assertEquals("bar1", converter.configs.get("extra.config"));
+
+        instantiateAndConfigureConverter(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+        // Validate extra configs got passed through to overridden converters
+        assertEquals("false", converter.configs.get(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG));
+        assertEquals("bar2", converter.configs.get("extra.config"));
+    }
+
+    @Test
+    public void shouldInstantiateAndConfigureHeaderConverter() {
+        instantiateAndConfigureHeaderConverter(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
+        // Validate extra configs got passed through to overridden converters
+        assertConverterType(ConverterType.HEADER, headerConverter.configs);
+        assertEquals("baz", headerConverter.configs.get("extra.config"));
+    }
+
+    protected void instantiateAndConfigureConverter(String configPropName, ClassLoaderUsage classLoaderUsage) {
+        converter = (TestConverter) plugins.newConverter(config, configPropName, classLoaderUsage);
+        assertNotNull(converter);
+    }
+
+    protected void instantiateAndConfigureHeaderConverter(String configPropName) {
+        headerConverter = (TestHeaderConverter) plugins.newHeaderConverter(config, configPropName, ClassLoaderUsage.CURRENT_CLASSLOADER);
+        assertNotNull(headerConverter);
+    }
+
+    protected void assertConverterType(ConverterType type, Map<String, ?> props) {
+        assertEquals(type.getName(), props.get(ConverterConfig.TYPE_CONFIG));
+    }
+
+    public static class TestableWorkerConfig extends WorkerConfig {
+        public TestableWorkerConfig(Map<String, String> props) {
+            super(WorkerConfig.baseConfigDef(), props);
+        }
+    }
+
+    public static class TestConverter implements Converter, Configurable {
+        public Map<String, ?> configs;
+
+        public ConfigDef config() {
+            return JsonConverterConfig.configDef();
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            this.configs = configs;
+            new JsonConverterConfig(configs); // requires the `converter.type` config be set
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+            this.configs = configs;
+        }
+
+        @Override
+        public byte[] fromConnectData(String topic, Schema schema, Object value) {
+            return new byte[0];
+        }
+
+        @Override
+        public SchemaAndValue toConnectData(String topic, byte[] value) {
+            return null;
+        }
+    }
+
+    public static class TestHeaderConverter implements HeaderConverter {
+        public Map<String, ?> configs;
+
+        @Override
+        public ConfigDef config() {
+            return JsonConverterConfig.configDef();
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            this.configs = configs;
+            new JsonConverterConfig(configs); // requires the `converter.type` config be set
+        }
+
+        @Override
+        public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
+            return new byte[0];
+        }
+
+        @Override
+        public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
+            return null;
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+    }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.