You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/11/26 01:50:24 UTC
kafka git commit: KAFKA-5563: Standardize validation and substitution
of connector names in REST API connector configs
Repository: kafka
Updated Branches:
refs/heads/trunk be2918b3a -> 5a2960f81
KAFKA-5563: Standardize validation and substitution of connector names in REST API connector configs
…from config to own function and added check to create connector call.
Author: Soenke Liebau <so...@opencore.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #4230 from soenkeliebau/KAFKA-5563
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5a2960f8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5a2960f8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5a2960f8
Branch: refs/heads/trunk
Commit: 5a2960f811c27f59d78dfdb99c7c3c6eeed16c4b
Parents: be2918b
Author: Soenke Liebau <so...@opencore.com>
Authored: Sat Nov 25 17:50:17 2017 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sat Nov 25 17:50:17 2017 -0800
----------------------------------------------------------------------
.../rest/resources/ConnectorsResource.java | 23 ++++++++++++--------
.../rest/resources/ConnectorsResourceTest.java | 8 +++++++
2 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2960f8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index e681a68..2c03124 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -92,8 +92,7 @@ public class ConnectorsResource {
throw new BadRequestException("connector name should not contain '/'");
}
Map<String, String> configs = createRequest.config();
- if (!configs.containsKey(ConnectorConfig.NAME_CONFIG))
- configs.put(ConnectorConfig.NAME_CONFIG, name);
+ checkAndPutConnectorConfigName(name, configs);
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
herder.putConnectorConfig(name, configs, false, cb);
@@ -134,13 +133,7 @@ public class ConnectorsResource {
final @QueryParam("forward") Boolean forward,
final Map<String, String> connectorConfig) throws Throwable {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
- String includedName = connectorConfig.get(ConnectorConfig.NAME_CONFIG);
- if (includedName != null) {
- if (!includedName.equals(connector))
- throw new BadRequestException("Connector name configuration (" + includedName + ") doesn't match connector name in the URL (" + connector + ")");
- } else {
- connectorConfig.put(ConnectorConfig.NAME_CONFIG, connector);
- }
+ checkAndPutConnectorConfigName(connector, connectorConfig);
herder.putConnectorConfig(connector, connectorConfig, true, cb);
Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
@@ -225,6 +218,18 @@ public class ConnectorsResource {
completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward);
}
+ // Check whether the connector name from the url matches the one (if there is one) provided in the connectorconfig
+ // object. Throw BadRequestException on mismatch, otherwise put connectorname in config
+ private void checkAndPutConnectorConfigName(String connectorName, Map<String, String> connectorConfig) {
+ String includedName = connectorConfig.get(ConnectorConfig.NAME_CONFIG);
+ if (includedName != null) {
+ if (!includedName.equals(connectorName))
+ throw new BadRequestException("Connector name configuration (" + includedName + ") doesn't match connector name in the URL (" + connectorName + ")");
+ } else {
+ connectorConfig.put(ConnectorConfig.NAME_CONFIG, connectorName);
+ }
+ }
+
// Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the
// request to the leader.
private <T, U> T completeOrForwardRequest(FutureCallback<T> cb,
http://git-wip-us.apache.org/repos/asf/kafka/blob/5a2960f8/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index cb86143..89a2218 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -365,6 +365,14 @@ public class ConnectorsResourceTest {
connectorsResource.putConnectorConfig(CONNECTOR_NAME, FORWARD, connConfig);
}
+ @Test(expected = BadRequestException.class)
+ public void testCreateConnectorConfigNameMismatch() throws Throwable {
+ Map<String, String> connConfig = new HashMap<>();
+ connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
+ CreateConnectorRequest request = new CreateConnectorRequest(CONNECTOR_NAME, connConfig);
+ connectorsResource.createConnector(FORWARD, request);
+ }
+
@Test
public void testGetConnectorTaskConfigs() throws Throwable {
final Capture<Callback<List<TaskInfo>>> cb = Capture.newInstance();