You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/11/26 01:50:24 UTC

kafka git commit: KAFKA-5563: Standardize validation and substitution of connector names in REST API connector configs

Repository: kafka
Updated Branches:
  refs/heads/trunk be2918b3a -> 5a2960f81


KAFKA-5563: Standardize validation and substitution of connector names in REST API connector configs

…from config to own function and added check to create connector call.

Author: Soenke Liebau <so...@opencore.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #4230 from soenkeliebau/KAFKA-5563


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a2960f8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a2960f8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a2960f8

Branch: refs/heads/trunk
Commit: 5a2960f811c27f59d78dfdb99c7c3c6eeed16c4b
Parents: be2918b
Author: Soenke Liebau <so...@opencore.com>
Authored: Sat Nov 25 17:50:17 2017 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sat Nov 25 17:50:17 2017 -0800

----------------------------------------------------------------------
 .../rest/resources/ConnectorsResource.java      | 23 ++++++++++++--------
 .../rest/resources/ConnectorsResourceTest.java  |  8 +++++++
 2 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2960f8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index e681a68..2c03124 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -92,8 +92,7 @@ public class ConnectorsResource {
             throw new BadRequestException("connector name should not contain '/'");
         }
         Map<String, String> configs = createRequest.config();
-        if (!configs.containsKey(ConnectorConfig.NAME_CONFIG))
-            configs.put(ConnectorConfig.NAME_CONFIG, name);
+        checkAndPutConnectorConfigName(name, configs);
 
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
         herder.putConnectorConfig(name, configs, false, cb);
@@ -134,13 +133,7 @@ public class ConnectorsResource {
                                        final @QueryParam("forward") Boolean forward,
                                        final Map<String, String> connectorConfig) throws Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
-        String includedName = connectorConfig.get(ConnectorConfig.NAME_CONFIG);
-        if (includedName != null) {
-            if (!includedName.equals(connector))
-                throw new BadRequestException("Connector name configuration (" + includedName + ") doesn't match connector name in the URL (" + connector + ")");
-        } else {
-            connectorConfig.put(ConnectorConfig.NAME_CONFIG, connector);
-        }
+        checkAndPutConnectorConfigName(connector, connectorConfig);
 
         herder.putConnectorConfig(connector, connectorConfig, true, cb);
         Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
@@ -225,6 +218,18 @@ public class ConnectorsResource {
         completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward);
     }
 
+    // Check whether the given connector name matches the name (if any) provided in the connector config map.
+    // Throw BadRequestException on a mismatch; if the config contains no name, put the connector name into it.
+    private void checkAndPutConnectorConfigName(String connectorName, Map<String, String> connectorConfig) {
+        String includedName = connectorConfig.get(ConnectorConfig.NAME_CONFIG);
+        if (includedName != null) {
+            if (!includedName.equals(connectorName))
+                throw new BadRequestException("Connector name configuration (" + includedName + ") doesn't match connector name in the URL (" + connectorName + ")");
+        } else {
+            connectorConfig.put(ConnectorConfig.NAME_CONFIG, connectorName);
+        }
+    }
+
     // Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the
     // request to the leader.
     private <T, U> T completeOrForwardRequest(FutureCallback<T> cb,

http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2960f8/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index cb86143..89a2218 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -365,6 +365,14 @@ public class ConnectorsResourceTest {
         connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, connConfig);
     }
 
+    @Test(expected = BadRequestException.class)
+    public void testCreateConnectorConfigNameMismatch() throws Throwable {
+        Map<String, String> connConfig = new HashMap<>();
+        connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
+        CreateConnectorRequest request = new CreateConnectorRequest(CONNECTOR_NAME, connConfig);
+        connectorsResource.createConnector(FORWARD, request);
+    }
+
     @Test
     public void testGetConnectorTaskConfigs() throws Throwable {
         final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance();