You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/04/04 22:07:38 UTC

kafka git commit: KAFKA-4837: Fix class name comparison in connector-plugins REST endpoint

Repository: kafka
Updated Branches:
  refs/heads/trunk 54bf2fb5f -> 916081007


KAFKA-4837: Fix class name comparison in connector-plugins REST endpoint

Author: Konstantine Karantasis <ko...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #2798 from kkonstantine/KAFKA-4837-Config-validation-in-Connector-plugins-need-to-compare-against-both-canonical-and-simple-class-names


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/91608100
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/91608100
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/91608100

Branch: refs/heads/trunk
Commit: 9160810072e540a3a037481505aaaa23e3b50546
Parents: 54bf2fb
Author: Konstantine Karantasis <ko...@confluent.io>
Authored: Tue Apr 4 15:07:32 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Apr 4 15:07:32 2017 -0700

----------------------------------------------------------------------
 .../resources/ConnectorPluginsResource.java     |  23 +-
 .../resources/ConnectorPluginsResourceTest.java | 215 ++++++++++++++++++-
 2 files changed, 224 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/91608100/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 0d39e81..37e0f01 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -39,6 +39,7 @@ import javax.ws.rs.core.MediaType;
 @Consumes(MediaType.APPLICATION_JSON)
 public class ConnectorPluginsResource {
 
+    private static final String ALIAS_SUFFIX = "Connector";
     private final Herder herder;
 
     public ConnectorPluginsResource(Herder herder) {
@@ -47,11 +48,18 @@ public class ConnectorPluginsResource {
 
     @PUT
     @Path("/{connectorType}/config/validate")
-    public ConfigInfos validateConfigs(final @PathParam("connectorType") String connType,
-                                       final Map<String, String> connectorConfig) throws Throwable {
+    public ConfigInfos validateConfigs(
+        final @PathParam("connectorType") String connType,
+        final Map<String, String> connectorConfig
+    ) throws Throwable {
         String includedConnType = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
-        if (includedConnType != null && !includedConnType.equals(connType))
-            throw new BadRequestException("Included connector type " + includedConnType + " does not match request type " + connType);
+        if (includedConnType != null
+            && !normalizedPluginName(includedConnType).endsWith(normalizedPluginName(connType))) {
+            throw new BadRequestException(
+                "Included connector type " + includedConnType + " does not match request type "
+                    + connType
+            );
+        }
 
         return herder.validateConnectorConfig(connectorConfig);
     }
@@ -61,4 +69,11 @@ public class ConnectorPluginsResource {
     public List<ConnectorPluginInfo> listConnectorPlugins() {
         return PluginDiscovery.connectorPlugins();
     }
+
+    private String normalizedPluginName(String pluginName) {
+        // Works for both full and simple class names. In the latter case, it generates the alias.
+        return pluginName.endsWith(ALIAS_SUFFIX) && pluginName.length() > ALIAS_SUFFIX.length()
+            ? pluginName.substring(0, pluginName.length() - ALIAS_SUFFIX.length())
+            : pluginName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/91608100/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index e033b6b..088b520 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -64,6 +64,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import javax.ws.rs.BadRequestException;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -73,46 +75,61 @@ import static org.junit.Assert.assertTrue;
 @PowerMockIgnore("javax.management.*")
 public class ConnectorPluginsResourceTest {
 
-    private static Map<String, String> props = new HashMap<>();
+    private static Map<String, String> props;
+    private static Map<String, String> partialProps = new HashMap<>();
     static {
-        props.put("name", "test");
-        props.put("test.string.config", "testString");
-        props.put("test.int.config", "1");
-        props.put("test.list.config", "a,b");
+        partialProps.put("name", "test");
+        partialProps.put("test.string.config", "testString");
+        partialProps.put("test.int.config", "1");
+        partialProps.put("test.list.config", "a,b");
+
+        props = new HashMap<>(partialProps);
+        props.put("connector.class", ConnectorPluginsResourceTestConnector.class.getSimpleName());
     }
 
     private static final ConfigInfos CONFIG_INFOS;
-    private static final int ERROR_COUNT = 1;
+    private static final ConfigInfos PARTIAL_CONFIG_INFOS;
+    private static final int ERROR_COUNT = 0;
+    private static final int PARTIAL_CONFIG_ERROR_COUNT = 1;
 
     static {
         List<ConfigInfo> configs = new LinkedList<>();
+        List<ConfigInfo> partialConfigs = new LinkedList<>();
 
         ConfigDef connectorConfigDef = ConnectorConfig.configDef();
         List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(props);
+        List<ConfigValue> partialConnectorConfigValues = connectorConfigDef.validate(partialProps);
         ConfigInfos result = AbstractHerder.generateResult(ConnectorPluginsResourceTestConnector.class.getName(), connectorConfigDef.configKeys(), connectorConfigValues, Collections.<String>emptyList());
+        ConfigInfos partialResult = AbstractHerder.generateResult(ConnectorPluginsResourceTestConnector.class.getName(), connectorConfigDef.configKeys(), partialConnectorConfigValues, Collections.<String>emptyList());
         configs.addAll(result.values());
+        partialConfigs.addAll(partialResult.values());
 
         ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, "", "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", Collections.<String>emptyList());
         ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", Collections.<String>emptyList(), Collections.<String>emptyList(), true);
         ConfigInfo configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
         configs.add(configInfo);
+        partialConfigs.add(configInfo);
 
         configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, "", "MEDIUM", "Test configuration for integer type.", "Test", 1, "MEDIUM", "test.int.config", Collections.<String>emptyList());
         configValueInfo = new ConfigValueInfo("test.int.config", "1", Arrays.asList("1", "2", "3"), Collections.<String>emptyList(), true);
         configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
         configs.add(configInfo);
+        partialConfigs.add(configInfo);
 
         configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", Collections.<String>emptyList());
         configValueInfo = new ConfigValueInfo("test.string.config.default", "", Collections.<String>emptyList(), Collections.<String>emptyList(), true);
         configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
         configs.add(configInfo);
+        partialConfigs.add(configInfo);
 
         configKeyInfo = new ConfigKeyInfo("test.list.config", "LIST", true, "", "HIGH", "Test configuration for list type.", "Test", 2, "LONG", "test.list.config", Collections.<String>emptyList());
         configValueInfo = new ConfigValueInfo("test.list.config", "a,b", Arrays.asList("a", "b", "c"), Collections.<String>emptyList(), true);
         configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
         configs.add(configInfo);
+        partialConfigs.add(configInfo);
 
         CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), ERROR_COUNT, Collections.singletonList("Test"), configs);
+        PARTIAL_CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), PARTIAL_CONFIG_ERROR_COUNT, Collections.singletonList("Test"), partialConfigs);
     }
 
     @Mock
@@ -127,7 +144,55 @@ public class ConnectorPluginsResourceTest {
     }
 
     @Test
-    public void testValidateConfig() throws Throwable {
+    public void testValidateConfigWithSingleErrorDueToMissingConnectorClassname() throws Throwable {
+        herder.validateConnectorConfig(EasyMock.eq(partialProps));
+
+        PowerMock.expectLastCall().andAnswer(new IAnswer<ConfigInfos>() {
+            @Override
+            public ConfigInfos answer() {
+                ConfigDef connectorConfigDef = ConnectorConfig.configDef();
+                List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(partialProps);
+
+                Connector connector = new ConnectorPluginsResourceTestConnector();
+                Config config = connector.validate(partialProps);
+                ConfigDef configDef = connector.config();
+                Map<String, ConfigDef.ConfigKey> configKeys = configDef.configKeys();
+                List<ConfigValue> configValues = config.configValues();
+
+                Map<String, ConfigDef.ConfigKey> resultConfigKeys = new HashMap<>(configKeys);
+                resultConfigKeys.putAll(connectorConfigDef.configKeys());
+                configValues.addAll(connectorConfigValues);
+
+                return AbstractHerder.generateResult(
+                    ConnectorPluginsResourceTestConnector.class.getName(),
+                    resultConfigKeys,
+                    configValues,
+                    Collections.singletonList("Test")
+                );
+            }
+        });
+
+        PowerMock.replayAll();
+
+        // This call to validateConfigs does not throw a BadRequestException because we've mocked
+        // validateConnectorConfig.
+        ConfigInfos configInfos = connectorPluginsResource.validateConfigs(
+            ConnectorPluginsResourceTestConnector.class.getSimpleName(),
+            partialProps
+        );
+        assertEquals(PARTIAL_CONFIG_INFOS.name(), configInfos.name());
+        assertEquals(PARTIAL_CONFIG_INFOS.errorCount(), configInfos.errorCount());
+        assertEquals(PARTIAL_CONFIG_INFOS.groups(), configInfos.groups());
+        assertEquals(
+            new HashSet<>(PARTIAL_CONFIG_INFOS.values()),
+            new HashSet<>(configInfos.values())
+        );
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testValidateConfigWithSimpleName() throws Throwable {
         herder.validateConnectorConfig(EasyMock.eq(props));
 
         PowerMock.expectLastCall().andAnswer(new IAnswer<ConfigInfos>() {
@@ -146,14 +211,24 @@ public class ConnectorPluginsResourceTest {
                 resultConfigKeys.putAll(connectorConfigDef.configKeys());
                 configValues.addAll(connectorConfigValues);
 
-                return AbstractHerder.generateResult(ConnectorPluginsResourceTestConnector.class.getName(), resultConfigKeys, configValues, Collections.singletonList("Test"));
+                return AbstractHerder.generateResult(
+                    ConnectorPluginsResourceTestConnector.class.getName(),
+                    resultConfigKeys,
+                    configValues,
+                    Collections.singletonList("Test")
+                );
             }
         });
+
         PowerMock.replayAll();
 
-        ConfigInfos configInfos = connectorPluginsResource.validateConfigs(ConnectorPluginsResourceTestConnector.class.getName(), props);
+        // make a request to connector-plugins resource using just the simple class name.
+        ConfigInfos configInfos = connectorPluginsResource.validateConfigs(
+            ConnectorPluginsResourceTestConnector.class.getSimpleName(),
+            props
+        );
         assertEquals(CONFIG_INFOS.name(), configInfos.name());
-        assertEquals(CONFIG_INFOS.errorCount(), configInfos.errorCount());
+        assertEquals(0, configInfos.errorCount());
         assertEquals(CONFIG_INFOS.groups(), configInfos.groups());
         assertEquals(new HashSet<>(CONFIG_INFOS.values()), new HashSet<>(configInfos.values()));
 
@@ -161,6 +236,126 @@ public class ConnectorPluginsResourceTest {
     }
 
     @Test
+    public void testValidateConfigWithAlias() throws Throwable {
+        herder.validateConnectorConfig(EasyMock.eq(props));
+
+        PowerMock.expectLastCall().andAnswer(new IAnswer<ConfigInfos>() {
+            @Override
+            public ConfigInfos answer() {
+                ConfigDef connectorConfigDef = ConnectorConfig.configDef();
+                List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(props);
+
+                Connector connector = new ConnectorPluginsResourceTestConnector();
+                Config config = connector.validate(props);
+                ConfigDef configDef = connector.config();
+                Map<String, ConfigDef.ConfigKey> configKeys = configDef.configKeys();
+                List<ConfigValue> configValues = config.configValues();
+
+                Map<String, ConfigDef.ConfigKey> resultConfigKeys = new HashMap<>(configKeys);
+                resultConfigKeys.putAll(connectorConfigDef.configKeys());
+                configValues.addAll(connectorConfigValues);
+
+                return AbstractHerder.generateResult(
+                    ConnectorPluginsResourceTestConnector.class.getName(),
+                    resultConfigKeys,
+                    configValues,
+                    Collections.singletonList("Test")
+                );
+            }
+        });
+
+        PowerMock.replayAll();
+
+        // make a request to connector-plugins resource using a valid alias.
+        ConfigInfos configInfos = connectorPluginsResource.validateConfigs(
+            "ConnectorPluginsResourceTest",
+            props
+        );
+        assertEquals(CONFIG_INFOS.name(), configInfos.name());
+        assertEquals(0, configInfos.errorCount());
+        assertEquals(CONFIG_INFOS.groups(), configInfos.groups());
+        assertEquals(new HashSet<>(CONFIG_INFOS.values()), new HashSet<>(configInfos.values()));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = BadRequestException.class)
+    public void testValidateConfigWithNonExistentName() throws Throwable {
+        herder.validateConnectorConfig(EasyMock.eq(props));
+
+        PowerMock.expectLastCall().andAnswer(new IAnswer<ConfigInfos>() {
+            @Override
+            public ConfigInfos answer() {
+                ConfigDef connectorConfigDef = ConnectorConfig.configDef();
+                List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(props);
+
+                Connector connector = new ConnectorPluginsResourceTestConnector();
+                Config config = connector.validate(props);
+                ConfigDef configDef = connector.config();
+                Map<String, ConfigDef.ConfigKey> configKeys = configDef.configKeys();
+                List<ConfigValue> configValues = config.configValues();
+
+                Map<String, ConfigDef.ConfigKey> resultConfigKeys = new HashMap<>(configKeys);
+                resultConfigKeys.putAll(connectorConfigDef.configKeys());
+                configValues.addAll(connectorConfigValues);
+
+                return AbstractHerder.generateResult(
+                    ConnectorPluginsResourceTestConnector.class.getName(),
+                    resultConfigKeys,
+                    configValues,
+                    Collections.singletonList("Test")
+                );
+            }
+        });
+
+        PowerMock.replayAll();
+
+        // make a request to connector-plugins resource using a non-loaded connector with the same
+        // simple name but different package.
+        String customClassname = "com.custom.package."
+            + ConnectorPluginsResourceTestConnector.class.getSimpleName();
+        connectorPluginsResource.validateConfigs(customClassname, props);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test(expected = BadRequestException.class)
+    public void testValidateConfigWithNonExistentAlias() throws Throwable {
+        herder.validateConnectorConfig(EasyMock.eq(props));
+
+        PowerMock.expectLastCall().andAnswer(new IAnswer<ConfigInfos>() {
+            @Override
+            public ConfigInfos answer() {
+                ConfigDef connectorConfigDef = ConnectorConfig.configDef();
+                List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(props);
+
+                Connector connector = new ConnectorPluginsResourceTestConnector();
+                Config config = connector.validate(props);
+                ConfigDef configDef = connector.config();
+                Map<String, ConfigDef.ConfigKey> configKeys = configDef.configKeys();
+                List<ConfigValue> configValues = config.configValues();
+
+                Map<String, ConfigDef.ConfigKey> resultConfigKeys = new HashMap<>(configKeys);
+                resultConfigKeys.putAll(connectorConfigDef.configKeys());
+                configValues.addAll(connectorConfigValues);
+
+                return AbstractHerder.generateResult(
+                    ConnectorPluginsResourceTestConnector.class.getName(),
+                    resultConfigKeys,
+                    configValues,
+                    Collections.singletonList("Test")
+                );
+            }
+        });
+
+        PowerMock.replayAll();
+
+        connectorPluginsResource.validateConfigs("ConnectorPluginsTest", props);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testListConnectorPlugins() {
         Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins());
         assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class.getCanonicalName())));