You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/03/17 21:26:17 UTC
[1/2] kafka git commit: KAFKA-3315: Add REST and Connector API to
expose connector configuration
Repository: kafka
Updated Branches:
refs/heads/trunk a1eb12d7c -> c07d01722
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
new file mode 100644
index 0000000..6040563
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class ConfigInfo {
+
+ private ConfigKeyInfo configKey;
+ private ConfigValueInfo configValue;
+
+ @JsonCreator
+ public ConfigInfo(
+ @JsonProperty("definition") ConfigKeyInfo configKey,
+ @JsonProperty("value") ConfigValueInfo configValue) {
+ this.configKey = configKey;
+ this.configValue = configValue;
+ }
+
+ @JsonProperty("definition")
+ public ConfigKeyInfo configKey() {
+ return configKey;
+ }
+
+ @JsonProperty("value")
+ public ConfigValueInfo configValue() {
+ return configValue;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ConfigInfo that = (ConfigInfo) o;
+ return Objects.equals(configKey, that.configKey) &&
+ Objects.equals(configValue, that.configValue);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(configKey, configValue);
+ }
+
+ @Override
+ public String toString() {
+ return "[" + configKey.toString() + "," + configValue.toString() + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java
new file mode 100644
index 0000000..3e73983
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+public class ConfigInfos {
+
+ @JsonProperty("name")
+ private final String name;
+
+ @JsonProperty("error_count")
+ private final int errorCount;
+
+ @JsonProperty("groups")
+ private final List<String> groups;
+
+ @JsonProperty("configs")
+ private final List<ConfigInfo> configs;
+
+ @JsonCreator
+ public ConfigInfos(@JsonProperty("name") String name,
+ @JsonProperty("error_count") int errorCount,
+ @JsonProperty("groups") List<String> groups,
+ @JsonProperty("configs") List<ConfigInfo> configs) {
+ this.name = name;
+ this.groups = groups;
+ this.errorCount = errorCount;
+ this.configs = configs;
+ }
+
+ @JsonProperty
+ public String name() {
+ return name;
+ }
+
+ @JsonProperty
+ public List<String> groups() {
+ return groups;
+ }
+
+ @JsonProperty("error_count")
+ public int errorCount() {
+ return errorCount;
+ }
+
+ @JsonProperty("configs")
+ public List<ConfigInfo> values() {
+ return configs;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ConfigInfos that = (ConfigInfos) o;
+ return Objects.equals(name, that.name) &&
+ Objects.equals(errorCount, that.errorCount) &&
+ Objects.equals(groups, that.groups) &&
+ Objects.equals(configs, that.configs);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, errorCount, groups, configs);
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("[")
+ .append(name)
+ .append(",")
+ .append(errorCount)
+ .append(",")
+ .append(groups)
+ .append(",")
+ .append(configs)
+ .append("]");
+ return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
new file mode 100644
index 0000000..f813709
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+public class ConfigKeyInfo {
+
+ private final String name;
+ private final String type;
+ private final boolean required;
+ private final Object defaultValue;
+ private final String importance;
+ private final String documentation;
+ private final String group;
+ private final int orderInGroup;
+ private final String width;
+ private final String displayName;
+ private final List<String> dependents;
+
+ @JsonCreator
+ public ConfigKeyInfo(@JsonProperty("name") String name,
+ @JsonProperty("type") String type,
+ @JsonProperty("required") boolean required,
+ @JsonProperty("default_value") Object defaultValue,
+ @JsonProperty("importance") String importance,
+ @JsonProperty("documentation") String documentation,
+ @JsonProperty("group") String group,
+ @JsonProperty("order_in_group") int orderInGroup,
+ @JsonProperty("width") String width,
+ @JsonProperty("display_name") String displayName,
+ @JsonProperty("dependents") List<String> dependents) {
+ this.name = name;
+ this.type = type;
+ this.required = required;
+ this.defaultValue = defaultValue;
+ this.importance = importance;
+ this.documentation = documentation;
+ this.group = group;
+ this.orderInGroup = orderInGroup;
+ this.width = width;
+ this.displayName = displayName;
+ this.dependents = dependents;
+ }
+
+ @JsonProperty
+ public String name() {
+ return name;
+ }
+
+ @JsonProperty
+ public String type() {
+ return type;
+ }
+
+ @JsonProperty
+ public boolean required() {
+ return required;
+ }
+
+ @JsonProperty("default_value")
+ public Object defaultValue() {
+ return defaultValue;
+ }
+
+ @JsonProperty
+ public String documentation() {
+ return documentation;
+ }
+
+ @JsonProperty
+ public String group() {
+ return group;
+ }
+
+ @JsonProperty("order")
+ public int orderInGroup() {
+ return orderInGroup;
+ }
+
+ @JsonProperty
+ public String width() {
+ return width;
+ }
+
+ @JsonProperty
+ public String importance() {
+ return importance;
+ }
+
+ @JsonProperty("display_name")
+ public String displayName() {
+ return displayName;
+ }
+
+ @JsonProperty
+ public List<String> dependents() {
+ return dependents;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ConfigKeyInfo that = (ConfigKeyInfo) o;
+ return Objects.equals(name, that.name) &&
+ Objects.equals(type, that.type) &&
+ Objects.equals(required, that.required) &&
+ Objects.equals(defaultValue, that.defaultValue) &&
+ Objects.equals(importance, that.importance) &&
+ Objects.equals(documentation, that.documentation) &&
+ Objects.equals(group, that.group) &&
+ Objects.equals(orderInGroup, that.orderInGroup) &&
+ Objects.equals(width, that.width) &&
+ Objects.equals(displayName, that.displayName) &&
+ Objects.equals(dependents, that.dependents);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, type, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents);
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("[")
+ .append(name)
+ .append(",")
+ .append(type)
+ .append(",")
+ .append(required)
+ .append(",")
+ .append(defaultValue)
+ .append(",")
+ .append(importance)
+ .append(",")
+ .append(documentation)
+ .append(",")
+ .append(group)
+ .append(",")
+ .append(orderInGroup)
+ .append(",")
+ .append(width)
+ .append(",")
+ .append(displayName)
+ .append(",")
+ .append(dependents)
+ .append("]");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
new file mode 100644
index 0000000..51e7ee5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+public class ConfigValueInfo {
+ private String name;
+ private Object value;
+ private List<Object> recommendedValues;
+ private List<String> errors;
+ private boolean visible;
+
+ @JsonCreator
+ public ConfigValueInfo(
+ @JsonProperty("name") String name,
+ @JsonProperty("value") Object value,
+ @JsonProperty("recommended_values") List<Object> recommendedValues,
+ @JsonProperty("errors") List<String> errors,
+ @JsonProperty("visible") boolean visible) {
+ this.name = name;
+ this.value = value;
+ this.recommendedValues = recommendedValues;
+ this.errors = errors;
+ this.visible = visible;
+ }
+
+ @JsonProperty
+ public String name() {
+ return name;
+ }
+
+ @JsonProperty
+ public Object value() {
+ return value;
+ }
+
+ @JsonProperty("recommended_values")
+ public List<Object> recommendedValues() {
+ return recommendedValues;
+ }
+
+ @JsonProperty
+ public List<String> errors() {
+ return errors;
+ }
+
+ @JsonProperty
+ public boolean visible() {
+ return visible;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ConfigValueInfo that = (ConfigValueInfo) o;
+ return Objects.equals(name, that.name) &&
+ Objects.equals(value, that.value) &&
+ Objects.equals(recommendedValues, that.recommendedValues) &&
+ Objects.equals(errors, that.errors) &&
+ Objects.equals(visible, that.visible);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, value, recommendedValues, errors, visible);
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("[")
+ .append(name)
+ .append(",")
+ .append(value)
+ .append(",")
+ .append(recommendedValues)
+ .append(",")
+ .append(errors)
+ .append(",")
+ .append(visible)
+ .append("]");
+ return sb.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
index 8daae05..9567ef9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+
import org.apache.kafka.connect.util.ConnectorTaskId;
import java.util.ArrayList;
@@ -34,13 +35,15 @@ public class ConnectorInfo {
private final List<ConnectorTaskId> tasks;
@JsonCreator
- public ConnectorInfo(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config,
+ public ConnectorInfo(@JsonProperty("name") String name,
+ @JsonProperty("config") Map<String, String> config,
@JsonProperty("tasks") List<ConnectorTaskId> tasks) {
this.name = name;
this.config = config;
this.tasks = tasks;
}
+
@JsonProperty
public String name() {
return name;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
new file mode 100644
index 0000000..8439707
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+
+import java.util.Map;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/connector-plugins")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class ConnectorPluginsResource {
+
+ private final Herder herder;
+
+ public ConnectorPluginsResource(Herder herder) {
+ this.herder = herder;
+ }
+
+ @PUT
+ @Path("/{connectorType}/config/validate")
+ public ConfigInfos validateConfigs(final @PathParam("connectorType") String connType,
+ final Map<String, String> connectorConfig) throws Throwable {
+ return herder.validateConfigs(connType, connectorConfig);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index d0d940b..b6e9f61 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -18,6 +18,7 @@
package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
+
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
@@ -32,6 +33,14 @@ import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
@@ -43,13 +52,6 @@ import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import java.net.URI;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
@Path("/connectors")
@Produces(MediaType.APPLICATION_JSON)
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 707470f..9c48ed7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -49,7 +49,6 @@ import java.util.Set;
public class StandaloneHerder extends AbstractHerder {
private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
- private final Worker worker;
private HashMap<String, ConnectorState> connectors = new HashMap<>();
public StandaloneHerder(Worker worker) {
@@ -60,8 +59,7 @@ public class StandaloneHerder extends AbstractHerder {
StandaloneHerder(String workerId,
Worker worker,
StatusBackingStore statusBackingStore) {
- super(statusBackingStore, workerId);
- this.worker = worker;
+ super(worker, statusBackingStore, workerId);
}
public synchronized void start() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
index 0ab64fd..c2515a0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.tools;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
@@ -61,4 +62,9 @@ public class VerifiableSinkConnector extends SourceConnector {
@Override
public void stop() {
}
+
+ @Override
+ public ConfigDef config() {
+ return new ConfigDef();
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
index 5f9afd5..b18db6e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.tools;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
@@ -61,4 +62,9 @@ public class VerifiableSourceConnector extends SourceConnector {
@Override
public void stop() {
}
+
+ @Override
+ public ConfigDef config() {
+ return new ConfigDef();
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index f17023c..1dc5784 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -34,6 +34,7 @@ public class AbstractHerderTest extends EasyMockSupport {
@Test
public void connectorStatus() {
+ Worker worker = null;
String workerId = "workerId";
String connector = "connector";
int generation = 5;
@@ -42,8 +43,8 @@ public class AbstractHerderTest extends EasyMockSupport {
StatusBackingStore store = strictMock(StatusBackingStore.class);
AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(StatusBackingStore.class, String.class)
- .withArgs(store, workerId)
+ .withConstructor(Worker.class, StatusBackingStore.class, String.class)
+ .withArgs(worker, store, workerId)
.addMockedMethod("generation")
.createMock();
@@ -76,14 +77,15 @@ public class AbstractHerderTest extends EasyMockSupport {
@Test
public void taskStatus() {
+ Worker worker = null;
ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
String workerId = "workerId";
StatusBackingStore store = strictMock(StatusBackingStore.class);
AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(StatusBackingStore.class, String.class)
- .withArgs(store, workerId)
+ .withConstructor(Worker.class, StatusBackingStore.class, String.class)
+ .withArgs(worker, store, workerId)
.addMockedMethod("generation")
.createMock();
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 67d3fdc..557d789 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -18,6 +18,7 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
@@ -466,7 +467,11 @@ public class WorkerTest extends ThreadedTest {
/* Name here needs to be unique as we are testing the aliasing mechanism */
- private static class WorkerTestConnector extends Connector {
+ public static class WorkerTestConnector extends Connector {
+
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define("configName", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Test configName.");
+
@Override
public String version() {
return "1.0";
@@ -491,6 +496,11 @@ public class WorkerTest extends ThreadedTest {
public void stop() {
}
+
+ @Override
+ public ConfigDef config() {
+ return CONFIG_DEF;
+ }
}
private static class TestSourceTask extends SourceTask {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
new file mode 100644
index 0000000..625c91f
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.AbstractHerder;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RestServer.class)
+@PowerMockIgnore("javax.management.*")
+public class ConnectorPluginsResourceTest {
+
+ private static Map<String, String> props = new HashMap<>();
+ static {
+ props.put("test.string.config", "testString");
+ props.put("test.int.config", "10");
+ }
+
+ private static final ConfigInfos CONFIG_INFOS;
+ static {
+ List<ConfigInfo> configs = new LinkedList<>();
+
+ ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, "", "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", new LinkedList<String>());
+ ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+ ConfigInfo configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+ configs.add(configInfo);
+
+ configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, "", "MEDIUM", "Test configuration for integer type.", null, -1, "NONE", "test.int.config", new LinkedList<String>());
+ configValueInfo = new ConfigValueInfo("test.int.config", 10, Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+ configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+ configs.add(configInfo);
+
+ configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", new LinkedList<String>());
+ configValueInfo = new ConfigValueInfo("test.string.config.default", "", Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+ configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+ configs.add(configInfo);
+
+ CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), 0, Collections.<String>emptyList(), configs);
+ }
+
+ @Mock
+ private Herder herder;
+ private ConnectorPluginsResource connectorPluginsResource;
+
+ @Before
+ public void setUp() throws NoSuchMethodException {
+ PowerMock.mockStatic(RestServer.class,
+ RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class));
+ connectorPluginsResource = new ConnectorPluginsResource(herder);
+ }
+
+ @Test
+ public void testValidateConfig() throws Throwable {
+ herder.validateConfigs(EasyMock.eq(ConnectorPluginsResourceTestConnector.class.getName()), EasyMock.eq(props));
+
+ PowerMock.expectLastCall().andAnswer(new IAnswer<ConfigInfos>() {
+ @Override
+ public ConfigInfos answer() {
+ Config config = new ConnectorPluginsResourceTestConnector().validate(props);
+ Connector connector = new ConnectorPluginsResourceTestConnector();
+ ConfigDef configDef = connector.config();
+ return AbstractHerder.generateResult(ConnectorPluginsResourceTestConnector.class.getName(), configDef.configKeys(), config.configValues(), configDef.groups());
+ }
+ });
+ PowerMock.replayAll();
+
+ ConfigInfos configInfos = connectorPluginsResource.validateConfigs(ConnectorPluginsResourceTestConnector.class.getName(), props);
+ assertEquals(CONFIG_INFOS.name(), configInfos.name());
+ assertEquals(CONFIG_INFOS.errorCount(), configInfos.errorCount());
+ assertEquals(CONFIG_INFOS.groups(), configInfos.groups());
+ assertEquals(new HashSet<>(CONFIG_INFOS.values()), new HashSet<>(configInfos.values()));
+
+ PowerMock.verifyAll();
+ }
+
+ /* Name here needs to be unique as we are testing the aliasing mechanism */
+ public static class ConnectorPluginsResourceTestConnector extends Connector {
+
+ public static final String TEST_STRING_CONFIG = "test.string.config";
+ public static final String TEST_INT_CONFIG = "test.int.config";
+ public static final String TEST_STRING_CONFIG_DEFAULT = "test.string.config.default";
+
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(TEST_STRING_CONFIG, Type.STRING, Importance.HIGH, "Test configuration for string type.")
+ .define(TEST_INT_CONFIG, Type.INT, Importance.MEDIUM, "Test configuration for integer type.")
+ .define(TEST_STRING_CONFIG_DEFAULT, Type.STRING, "", Importance.LOW, "Test configuration with default value.");
+
+ @Override
+ public String version() {
+ return "1.0";
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public ConfigDef config() {
+ return CONFIG_DEF;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 4659ae8..970f56c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -18,6 +18,7 @@
package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
+
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
@@ -83,7 +84,6 @@ public class ConnectorsResourceTest {
TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 1), TASK_CONFIGS.get(1)));
}
-
@Mock
private Herder herder;
private ConnectorsResource connectorsResource;
@@ -172,6 +172,8 @@ public class ConnectorsResourceTest {
connectorsResource.createConnector(body);
PowerMock.verifyAll();
+
+
}
@Test(expected = AlreadyExistsException.class)
[2/2] kafka git commit: KAFKA-3315: Add REST and Connector API to
expose connector configuration
Posted by ew...@apache.org.
KAFKA-3315: Add REST and Connector API to expose connector configuration
Author: Liquan Pei <li...@gmail.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #964 from Ishiihara/expose-connector-config
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c07d0172
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c07d0172
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c07d0172
Branch: refs/heads/trunk
Commit: c07d017227f319250e5c373f8a6f504874ecfbf2
Parents: a1eb12d
Author: Liquan Pei <li...@gmail.com>
Authored: Thu Mar 17 13:26:02 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Mar 17 13:26:02 2016 -0700
----------------------------------------------------------------------
build.gradle | 1 +
.../kafka/common/config/AbstractConfig.java | 26 +-
.../org/apache/kafka/common/config/Config.java | 28 +
.../apache/kafka/common/config/ConfigDef.java | 652 +++++++++++++++++--
.../apache/kafka/common/config/ConfigValue.java | 113 ++++
.../kafka/common/config/ConfigDefTest.java | 202 +++++-
.../kafka/connect/connector/Connector.java | 21 +
.../connector/ConnectorReconfigurationTest.java | 9 +-
.../connect/file/FileStreamSinkConnector.java | 11 +
.../connect/file/FileStreamSourceConnector.java | 12 +
.../kafka/connect/runtime/AbstractHerder.java | 109 +++-
.../kafka/connect/runtime/ConnectorConfig.java | 18 +-
.../apache/kafka/connect/runtime/Herder.java | 7 +
.../apache/kafka/connect/runtime/Worker.java | 5 +
.../runtime/distributed/DistributedHerder.java | 6 +-
.../kafka/connect/runtime/rest/RestServer.java | 8 +-
.../runtime/rest/entities/ConfigInfo.java | 66 ++
.../runtime/rest/entities/ConfigInfos.java | 102 +++
.../runtime/rest/entities/ConfigKeyInfo.java | 171 +++++
.../runtime/rest/entities/ConfigValueInfo.java | 106 +++
.../runtime/rest/entities/ConnectorInfo.java | 5 +-
.../resources/ConnectorPluginsResource.java | 49 ++
.../rest/resources/ConnectorsResource.java | 16 +-
.../runtime/standalone/StandaloneHerder.java | 4 +-
.../connect/tools/VerifiableSinkConnector.java | 6 +
.../tools/VerifiableSourceConnector.java | 6 +
.../connect/runtime/AbstractHerderTest.java | 10 +-
.../kafka/connect/runtime/WorkerTest.java | 12 +-
.../resources/ConnectorPluginsResourceTest.java | 165 +++++
.../rest/resources/ConnectorsResourceTest.java | 4 +-
30 files changed, 1836 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 321fc3f..4b84ba5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -543,6 +543,7 @@ project(':clients') {
include "**/org/apache/kafka/common/*"
include "**/org/apache/kafka/common/errors/*"
include "**/org/apache/kafka/common/serialization/*"
+ include "**/org/apache/kafka/common/config/*"
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index b44f72c..f833d7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -12,6 +12,13 @@
*/
package org.apache.kafka.common.config;
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -20,13 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* A convenient base class for configurations to extend.
* <p>
@@ -46,7 +46,7 @@ public class AbstractConfig {
private final Map<String, Object> values;
@SuppressWarnings("unchecked")
- public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Boolean doLog) {
+ public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
/* check that all the keys are really strings */
for (Object key : originals.keySet())
if (!(key instanceof String))
@@ -62,6 +62,12 @@ public class AbstractConfig {
this(definition, originals, true);
}
+ public AbstractConfig(Map<String, Object> parsedConfig) {
+ this.values = parsedConfig;
+ this.originals = new HashMap<>();
+ this.used = Collections.synchronizedSet(new HashSet<String>());
+ }
+
protected Object get(String key) {
if (!values.containsKey(key))
throw new ConfigException(String.format("Unknown configuration '%s'", key));
@@ -94,7 +100,7 @@ public class AbstractConfig {
return (List<String>) get(key);
}
- public boolean getBoolean(String key) {
+ public Boolean getBoolean(String key) {
return (Boolean) get(key);
}
@@ -125,7 +131,7 @@ public class AbstractConfig {
/**
* Get all the original settings, ensuring that all values are of type String.
* @return the original settings
- * @throw ClassCastException if any of the values are not strings
+ * @throws ClassCastException if any of the values are not strings
*/
public Map<String, String> originalsStrings() {
Map<String, String> copy = new RecordingMap<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/main/java/org/apache/kafka/common/config/Config.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/Config.java b/clients/src/main/java/org/apache/kafka/common/config/Config.java
new file mode 100644
index 0000000..ce5ee17
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/Config.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import java.util.List;
+
+public class Config {
+ private List<ConfigValue> configValues;
+
+ public Config(List<ConfigValue> configValues) {
+ this.configValues = configValues;
+ }
+
+ public List<ConfigValue> configValues() {
+ return configValues;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 703eb7c..881cb0b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -12,45 +12,66 @@
*/
package org.apache.kafka.common.config;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.Utils;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.common.utils.Utils;
-
/**
- * This class is used for specifying the set of expected configurations, their type, their defaults, their
- * documentation, and any special validation logic used for checking the correctness of the values the user provides.
+ * This class is used for specifying the set of expected configurations. For each configuration, you can specify
+ * the name, the type, the default value, the documentation, the group information, the order in the group,
+ * the width of the configuration value and the name suitable for display in the UI.
+ *
+ * You can provide special validation logic used for single configuration validation by overriding {@link Validator}.
+ *
+ * Moreover, you can specify the dependents of a configuration. The valid values and visibility of a configuration
+ * may change according to the values of other configurations. You can override {@link Recommender} to get valid
+ * values and set visibility of a configuration given the current configuration values.
+ *
* <p/>
- * Usage of this class looks something like this:
+ * To use the class:
* <p/>
* <pre>
* ConfigDef defs = new ConfigDef();
- * defs.define("config_name", Type.STRING, "default string value", "This configuration is used for blah blah blah.");
- * defs.define("another_config_name", Type.INT, 42, Range.atLeast(0), "More documentation on this config");
*
- * Properties props = new Properties();
- * props.setProperty("config_name", "some value");
+ * defs.define("config_with_default", Type.STRING, "default string value", "Configuration with default value.");
+ * defs.define("config_with_validator", Type.INT, 42, Range.atLeast(0), "Configuration with user provided validator.");
+ * defs.define("config_with_dependents", Type.INT, "Configuration with dependents.", "group", 1, "Config With Dependents", Arrays.asList("config_with_default;","config_with_validator"));
+ *
+ * Map<String, String> props = new HashMap<>();
+ * props.put("config_with_default", "some value");
+ * props.put("config_with_dependents", "some other value");
+ * // will return "some value"
* Map<String, Object> configs = defs.parse(props);
+ * String someConfig = (String) configs.get("config_with_default");
+ * // will return default value of 42
+ * int anotherConfig = (Integer) configs.get("config_with_validator");
*
- * String someConfig = (String) configs.get("config_name"); // will return "some value"
- * int anotherConfig = (Integer) configs.get("another_config_name"); // will return default value of 42
+ * To validate the full configuration, use:
+ * List<Config> configs = def.validate(props);
+ * The {@link Config} contains updated configuration information given the current configuration values.
* </pre>
* <p/>
- * This class can be used stand-alone or in combination with {@link AbstractConfig} which provides some additional
+ * This class can be used standalone or in combination with {@link AbstractConfig} which provides some additional
* functionality for accessing configs.
*/
public class ConfigDef {
public static final Object NO_DEFAULT_VALUE = new String("");
- private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>();
+ private final Map<String, ConfigKey> configKeys = new HashMap<>();
+ private final List<String> groups = new LinkedList<>();
+ private Set<String> configsWithNoParent;
/**
* Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef}
@@ -63,26 +84,256 @@ public class ConfigDef {
/**
* Define a new configuration
- *
- * @param name The name of the config parameter
- * @param type The type of the config
- * @param defaultValue The default value to use if this config isn't present
- * @param validator A validator to use in checking the correctness of the config
- * @param importance The importance of this config: is this something you will likely need to change.
- * @param documentation The documentation string for the config
+ * @param name the name of the config parameter
+ * @param type the type of the config
+ * @param defaultValue the default value to use if this config isn't present
+ * @param validator the validator to use in checking the correctness of the config
+ * @param importance the importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup the order of this config in the group
+ * @param width the width of the config
+ * @param displayName the name suitable for display
+ * @param dependents the configurations that are dependents of this configuration
+ * @param recommender the recommender provides valid values given the parent configuration values
* @return This ConfigDef so you can chain calls
*/
- public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
- if (configKeys.containsKey(name))
+ public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
+ String group, int orderInGroup, Width width, String displayName, List<String> dependents, Recommender recommender) {
+ if (configKeys.containsKey(name)) {
throw new ConfigException("Configuration " + name + " is defined twice.");
+ }
+ if (group != null && !groups.contains(group)) {
+ groups.add(group);
+ }
Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
- configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation));
+ configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender));
return this;
}
/**
+ * Define a new configuration with no custom recommender
+ * @param name the name of the config parameter
+ * @param type the type of the config
+ * @param defaultValue the default value to use if this config isn't present
+ * @param validator the validator to use in checking the correctness of the config
+ * @param importance the importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup the order of this config in the group
+ * @param width the width of the config
+ * @param displayName the name suitable for display
+ * @param dependents the configurations that are dependents of this configuration
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
+ String group, int orderInGroup, Width width, String displayName, List<String> dependents) {
+ return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, null);
+ }
+
+ /**
+ * Define a new configuration with no dependents
+ * @param name the name of the config parameter
+ * @param type the type of the config
+ * @param defaultValue the default value to use if this config isn't present
+ * @param validator the validator to use in checking the correctness of the config
+ * @param importance the importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup the order of this config in the group
+ * @param width the width of the config
+ * @param displayName the name suitable for display
+ * @param recommender the recommender provides valid values given the parent configuration values
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
+ String group, int orderInGroup, Width width, String displayName, Recommender recommender) {
+ return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender);
+ }
+
+ /**
+ * Define a new configuration with no dependents and no custom recommender
+ * @param name the name of the config parameter
+ * @param type the type of the config
+ * @param defaultValue the default value to use if this config isn't present
+ * @param validator the validator to use in checking the correctness of the config
+ * @param importance the importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup the order of this config in the group
+ * @param width the width of the config
+ * @param displayName the name suitable for display
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
+ String group, int orderInGroup, Width width, String displayName) {
+ return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList());
+ }
+
+ /**
+ * Define a new configuration with no special validation logic
+ * @param name the name of the config parameter
+ * @param type the type of the config
+ * @param defaultValue the default value to use if this config isn't present
+ * @param importance the importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup the order of this config in the group
+ * @param width the width of the config
+ * @param displayName the name suitable for display
+ * @param dependents the configurations that are dependents of this configuration
+ * @param recommender the recommender provides valid values given the parent configuration values
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation,
+ String group, int orderInGroup, Width width, String displayName, List<String> dependents, Recommender recommender) {
+ return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender);
+ }
+
+ /**
+ * Define a new configuration with no special validation logic and no custom recommender
+ * @param name the name of the config parameter
+ * @param type the type of the config
+ * @param defaultValue the default value to use if this config isn't present
+ * @param importance the importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup the order of this config in the group
+ * @param width the width of the config
+ * @param displayName the name suitable for display
+ * @param dependents the configurations that are dependents of this configuration
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation,
+ String group, int orderInGroup, Width width, String displayName, List<String> dependents) {
+ return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, dependents, null);
+ }
+
+ /**
+ * Define a new configuration with no special validation logic and no custom recommender
+ * @param name the name of the config parameter
+ * @param type the type of the config
+ * @param defaultValue the default value to use if this config isn't present
+ * @param importance the importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup the order of this config in the group
+ * @param width the width of the config
+ * @param displayName the name suitable for display
+ * @param recommender the recommender provides valid values given the parent configuration values
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation,
+ String group, int orderInGroup, Width width, String displayName, Recommender recommender) {
+ return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender);
+ }
+
+ /**
+ * Define a new configuration with no special validation logic, not dependents and no custom recommender
+ * @param name the name of the config parameter
+ * @param type the type of the config
+ * @param defaultValue the default value to use if this config isn't present
+ * @param importance the importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup the order of this config in the group
+ * @param width the width of the config
+ * @param displayName the name suitable for display
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation,
+ String group, int orderInGroup, Width width, String displayName) {
+ return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList());
+ }
+
+ /**
+ * Define a new configuration with no default value and no special validation logic
+ * @param name the name of the config parameter
+ * @param type the type of the config
+ * @param importance the importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup the order of this config in the group
+ * @param width the width of the config
+ * @param displayName the name suitable for display
+ * @param dependents the configurations that are dependents of this configuration
+ * @param recommender the recommender provides valid values given the parent configuration value
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup,
+ Width width, String displayName, List<String> dependents, Recommender recommender) {
+ return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender);
+ }
+
+ /**
+ * Define a new configuration with no default value, no special validation logic and no custom recommender
+ * @param name the name of the config parameter
+ * @param type the type of the config
+ * @param importance the importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup the order of this config in the group
+ * @param width the width of the config
+ * @param displayName the name suitable for display
+ * @param dependents the configurations that are dependents of this configuration
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup,
+ Width width, String displayName, List<String> dependents) {
+ return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, dependents, null);
+ }
+
+ /**
+ * Define a new configuration with no default value, no special validation logic and no custom recommender
+ * @param name the name of the config parameter
+ * @param type the type of the config
+ * @param importance the importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup the order of this config in the group
+ * @param width the width of the config
+ * @param displayName the name suitable for display
+ * @param recommender the recommender provides valid values given the parent configuration value
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup,
+ Width width, String displayName, Recommender recommender) {
+ return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender);
+ }
+
+ /**
+ * Define a new configuration with no default value, no special validation logic, no dependents and no custom recommender
+ * @param name the name of the config parameter
+ * @param type the type of the config
+ * @param importance the importance of this config
+ * @param documentation the documentation string for the config
+ * @param group the group this config belongs to
+ * @param orderInGroup the order of this config in the group
+ * @param width the width of the config
+ * @param displayName the name suitable for display
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup,
+ Width width, String displayName) {
+ return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList());
+ }
+
+ /**
+ * Define a new configuration with no group, no order in group, no width, no display name, no dependents and no custom recommender
+ * @param name the name of the config parameter
+ * @param type the type of the config
+ * @param defaultValue the default value to use if this config isn't present
+ * @param validator the validator to use in checking the correctness of the config
+ * @param importance the importance of this config
+ * @param documentation the documentation string for the config
+ * @return This ConfigDef so you can chain calls
+ */
+ public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
+ return define(name, type, defaultValue, validator, importance, documentation, null, -1, Width.NONE, name);
+ }
+
+ /**
* Define a new configuration with no special validation logic
- *
* @param name The name of the config parameter
* @param type The type of the config
* @param defaultValue The default value to use if this config isn't present
@@ -96,7 +347,6 @@ public class ConfigDef {
/**
* Define a new configuration with no default value and no special validation logic
- *
* @param name The name of the config parameter
* @param type The type of the config
* @param importance The importance of this config: is this something you will likely need to change.
@@ -108,6 +358,22 @@ public class ConfigDef {
}
/**
+ * Get the configuration keys
+ * @return a map containing all configuration keys
+ */
+ public Map<String, ConfigKey> configKeys() {
+ return configKeys;
+ }
+
+ /**
+ * Get the groups for the configuration
+ * @return a list of group names
+ */
+ public List<String> groups() {
+ return groups;
+ }
+
+ /**
* Add standard SSL client configuration options.
* @return this
*/
@@ -131,34 +397,188 @@ public class ConfigDef {
* appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
* programmatically constructed map.
*
- * @param props The configs to parse and validate
+ * @param props The configs to parse and validate.
* @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
- * the appropriate type (int, string, etc)
+ * the appropriate type (int, string, etc).
*/
public Map<String, Object> parse(Map<?, ?> props) {
- /* parse all known keys */
- Map<String, Object> values = new HashMap<String, Object>();
+ // Check all configurations are defined
+ List<String> undefinedConfigKeys = undefinedDependentConfigs();
+ if (!undefinedConfigKeys.isEmpty()) {
+ String joined = Utils.join(undefinedConfigKeys, ",");
+ throw new ConfigException("Some configurations in are referred in the dependents, but not defined: " + joined);
+ }
+ // parse all known keys
+ Map<String, Object> values = new HashMap<>();
for (ConfigKey key : configKeys.values()) {
Object value;
// props map contains setting - assign ConfigKey value
- if (props.containsKey(key.name))
+ if (props.containsKey(key.name)) {
value = parseType(key.name, props.get(key.name), key.type);
- // props map doesn't contain setting, the key is required because no default value specified - its an error
- else if (key.defaultValue == NO_DEFAULT_VALUE)
+ // props map doesn't contain setting, the key is required because no default value specified - its an error
+ } else if (key.defaultValue == NO_DEFAULT_VALUE) {
throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
- // otherwise assign setting its default value
- else
+ } else {
+ // otherwise assign setting its default value
value = key.defaultValue;
- if (key.validator != null)
+ }
+ if (key.validator != null) {
key.validator.ensureValid(key.name, value);
+ }
values.put(key.name, value);
}
return values;
}
/**
+ * Validate the current configuration values with the configuration definition.
+ * @param props the current configuration values
+ * @return List of Config, each Config contains the updated configuration information given
+ * the current configuration values.
+ */
+ public List<ConfigValue> validate(Map<String, String> props) {
+ Map<String, ConfigValue> configValues = new HashMap<>();
+ for (String name: configKeys.keySet()) {
+ configValues.put(name, new ConfigValue(name));
+ }
+
+ List<String> undefinedConfigKeys = undefinedDependentConfigs();
+ for (String undefinedConfigKey: undefinedConfigKeys) {
+ ConfigValue undefinedConfigValue = new ConfigValue(undefinedConfigKey);
+ undefinedConfigValue.addErrorMessage(undefinedConfigKey + " is referred in the dependents, but not defined.");
+ undefinedConfigValue.visible(false);
+ configValues.put(undefinedConfigKey, undefinedConfigValue);
+ }
+
+ Map<String, Object> parsed = parseForValidate(props, configValues);
+ return validate(parsed, configValues);
+ }
+
+ // package accessible for testing
+ Map<String, Object> parseForValidate(Map<String, String> props, Map<String, ConfigValue> configValues) {
+ Map<String, Object> parsed = new HashMap<>();
+ Set<String> configsWithNoParent = getConfigsWithNoParent();
+ for (String name: configsWithNoParent) {
+ parseForValidate(name, props, parsed, configValues);
+ }
+ return parsed;
+ }
+
+
+ private List<ConfigValue> validate(Map<String, Object> parsed, Map<String, ConfigValue> configValues) {
+ Set<String> configsWithNoParent = getConfigsWithNoParent();
+ for (String name: configsWithNoParent) {
+ validate(name, parsed, configValues);
+ }
+ return new LinkedList<>(configValues.values());
+ }
+
+ private List<String> undefinedDependentConfigs() {
+ Set<String> undefinedConfigKeys = new HashSet<>();
+ for (String configName: configKeys.keySet()) {
+ ConfigKey configKey = configKeys.get(configName);
+ List<String> dependents = configKey.dependents;
+ for (String dependent: dependents) {
+ if (!configKeys.containsKey(dependent)) {
+ undefinedConfigKeys.add(dependent);
+ }
+ }
+ }
+ return new LinkedList<>(undefinedConfigKeys);
+ }
+
+ private Set<String> getConfigsWithNoParent() {
+ if (this.configsWithNoParent != null) {
+ return this.configsWithNoParent;
+ }
+ Set<String> configsWithParent = new HashSet<>();
+
+ for (ConfigKey configKey: configKeys.values()) {
+ List<String> dependents = configKey.dependents;
+ configsWithParent.addAll(dependents);
+ }
+
+ Set<String> configs = new HashSet<>(configKeys.keySet());
+ configs.removeAll(configsWithParent);
+ this.configsWithNoParent = configs;
+ return configs;
+ }
+
+ private void parseForValidate(String name, Map<String, String> props, Map<String, Object> parsed, Map<String, ConfigValue> configs) {
+ if (!configKeys.containsKey(name)) {
+ return;
+ }
+ ConfigKey key = configKeys.get(name);
+ ConfigValue config = configs.get(name);
+
+ Object value = null;
+ if (props.containsKey(key.name)) {
+ try {
+ value = parseType(key.name, props.get(key.name), key.type);
+ } catch (ConfigException e) {
+ config.addErrorMessage(e.getMessage());
+ }
+ } else if (key.defaultValue == NO_DEFAULT_VALUE) {
+ config.addErrorMessage("Missing required configuration \"" + key.name + "\" which has no default value.");
+ } else {
+ value = key.defaultValue;
+ }
+
+ if (key.validator != null) {
+ try {
+ key.validator.ensureValid(key.name, value);
+ } catch (ConfigException e) {
+ config.addErrorMessage(e.getMessage());
+ }
+ }
+ config.value(value);
+ parsed.put(name, value);
+ for (String dependent: key.dependents) {
+ parseForValidate(dependent, props, parsed, configs);
+ }
+ }
+
+ private void validate(String name, Map<String, Object> parsed, Map<String, ConfigValue> configs) {
+ if (!configKeys.containsKey(name)) {
+ return;
+ }
+ ConfigKey key = configKeys.get(name);
+ ConfigValue config = configs.get(name);
+ Object value = parsed.get(name);
+ List<Object> recommendedValues;
+ if (key.recommender != null) {
+ try {
+ recommendedValues = key.recommender.validValues(name, parsed);
+ List<Object> originalRecommendedValues = config.recommendedValues();
+
+ if (!originalRecommendedValues.isEmpty()) {
+ Set<Object> originalRecommendedValueSet = new HashSet<>(originalRecommendedValues);
+ Iterator<Object> it = recommendedValues.iterator();
+ while (it.hasNext()) {
+ Object o = it.next();
+ if (!originalRecommendedValueSet.contains(o)) {
+ it.remove();
+ }
+ }
+ }
+ config.recommendedValues(recommendedValues);
+ if (value != null && !recommendedValues.isEmpty() && !recommendedValues.contains(value)) {
+ config.addErrorMessage("Invalid value for configuration " + key.name);
+ }
+ config.visible(key.recommender.visible(name, parsed));
+ } catch (ConfigException e) {
+ config.addErrorMessage(e.getMessage());
+ }
+ }
+
+ configs.put(name, config);
+ for (String dependent: key.dependents) {
+ validate(dependent, parsed, configs);
+ }
+ }
+
+ /**
* Parse a value according to its expected type.
- *
* @param name The config name
* @param value The config value
* @param type The expected type
@@ -263,15 +683,56 @@ public class ConfigDef {
BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD
}
+ /**
+ * The importance level for a configuration
+ */
public enum Importance {
HIGH, MEDIUM, LOW
}
/**
- * Validation logic the user may provide
+ * The width of a configuration value
+ */
+ public enum Width {
+ NONE, SHORT, MEDIUM, LONG
+ }
+
+ /**
+ * This is used by the {@link #validate(Map)} to get valid values for a configuration given the current
+ * configuration values in order to perform full configuration validation and visibility modification.
+ * In case that there are dependencies between configurations, the valid values and visibility
+ * for a configuration may change given the values of other configurations.
+ */
+ public interface Recommender {
+
+ /**
+ * The valid values for the configuration given the current configuration values.
+ * @param name The name of the configuration
+ * @param parsedConfig The parsed configuration values
+ * @return The list of valid values. To function properly, the returned objects should have the type
+ * defined for the configuration using the recommender.
+ */
+ List<Object> validValues(String name, Map<String, Object> parsedConfig);
+
+ /**
+ * Set the visibility of the configuration given the current configuration values.
+ * @param name The name of the configuration
+ * @param parsedConfig The parsed configuration values
+ * @return The visibility of the configuration
+ */
+ boolean visible(String name, Map<String, Object> parsedConfig);
+ }
+
+ /**
+ * Validation logic the user may provide to perform single configuration validation.
*/
public interface Validator {
- public void ensureValid(String name, Object o);
+ /**
+ * Perform single configuration validation.
+ * @param name The name of the configuration
+ * @param value The value of the configuration
+ */
+ void ensureValid(String name, Object value);
}
/**
@@ -345,16 +806,24 @@ public class ConfigDef {
}
}
- private static class ConfigKey {
+ public static class ConfigKey {
public final String name;
public final Type type;
public final String documentation;
public final Object defaultValue;
public final Validator validator;
public final Importance importance;
-
- public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
- super();
+ public final String group;
+ public final int orderInGroup;
+ public final Width width;
+ public final String displayName;
+ public final List<String> dependents;
+ public final Recommender recommender;
+
+ public ConfigKey(String name, Type type, Object defaultValue, Validator validator,
+ Importance importance, String documentation, String group,
+ int orderInGroup, Width width, String displayName,
+ List<String> dependents, Recommender recommender) {
this.name = name;
this.type = type;
this.defaultValue = defaultValue;
@@ -363,34 +832,21 @@ public class ConfigDef {
if (this.validator != null && this.hasDefault())
this.validator.ensureValid(name, defaultValue);
this.documentation = documentation;
+ this.dependents = dependents;
+ this.group = group;
+ this.orderInGroup = orderInGroup;
+ this.width = width;
+ this.displayName = displayName;
+ this.recommender = recommender;
}
public boolean hasDefault() {
return this.defaultValue != NO_DEFAULT_VALUE;
}
-
}
public String toHtmlTable() {
- // sort first required fields, then by importance, then name
- List<ConfigDef.ConfigKey> configs = new ArrayList<ConfigDef.ConfigKey>(this.configKeys.values());
- Collections.sort(configs, new Comparator<ConfigDef.ConfigKey>() {
- public int compare(ConfigDef.ConfigKey k1, ConfigDef.ConfigKey k2) {
- // first take anything with no default value (therefore required)
- if (!k1.hasDefault() && k2.hasDefault())
- return -1;
- else if (!k2.hasDefault() && k1.hasDefault())
- return 1;
-
- // then sort by importance
- int cmp = k1.importance.compareTo(k2.importance);
- if (cmp == 0)
- // then sort in alphabetical order
- return k1.name.compareTo(k2.name);
- else
- return cmp;
- }
- });
+ List<ConfigKey> configs = sortedConfigs();
StringBuilder b = new StringBuilder();
b.append("<table class=\"data-table\"><tbody>\n");
b.append("<tr>\n");
@@ -434,4 +890,74 @@ public class ConfigDef {
b.append("</tbody></table>");
return b.toString();
}
+
+ /**
+ * Get the configs formatted with reStructuredText, suitable for embedding in Sphinx
+ * documentation.
+ */
+ public String toRst() {
+ List<ConfigKey> configs = sortedConfigs();
+ StringBuilder b = new StringBuilder();
+
+ for (ConfigKey def : configs) {
+ b.append("``");
+ b.append(def.name);
+ b.append("``\n");
+ for (String docLine : def.documentation.split("\n")) {
+ if (docLine.length() == 0) {
+ continue;
+ }
+ b.append(" ");
+ b.append(docLine);
+ b.append("\n\n");
+ }
+ b.append(" * Type: ");
+ b.append(def.type.toString().toLowerCase());
+ b.append("\n");
+ if (def.defaultValue != null) {
+ b.append(" * Default: ");
+ if (def.type == Type.STRING) {
+ b.append("\"");
+ b.append(def.defaultValue);
+ b.append("\"");
+ } else {
+ b.append(def.defaultValue);
+ }
+ b.append("\n");
+ }
+ b.append(" * Importance: ");
+ b.append(def.importance.toString().toLowerCase());
+ b.append("\n\n");
+ }
+ return b.toString();
+ }
+
+ /**
+ * Get a list of configs sorted into "natural" order: listing required fields first, then
+ * ordering by importance, and finally by name.
+ */
+ private List<ConfigKey> sortedConfigs() {
+ // sort first required fields, then by importance, then name
+ List<ConfigKey> configs = new ArrayList<>(this.configKeys.values());
+ Collections.sort(configs, new Comparator<ConfigKey>() {
+ public int compare(ConfigKey k1, ConfigKey k2) {
+ // first take anything with no default value
+ if (!k1.hasDefault() && k2.hasDefault()) {
+ return -1;
+ } else if (!k2.hasDefault() && k1.hasDefault()) {
+ return 1;
+ }
+
+ // then sort by importance
+ int cmp = k1.importance.compareTo(k2.importance);
+ if (cmp == 0) {
+ // then sort in alphabetical order
+ return k1.name.compareTo(k2.name);
+ } else {
+ return cmp;
+ }
+ }
+ });
+ return configs;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java
new file mode 100644
index 0000000..c9a4a34
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.common.config;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+
+public class ConfigValue {
+
+ private String name;
+ private Object value;
+ private List<Object> recommendedValues;
+ private List<String> errorMessages;
+ private boolean visible;
+
+ public ConfigValue(String name) {
+ this(name, null, new LinkedList<Object>(), new LinkedList<String>());
+ }
+
+ public ConfigValue(String name, Object value, List<Object> recommendedValues, List<String> errorMessages) {
+ this.name = name;
+ this.value = value;
+ this.recommendedValues = recommendedValues;
+ this.errorMessages = errorMessages;
+ this.visible = true;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public Object value() {
+ return value;
+ }
+
+ public List<Object> recommendedValues() {
+ return recommendedValues;
+ }
+
+ public List<String> errorMessages() {
+ return errorMessages;
+ }
+
+ public boolean visible() {
+ return visible;
+ }
+
+ public void value(Object value) {
+ this.value = value;
+ }
+
+ public void recommendedValues(List<Object> recommendedValues) {
+ this.recommendedValues = recommendedValues;
+ }
+
+ public void addErrorMessage(String errorMessage) {
+ this.errorMessages.add(errorMessage);
+ }
+
+ public void visible(boolean visible) {
+ this.visible = visible;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ConfigValue that = (ConfigValue) o;
+ return Objects.equals(name, that.name) &&
+ Objects.equals(value, that.value) &&
+ Objects.equals(recommendedValues, that.recommendedValues) &&
+ Objects.equals(errorMessages, that.errorMessages) &&
+ Objects.equals(visible, that.visible);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, value, recommendedValues, errorMessages, visible);
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+ sb.append("[")
+ .append(name)
+ .append(",")
+ .append(value)
+ .append(",")
+ .append(recommendedValues)
+ .append(",")
+ .append(errorMessages)
+ .append(",")
+ .append(visible)
+ .append("]");
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index fa0370b..022fb6b 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -12,22 +12,27 @@
*/
package org.apache.kafka.common.config;
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Validator;
import org.apache.kafka.common.config.ConfigDef.Range;
-import org.apache.kafka.common.config.ConfigDef.ValidString;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.ValidString;
+import org.apache.kafka.common.config.ConfigDef.Validator;
+import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.types.Password;
import org.junit.Test;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
public class ConfigDefTest {
@Test
@@ -156,7 +161,8 @@ public class ConfigDefTest {
final String key = "enum_test";
ConfigDef def = new ConfigDef();
- def.define(key, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ValidString.in("ONE", "TWO", "THREE"), Importance.HIGH, "docs");
+ def.define(key, Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
+ ValidString.in("ONE", "TWO", "THREE"), Importance.HIGH, "docs");
Properties props = new Properties();
props.put(key, "ONE");
@@ -164,6 +170,180 @@ public class ConfigDefTest {
assertEquals("ONE", vals.get(key));
}
+ @Test
+ public void testGroupInference() {
+ List<String> expected1 = Arrays.asList("group1", "group2");
+ ConfigDef def1 = new ConfigDef()
+ .define("a", Type.INT, Importance.HIGH, "docs", "group1", 1, Width.SHORT, "a")
+ .define("b", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "b")
+ .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c");
+
+ assertEquals(expected1, def1.groups());
+
+ List<String> expected2 = Arrays.asList("group2", "group1");
+ ConfigDef def2 = new ConfigDef()
+ .define("a", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "a")
+ .define("b", Type.INT, Importance.HIGH, "docs", "group2", 2, Width.SHORT, "b")
+ .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c");
+
+ assertEquals(expected2, def2.groups());
+ }
+
+ @Test
+ public void testParseForValidate() {
+ Map<String, Object> expectedParsed = new HashMap<>();
+ expectedParsed.put("a", 1);
+ expectedParsed.put("b", null);
+ expectedParsed.put("c", null);
+ expectedParsed.put("d", 10);
+
+ Map<String, ConfigValue> expected = new HashMap<>();
+ String errorMessageB = "Missing required configuration \"b\" which has no default value.";
+ String errorMessageC = "Missing required configuration \"c\" which has no default value.";
+ ConfigValue configA = new ConfigValue("a", 1, Collections.<Object>emptyList(), Collections.<String>emptyList());
+ ConfigValue configB = new ConfigValue("b", null, Collections.<Object>emptyList(), Arrays.asList(errorMessageB, errorMessageB));
+ ConfigValue configC = new ConfigValue("c", null, Collections.<Object>emptyList(), Arrays.asList(errorMessageC));
+ ConfigValue configD = new ConfigValue("d", 10, Collections.<Object>emptyList(), Collections.<String>emptyList());
+ expected.put("a", configA);
+ expected.put("b", configB);
+ expected.put("c", configC);
+ expected.put("d", configD);
+
+ ConfigDef def = new ConfigDef()
+ .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false))
+ .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true))
+ .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true))
+ .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", Arrays.asList("b"), new IntegerRecommender(false));
+
+ Map<String, String> props = new HashMap<>();
+ props.put("a", "1");
+ props.put("d", "10");
+
+ Map<String, ConfigValue> configValues = new HashMap<>();
+
+ for (String name: def.configKeys().keySet()) {
+ configValues.put(name, new ConfigValue(name));
+ }
+
+ Map<String, Object> parsed = def.parseForValidate(props, configValues);
+
+ assertEquals(expectedParsed, parsed);
+ assertEquals(expected, configValues);
+ }
+
+ @Test
+ public void testValidate() {
+ Map<String, ConfigValue> expected = new HashMap<>();
+ String errorMessageB = "Missing required configuration \"b\" which has no default value.";
+ String errorMessageC = "Missing required configuration \"c\" which has no default value.";
+ String errorMessageD = "Invalid value for configuration d";
+
+ ConfigValue configA = new ConfigValue("a", 1, Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList());
+ ConfigValue configB = new ConfigValue("b", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageB, errorMessageB));
+ ConfigValue configC = new ConfigValue("c", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageC));
+ ConfigValue configD = new ConfigValue("d", 10, Arrays.<Object>asList(1, 2, 3), Arrays.asList(errorMessageD));
+
+ expected.put("a", configA);
+ expected.put("b", configB);
+ expected.put("c", configC);
+ expected.put("d", configD);
+
+ ConfigDef def = new ConfigDef()
+ .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false))
+ .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true))
+ .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true))
+ .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", Arrays.asList("b"), new IntegerRecommender(false));
+
+ Map<String, String> props = new HashMap<>();
+ props.put("a", "1");
+ props.put("d", "10");
+
+ List<ConfigValue> configs = def.validate(props);
+ for (ConfigValue config : configs) {
+ String name = config.name();
+ ConfigValue expectedConfig = expected.get(name);
+ assertEquals(expectedConfig, config);
+ }
+ }
+
+ @Test
+ public void testValidateMissingConfigKey() {
+ Map<String, ConfigValue> expected = new HashMap<>();
+ String errorMessageB = "Missing required configuration \"b\" which has no default value.";
+ String errorMessageC = "Missing required configuration \"c\" which has no default value.";
+ String errorMessageD = "d is referred in the dependents, but not defined.";
+
+ ConfigValue configA = new ConfigValue("a", 1, Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList());
+ ConfigValue configB = new ConfigValue("b", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageB));
+ ConfigValue configC = new ConfigValue("c", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageC));
+ ConfigValue configD = new ConfigValue("d", null, Collections.emptyList(), Arrays.asList(errorMessageD));
+ configD.visible(false);
+
+ expected.put("a", configA);
+ expected.put("b", configB);
+ expected.put("c", configC);
+ expected.put("d", configD);
+
+ ConfigDef def = new ConfigDef()
+ .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c", "d"), new IntegerRecommender(false))
+ .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true))
+ .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true));
+
+ Map<String, String> props = new HashMap<>();
+ props.put("a", "1");
+
+ List<ConfigValue> configs = def.validate(props);
+ for (ConfigValue config: configs) {
+ String name = config.name();
+ ConfigValue expectedConfig = expected.get(name);
+ assertEquals(expectedConfig, config);
+ }
+ }
+
+ @Test
+ public void testValidateCannotParse() {
+ Map<String, ConfigValue> expected = new HashMap<>();
+ String errorMessageB = "Invalid value non_integer for configuration a: Not a number of type INT";
+ ConfigValue configA = new ConfigValue("a", null, Collections.emptyList(), Arrays.asList(errorMessageB));
+ expected.put("a", configA);
+
+ ConfigDef def = new ConfigDef().define("a", Type.INT, Importance.HIGH, "docs");
+ Map<String, String> props = new HashMap<>();
+ props.put("a", "non_integer");
+
+ List<ConfigValue> configs = def.validate(props);
+ for (ConfigValue config: configs) {
+ String name = config.name();
+ ConfigValue expectedConfig = expected.get(name);
+ assertEquals(expectedConfig, config);
+ }
+ }
+
+ private static class IntegerRecommender implements ConfigDef.Recommender {
+
+ private boolean hasParent;
+
+ public IntegerRecommender(boolean hasParent) {
+ this.hasParent = hasParent;
+ }
+
+ @Override
+ public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
+ List<Object> values = new LinkedList<>();
+ if (!hasParent) {
+ values.addAll(Arrays.asList(1, 2, 3));
+ } else {
+ values.addAll(Arrays.asList(4, 5));
+ }
+ return values;
+ }
+
+ @Override
+ public boolean visible(String name, Map<String, Object> parsedConfig) {
+ return true;
+ }
+ }
+
private void testValidators(Type type, Validator validator, Object defaultVal, Object[] okValues, Object[] badValues) {
ConfigDef def = new ConfigDef().define("name", type, defaultVal, validator, Importance.HIGH, "docs");
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
index 934cdbd..1370156 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
@@ -18,6 +18,9 @@
package org.apache.kafka.connect.connector;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
import java.util.List;
import java.util.Map;
@@ -121,4 +124,22 @@ public abstract class Connector {
* Stop this connector.
*/
public abstract void stop();
+
+ /**
+ * Validate the connector configuration values against configuration definitions.
+ * @param connectorConfigs the provided configuration values
+ * @return List of Config, each Config contains the updated configuration information given
+ * the current configuration values.
+ */
+ public Config validate(Map<String, String> connectorConfigs) {
+ ConfigDef configDef = config();
+ List<ConfigValue> configValues = configDef.validate(connectorConfigs);
+ return new Config(configValues);
+ }
+
+ /**
+ * Define the configuration for the connector.
+ * @return The ConfigDef for this connector.
+ */
+ public abstract ConfigDef config();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
index 7ea1de2..0517b66 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.connector;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.Test;
@@ -43,6 +44,7 @@ public class ConnectorReconfigurationTest {
}
private static class TestConnector extends Connector {
+
private boolean stopException;
private int order = 0;
public int stopOrder = -1;
@@ -78,5 +80,10 @@ public class ConnectorReconfigurationTest {
if (stopException)
throw new ConnectException("error");
}
+
+ @Override
+ public ConfigDef config() {
+ return new ConfigDef();
+ }
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
index a73153f..d423313 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
@@ -17,6 +17,9 @@
package org.apache.kafka.connect.file;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
@@ -31,7 +34,10 @@ import java.util.Map;
* sink modes via its 'mode' setting.
*/
public class FileStreamSinkConnector extends SinkConnector {
+
public static final String FILE_CONFIG = "file";
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Destination filename.");
private String filename;
@@ -66,4 +72,9 @@ public class FileStreamSinkConnector extends SinkConnector {
public void stop() {
// Nothing to do since FileStreamSinkConnector has no background monitoring.
}
+
+ @Override
+ public ConfigDef config() {
+ return CONFIG_DEF;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
index 843e999..4fb33b7 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -17,6 +17,9 @@
package org.apache.kafka.connect.file;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
@@ -35,6 +38,10 @@ public class FileStreamSourceConnector extends SourceConnector {
public static final String TOPIC_CONFIG = "topic";
public static final String FILE_CONFIG = "file";
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
+ .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
+
private String filename;
private String topic;
@@ -74,4 +81,9 @@ public class FileStreamSourceConnector extends SourceConnector {
public void stop() {
// Nothing to do since FileStreamSourceConnector has no background monitoring.
}
+
+ @Override
+ public ConfigDef config() {
+ return CONFIG_DEF;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index ca85d87..8d83644 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -16,7 +16,16 @@
**/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.ConfigKey;
+import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -27,7 +36,11 @@ import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions
@@ -52,10 +65,14 @@ import java.util.List;
*/
public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
+ protected final Worker worker;
protected final StatusBackingStore statusBackingStore;
private final String workerId;
- public AbstractHerder(StatusBackingStore statusBackingStore, String workerId) {
+ protected Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
+
+ public AbstractHerder(Worker worker, StatusBackingStore statusBackingStore, String workerId) {
+ this.worker = worker;
this.statusBackingStore = statusBackingStore;
this.workerId = workerId;
}
@@ -143,6 +160,95 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
status.workerId(), status.trace());
}
+
+ @Override
+ public ConfigInfos validateConfigs(String connType, Map<String, String> connectorConfig) {
+ ConfigDef connectorConfigDef = ConnectorConfig.configDef();
+ List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig);
+ ConfigInfos result = generateResult(connType, connectorConfigDef.configKeys(), connectorConfigValues, Collections.<String>emptyList());
+
+ if (result.errorCount() != 0) {
+ return result;
+ }
+
+ Connector connector = getConnector(connType);
+
+ Config config = connector.validate(connectorConfig);
+ ConfigDef configDef = connector.config();
+ Map<String, ConfigKey> configKeys = configDef.configKeys();
+ List<ConfigValue> configValues = config.configValues();
+
+ Map<String, ConfigKey> resultConfigKeys = new HashMap<>(configKeys);
+ resultConfigKeys.putAll(connectorConfigDef.configKeys());
+ configValues.addAll(connectorConfigValues);
+
+ List<String> allGroups = new LinkedList<>(connectorConfigDef.groups());
+ List<String> groups = configDef.groups();
+ allGroups.addAll(groups);
+
+ return generateResult(connType, resultConfigKeys, configValues, allGroups);
+ }
+
+ // public for testing
+ public static ConfigInfos generateResult(String connType, Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, List<String> groups) {
+ int errorCount = 0;
+ List<ConfigInfo> configInfoList = new LinkedList<>();
+
+ Map<String, ConfigValue> configValueMap = new HashMap<>();
+ for (ConfigValue configValue: configValues) {
+ String configName = configValue.name();
+ configValueMap.put(configName, configValue);
+ if (!configKeys.containsKey(configName)) {
+ configValue.addErrorMessage("Configuration is not defined: " + configName);
+ configInfoList.add(new ConfigInfo(null, convertConfigValue(configValue)));
+ }
+ }
+
+ for (String configName: configKeys.keySet()) {
+ ConfigKeyInfo configKeyInfo = convertConfigKey(configKeys.get(configName));
+ ConfigValueInfo configValueInfo = null;
+ if (configValueMap.containsKey(configName)) {
+ ConfigValue configValue = configValueMap.get(configName);
+ configValueInfo = convertConfigValue(configValue);
+ errorCount += configValue.errorMessages().size();
+ }
+ configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo));
+ }
+ return new ConfigInfos(connType, errorCount, groups, configInfoList);
+ }
+
+ private static ConfigKeyInfo convertConfigKey(ConfigKey configKey) {
+ String name = configKey.name;
+ String type = configKey.type.name();
+ Object defaultValue = configKey.defaultValue;
+ boolean required = false;
+ if (defaultValue == ConfigDef.NO_DEFAULT_VALUE) {
+ required = true;
+ }
+ String importance = configKey.importance.name();
+ String documentation = configKey.documentation;
+ String group = configKey.group;
+ int orderInGroup = configKey.orderInGroup;
+ String width = configKey.width.name();
+ String displayName = configKey.displayName;
+ List<String> dependents = configKey.dependents;
+ return new ConfigKeyInfo(name, type, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents);
+ }
+
+ private static ConfigValueInfo convertConfigValue(ConfigValue configValue) {
+ return new ConfigValueInfo(configValue.name(), configValue.value(), configValue.recommendedValues(), configValue.errorMessages(), configValue.visible());
+ }
+
+ private Connector getConnector(String connType) {
+ if (tempConnectors.containsKey(connType)) {
+ return tempConnectors.get(connType);
+ } else {
+ Connector connector = worker.getConnector(connType);
+ tempConnectors.put(connType, connector);
+ return connector;
+ }
+ }
+
private String trace(Throwable t) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
t.printStackTrace(new PrintStream(output));
@@ -152,5 +258,4 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
return null;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 4824acd..e21faf6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Width;
import java.util.HashMap;
import java.util.Map;
@@ -37,32 +38,41 @@ import java.util.Map;
* </p>
*/
public class ConnectorConfig extends AbstractConfig {
+ private static final String COMMON_GROUP = "Common";
public static final String NAME_CONFIG = "name";
private static final String NAME_DOC = "Globally unique name to use for this connector.";
+ private static final String NAME_DISPLAY = "Connector name";
public static final String CONNECTOR_CLASS_CONFIG = "connector.class";
private static final String CONNECTOR_CLASS_DOC =
"Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. " +
"If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name, " +
" or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter";
+ private static final String CONNECTOR_CLASS_DISPLAY = "Connector class";
public static final String TASKS_MAX_CONFIG = "tasks.max";
private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
public static final int TASKS_MAX_DEFAULT = 1;
+ private static final String TASK_MAX_DISPLAY = "Tasks max";
public static final String TOPICS_CONFIG = "topics";
private static final String TOPICS_DOC = "";
public static final String TOPICS_DEFAULT = "";
+ private static final String TOPICS_DISPLAY = "Topics";
private static ConfigDef config;
static {
config = new ConfigDef()
- .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC)
- .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC)
- .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC)
- .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
+ .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
+ .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
+ .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
+ .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, Width.LONG, TOPICS_DISPLAY);
+ }
+
+ public static ConfigDef configDef() {
+ return config;
}
public ConnectorConfig() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 95c7700..3ea4a81 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@@ -127,6 +128,12 @@ public interface Herder {
*/
ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id);
+ /**
+ * Validate the provided connector config values against the configuration definition.
+ * @param connType the connector class
+ * @param connectorConfig the provided connector config values
+ */
+ ConfigInfos validateConfigs(String connType, Map<String, String> connectorConfig);
class Created<T> {
private final boolean created;
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index aa57493..1a9ff11 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -189,6 +189,11 @@ public class Worker {
return SinkConnector.class.isAssignableFrom(workerConnector.delegate.getClass());
}
+ public Connector getConnector(String connType) {
+ Class<? extends Connector> connectorClass = getConnectorClass(connType);
+ return instantiateConnector(connectorClass);
+ }
+
@SuppressWarnings("unchecked")
private Class<? extends Connector> getConnectorClass(String connectorAlias) {
// Avoid the classpath scan if the full class name was provided
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 16b950b..2fc8297 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -84,7 +84,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
- private final Worker worker;
private final KafkaConfigStorage configStorage;
private ClusterConfigState configState;
private final Time time;
@@ -130,9 +129,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
WorkerGroupMember member,
String restUrl,
Time time) {
- super(statusBackingStore, workerId);
+ super(worker, statusBackingStore, workerId);
- this.worker = worker;
if (configStorage != null) {
// For testing. Assume configuration has already been performed
this.configStorage = configStorage;
@@ -551,6 +549,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
return generation;
}
+
// Should only be called from work thread, so synchronization should not be needed
private boolean isLeader() {
return assignment != null && member.memberId().equals(assignment.leader());
@@ -701,7 +700,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName);
worker.startConnector(connConfig, ctx, this);
-
// Immediately request configuration since this could be a brand new connector. However, also only update those
// task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
// just restoring an existing connector.
http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index dbac58f..7e4279a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -20,12 +20,14 @@ package org.apache.kafka.connect.runtime.rest;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.runtime.rest.resources.RootResource;
import org.eclipse.jetty.server.Connector;
@@ -44,8 +46,6 @@ import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -55,6 +55,9 @@ import java.net.URL;
import java.util.List;
import java.util.Map;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+
/**
* Embedded server for the REST API that provides the control plane for Kafka Connect workers.
*/
@@ -95,6 +98,7 @@ public class RestServer {
resourceConfig.register(RootResource.class);
resourceConfig.register(new ConnectorsResource(herder));
+ resourceConfig.register(new ConnectorPluginsResource(herder));
resourceConfig.register(ConnectExceptionMapper.class);