You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/03/17 21:26:17 UTC

[1/2] kafka git commit: KAFKA-3315: Add REST and Connector API to expose connector configuration

Repository: kafka
Updated Branches:
  refs/heads/trunk a1eb12d7c -> c07d01722


http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
new file mode 100644
index 0000000..6040563
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfo.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class ConfigInfo {
+    // REST entity pairing a config key's definition ("definition") with its value ("value").
+    private final ConfigKeyInfo configKey;
+    private final ConfigValueInfo configValue;
+
+    @JsonCreator
+    public ConfigInfo(
+        @JsonProperty("definition") ConfigKeyInfo configKey,
+        @JsonProperty("value") ConfigValueInfo configValue) {
+        this.configKey = configKey;
+        this.configValue = configValue;
+    }
+
+    @JsonProperty("definition")
+    public ConfigKeyInfo configKey() {
+        return configKey;
+    }
+
+    @JsonProperty("value")
+    public ConfigValueInfo configValue() {
+        return configValue;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConfigInfo that = (ConfigInfo) o;
+        return Objects.equals(configKey, that.configKey) &&
+               Objects.equals(configValue, that.configValue);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(configKey, configValue);
+    }
+
+    @Override
+    public String toString() {
+        return "[" + configKey + "," + configValue + "]"; // null-safe: the constructor does not reject null fields
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java
new file mode 100644
index 0000000..3e73983
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigInfos.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+public class ConfigInfos {
+
+    @JsonProperty("name")
+    private final String name;
+
+    @JsonProperty("error_count")
+    private final int errorCount;
+
+    @JsonProperty("groups")
+    private final List<String> groups;
+
+    @JsonProperty("configs")
+    private final List<ConfigInfo> configs;
+
+    @JsonCreator
+    public ConfigInfos(@JsonProperty("name") String name,
+                       @JsonProperty("error_count") int errorCount,
+                       @JsonProperty("groups") List<String> groups,
+                       @JsonProperty("configs") List<ConfigInfo> configs) {
+        this.name = name;
+        this.groups = groups;
+        this.errorCount = errorCount;
+        this.configs = configs;
+    }
+
+    @JsonProperty
+    public String name() {
+        return name;
+    }
+
+    @JsonProperty
+    public List<String> groups() {
+        return groups;
+    }
+
+    @JsonProperty("error_count")
+    public int errorCount() {
+        return errorCount;
+    }
+
+    @JsonProperty("configs")
+    public List<ConfigInfo> values() {
+        return configs;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConfigInfos that = (ConfigInfos) o;
+        return Objects.equals(name, that.name) &&
+               Objects.equals(errorCount, that.errorCount) &&
+               Objects.equals(groups, that.groups) &&
+               Objects.equals(configs, that.configs);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, errorCount, groups, configs);
+    }
+
+    @Override
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("[")
+            .append(name)
+            .append(",")
+            .append(errorCount)
+            .append(",")
+            .append(groups)
+            .append(",")
+            .append(configs)
+            .append("]");
+        return sb.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
new file mode 100644
index 0000000..f813709
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigKeyInfo.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+public class ConfigKeyInfo {
+    // REST entity describing one config key: its name, type, default, importance, docs, grouping and display hints.
+    private final String name;
+    private final String type;
+    private final boolean required;
+    private final Object defaultValue;
+    private final String importance;
+    private final String documentation;
+    private final String group;
+    private final int orderInGroup;
+    private final String width;
+    private final String displayName;
+    private final List<String> dependents;
+
+    @JsonCreator
+    public ConfigKeyInfo(@JsonProperty("name") String name,
+                         @JsonProperty("type") String type,
+                         @JsonProperty("required") boolean required,
+                         @JsonProperty("default_value") Object defaultValue,
+                         @JsonProperty("importance") String importance,
+                         @JsonProperty("documentation") String documentation,
+                         @JsonProperty("group") String group,
+                         @JsonProperty("order_in_group") int orderInGroup,
+                         @JsonProperty("width") String width,
+                         @JsonProperty("display_name") String displayName,
+                         @JsonProperty("dependents") List<String> dependents) {
+        this.name = name;
+        this.type = type;
+        this.required = required;
+        this.defaultValue = defaultValue;
+        this.importance = importance;
+        this.documentation = documentation;
+        this.group = group;
+        this.orderInGroup = orderInGroup;
+        this.width = width;
+        this.displayName = displayName;
+        this.dependents = dependents;
+    }
+
+    @JsonProperty
+    public String name() {
+        return name;
+    }
+
+    @JsonProperty
+    public String type() {
+        return type;
+    }
+
+    @JsonProperty
+    public boolean required() {
+        return required;
+    }
+
+    @JsonProperty("default_value")
+    public Object defaultValue() {
+        return defaultValue;
+    }
+
+    @JsonProperty
+    public String documentation() {
+        return documentation;
+    }
+
+    @JsonProperty
+    public String group() {
+        return group;
+    }
+
+    @JsonProperty("order") // NOTE(review): the creator binds "order_in_group"; confirm the differing name is intended
+    public int orderInGroup() {
+        return orderInGroup;
+    }
+
+    @JsonProperty
+    public String width() {
+        return width;
+    }
+
+    @JsonProperty
+    public String importance() {
+        return importance;
+    }
+
+    @JsonProperty("display_name")
+    public String displayName() {
+        return displayName;
+    }
+
+    @JsonProperty
+    public List<String> dependents() {
+        return dependents;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConfigKeyInfo that = (ConfigKeyInfo) o;
+        return Objects.equals(name, that.name) &&
+               Objects.equals(type, that.type) &&
+               required == that.required && // primitive boolean: compare directly instead of boxing
+               Objects.equals(defaultValue, that.defaultValue) &&
+               Objects.equals(importance, that.importance) &&
+               Objects.equals(documentation, that.documentation) &&
+               Objects.equals(group, that.group) &&
+               orderInGroup == that.orderInGroup && // primitive int: compare directly instead of boxing
+               Objects.equals(width, that.width) &&
+               Objects.equals(displayName, that.displayName) &&
+               Objects.equals(dependents, that.dependents);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, type, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder(); // method-local, so a synchronized StringBuffer is unnecessary
+        sb.append("[")
+            .append(name)
+            .append(",")
+            .append(type)
+            .append(",")
+            .append(required)
+            .append(",")
+            .append(defaultValue)
+            .append(",")
+            .append(importance)
+            .append(",")
+            .append(documentation)
+            .append(",")
+            .append(group)
+            .append(",")
+            .append(orderInGroup)
+            .append(",")
+            .append(width)
+            .append(",")
+            .append(displayName)
+            .append(",")
+            .append(dependents)
+            .append("]");
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
new file mode 100644
index 0000000..51e7ee5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConfigValueInfo.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+public class ConfigValueInfo { // REST entity: a config value plus its recommended values, errors and visibility flag
+    private final String name;
+    private final Object value;
+    private final List<Object> recommendedValues;
+    private final List<String> errors;
+    private final boolean visible;
+
+    @JsonCreator
+    public ConfigValueInfo(
+        @JsonProperty("name") String name,
+        @JsonProperty("value") Object value,
+        @JsonProperty("recommended_values") List<Object> recommendedValues,
+        @JsonProperty("errors") List<String> errors,
+        @JsonProperty("visible") boolean visible) {
+        this.name = name;
+        this.value = value;
+        this.recommendedValues = recommendedValues;
+        this.errors = errors;
+        this.visible = visible;
+    }
+
+    @JsonProperty
+    public String name() {
+        return name;
+    }
+
+    @JsonProperty
+    public Object value() {
+        return value;
+    }
+
+    @JsonProperty("recommended_values")
+    public List<Object> recommendedValues() {
+        return recommendedValues;
+    }
+
+    @JsonProperty
+    public List<String> errors() {
+        return errors;
+    }
+
+    @JsonProperty
+    public boolean visible() {
+        return visible;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConfigValueInfo that = (ConfigValueInfo) o;
+        return Objects.equals(name, that.name) &&
+               Objects.equals(value, that.value) &&
+               Objects.equals(recommendedValues, that.recommendedValues) &&
+               Objects.equals(errors, that.errors) &&
+               visible == that.visible; // primitive boolean: compare directly instead of boxing
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, value, recommendedValues, errors, visible);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder(); // method-local, so a synchronized StringBuffer is unnecessary
+        sb.append("[")
+            .append(name)
+            .append(",")
+            .append(value)
+            .append(",")
+            .append(recommendedValues)
+            .append(",")
+            .append(errors)
+            .append(",")
+            .append(visible)
+            .append("]");
+        return sb.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
index 8daae05..9567ef9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.rest.entities;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.util.ArrayList;
@@ -34,13 +35,15 @@ public class ConnectorInfo {
     private final List<ConnectorTaskId> tasks;
 
     @JsonCreator
-    public ConnectorInfo(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config,
+    public ConnectorInfo(@JsonProperty("name") String name,
+                         @JsonProperty("config") Map<String, String> config,
                          @JsonProperty("tasks") List<ConnectorTaskId> tasks) {
         this.name = name;
         this.config = config;
         this.tasks = tasks;
     }
 
+
     @JsonProperty
     public String name() {
         return name;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
new file mode 100644
index 0000000..8439707
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+
+import java.util.Map;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/connector-plugins")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class ConnectorPluginsResource {
+    // JAX-RS resource rooted at /connector-plugins; declared to consume and produce JSON.
+    private final Herder herder;
+
+    public ConnectorPluginsResource(Herder herder) {
+        this.herder = herder;
+    }
+
+    @PUT
+    @Path("/{connectorType}/config/validate")
+    public ConfigInfos validateConfigs(final @PathParam("connectorType") String connType,
+                                       final Map<String, String> connectorConfig) throws Throwable {
+        return herder.validateConfigs(connType, connectorConfig); // pure delegation; the herder's result is returned unchanged
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index d0d940b..b6e9f61 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
@@ -32,6 +33,14 @@ import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
@@ -43,13 +52,6 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import java.net.URI;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 @Path("/connectors")
 @Produces(MediaType.APPLICATION_JSON)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 707470f..9c48ed7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -49,7 +49,6 @@ import java.util.Set;
 public class StandaloneHerder extends AbstractHerder {
     private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
 
-    private final Worker worker;
     private HashMap<String, ConnectorState> connectors = new HashMap<>();
 
     public StandaloneHerder(Worker worker) {
@@ -60,8 +59,7 @@ public class StandaloneHerder extends AbstractHerder {
     StandaloneHerder(String workerId,
                      Worker worker,
                      StatusBackingStore statusBackingStore) {
-        super(statusBackingStore, workerId);
-        this.worker = worker;
+        super(worker, statusBackingStore, workerId);
     }
 
     public synchronized void start() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
index 0ab64fd..c2515a0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.connect.tools;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
@@ -61,4 +62,9 @@ public class VerifiableSinkConnector extends SourceConnector {
     @Override
     public void stop() {
     }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef(); // fresh ConfigDef on every call, with no define(...) entries added here
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
index 5f9afd5..b18db6e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSourceConnector.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.connect.tools;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
@@ -61,4 +62,9 @@ public class VerifiableSourceConnector extends SourceConnector {
     @Override
     public void stop() {
     }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef(); // fresh ConfigDef on every call, with no define(...) entries added here
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index f17023c..1dc5784 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -34,6 +34,7 @@ public class AbstractHerderTest extends EasyMockSupport {
 
     @Test
     public void connectorStatus() {
+        Worker worker = null;
         String workerId = "workerId";
         String connector = "connector";
         int generation = 5;
@@ -42,8 +43,8 @@ public class AbstractHerderTest extends EasyMockSupport {
         StatusBackingStore store = strictMock(StatusBackingStore.class);
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(StatusBackingStore.class, String.class)
-                .withArgs(store, workerId)
+                .withConstructor(Worker.class, StatusBackingStore.class, String.class)
+                .withArgs(worker, store, workerId)
                 .addMockedMethod("generation")
                 .createMock();
 
@@ -76,14 +77,15 @@ public class AbstractHerderTest extends EasyMockSupport {
 
     @Test
     public void taskStatus() {
+        Worker worker = null;
         ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
         String workerId = "workerId";
 
         StatusBackingStore store = strictMock(StatusBackingStore.class);
 
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(StatusBackingStore.class, String.class)
-                .withArgs(store, workerId)
+                .withConstructor(Worker.class, StatusBackingStore.class, String.class)
+                .withArgs(worker, store, workerId)
                 .addMockedMethod("generation")
                 .createMock();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 67d3fdc..557d789 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
@@ -466,7 +467,11 @@ public class WorkerTest extends ThreadedTest {
 
 
     /* Name here needs to be unique as we are testing the aliasing mechanism */
-    private static class WorkerTestConnector extends Connector {
+    public static class WorkerTestConnector extends Connector {
+
+        private static final ConfigDef CONFIG_DEF  = new ConfigDef()
+            .define("configName", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Test configName.");
+
         @Override
         public String version() {
             return "1.0";
@@ -491,6 +496,11 @@ public class WorkerTest extends ThreadedTest {
         public void stop() {
 
         }
+
+        @Override
+        public ConfigDef config() {
+            return CONFIG_DEF; // the shared static definition that declares the single "configName" key
+        }
     }
 
     private static class TestSourceTask extends SourceTask {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
new file mode 100644
index 0000000..625c91f
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.AbstractHerder;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RestServer.class)
+@PowerMockIgnore("javax.management.*")
+public class ConnectorPluginsResourceTest {
+
+    private static Map<String, String> props = new HashMap<>();
+    static {
+        props.put("test.string.config", "testString");
+        props.put("test.int.config", "10");
+    }
+
+    private static final ConfigInfos CONFIG_INFOS;
+    static {
+        List<ConfigInfo> configs = new LinkedList<>();
+
+        ConfigKeyInfo configKeyInfo = new ConfigKeyInfo("test.string.config", "STRING", true, "", "HIGH", "Test configuration for string type.", null, -1, "NONE", "test.string.config", new LinkedList<String>());
+        ConfigValueInfo configValueInfo = new ConfigValueInfo("test.string.config", "testString", Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+        ConfigInfo configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+        configs.add(configInfo);
+
+        configKeyInfo = new ConfigKeyInfo("test.int.config", "INT", true, "", "MEDIUM", "Test configuration for integer type.", null, -1, "NONE", "test.int.config", new LinkedList<String>());
+        configValueInfo = new ConfigValueInfo("test.int.config", 10, Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+        configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+        configs.add(configInfo);
+
+        configKeyInfo = new ConfigKeyInfo("test.string.config.default", "STRING", false, "", "LOW", "Test configuration with default value.", null, -1, "NONE", "test.string.config.default", new LinkedList<String>());
+        configValueInfo = new ConfigValueInfo("test.string.config.default", "", Collections.<Object>emptyList(), Collections.<String>emptyList(), true);
+        configInfo = new ConfigInfo(configKeyInfo, configValueInfo);
+        configs.add(configInfo);
+
+        CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), 0, Collections.<String>emptyList(), configs);
+    }
+
+    @Mock
+    private Herder herder;
+    private ConnectorPluginsResource connectorPluginsResource;
+
+    @Before
+    public void setUp() throws NoSuchMethodException {
+        PowerMock.mockStatic(RestServer.class,
+                             RestServer.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class));
+        connectorPluginsResource = new ConnectorPluginsResource(herder);
+    }
+
+    @Test
+    public void testValidateConfig() throws Throwable {
+        herder.validateConfigs(EasyMock.eq(ConnectorPluginsResourceTestConnector.class.getName()), EasyMock.eq(props));
+
+        PowerMock.expectLastCall().andAnswer(new IAnswer<ConfigInfos>() {
+            @Override
+            public ConfigInfos answer() {
+                Config config = new ConnectorPluginsResourceTestConnector().validate(props);
+                Connector connector = new ConnectorPluginsResourceTestConnector();
+                ConfigDef configDef = connector.config();
+                return AbstractHerder.generateResult(ConnectorPluginsResourceTestConnector.class.getName(), configDef.configKeys(), config.configValues(), configDef.groups());
+            }
+        });
+        PowerMock.replayAll();
+
+        ConfigInfos configInfos = connectorPluginsResource.validateConfigs(ConnectorPluginsResourceTestConnector.class.getName(), props);
+        assertEquals(CONFIG_INFOS.name(), configInfos.name());
+        assertEquals(CONFIG_INFOS.errorCount(), configInfos.errorCount());
+        assertEquals(CONFIG_INFOS.groups(), configInfos.groups());
+        assertEquals(new HashSet<>(CONFIG_INFOS.values()), new HashSet<>(configInfos.values()));
+
+        PowerMock.verifyAll();
+    }
+
+    /* Name here needs to be unique as we are testing the aliasing mechanism */
+    public static class ConnectorPluginsResourceTestConnector extends Connector {
+
+        public static final String TEST_STRING_CONFIG = "test.string.config";
+        public static final String TEST_INT_CONFIG = "test.int.config";
+        public static final String TEST_STRING_CONFIG_DEFAULT = "test.string.config.default";
+
+        private static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(TEST_STRING_CONFIG, Type.STRING, Importance.HIGH, "Test configuration for string type.")
+            .define(TEST_INT_CONFIG, Type.INT, Importance.MEDIUM, "Test configuration for integer type.")
+            .define(TEST_STRING_CONFIG_DEFAULT, Type.STRING, "", Importance.LOW, "Test configuration with default value.");
+
+        @Override
+        public String version() {
+            return "1.0";
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+
+        }
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return null;
+        }
+
+        @Override
+        public List<Map<String, String>> taskConfigs(int maxTasks) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+
+        }
+
+        @Override
+        public ConfigDef config() {
+            return CONFIG_DEF;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index 4659ae8..970f56c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
@@ -83,7 +84,6 @@ public class ConnectorsResourceTest {
         TASK_INFOS.add(new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 1), TASK_CONFIGS.get(1)));
     }
 
-
     @Mock
     private Herder herder;
     private ConnectorsResource connectorsResource;
@@ -172,6 +172,8 @@ public class ConnectorsResourceTest {
         connectorsResource.createConnector(body);
 
         PowerMock.verifyAll();
+
+
     }
 
     @Test(expected = AlreadyExistsException.class)


[2/2] kafka git commit: KAFKA-3315: Add REST and Connector API to expose connector configuration

Posted by ew...@apache.org.
KAFKA-3315: Add REST and Connector API to expose connector configuration

Author: Liquan Pei <li...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #964 from Ishiihara/expose-connector-config


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c07d0172
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c07d0172
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c07d0172

Branch: refs/heads/trunk
Commit: c07d017227f319250e5c373f8a6f504874ecfbf2
Parents: a1eb12d
Author: Liquan Pei <li...@gmail.com>
Authored: Thu Mar 17 13:26:02 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Mar 17 13:26:02 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../kafka/common/config/AbstractConfig.java     |  26 +-
 .../org/apache/kafka/common/config/Config.java  |  28 +
 .../apache/kafka/common/config/ConfigDef.java   | 652 +++++++++++++++++--
 .../apache/kafka/common/config/ConfigValue.java | 113 ++++
 .../kafka/common/config/ConfigDefTest.java      | 202 +++++-
 .../kafka/connect/connector/Connector.java      |  21 +
 .../connector/ConnectorReconfigurationTest.java |   9 +-
 .../connect/file/FileStreamSinkConnector.java   |  11 +
 .../connect/file/FileStreamSourceConnector.java |  12 +
 .../kafka/connect/runtime/AbstractHerder.java   | 109 +++-
 .../kafka/connect/runtime/ConnectorConfig.java  |  18 +-
 .../apache/kafka/connect/runtime/Herder.java    |   7 +
 .../apache/kafka/connect/runtime/Worker.java    |   5 +
 .../runtime/distributed/DistributedHerder.java  |   6 +-
 .../kafka/connect/runtime/rest/RestServer.java  |   8 +-
 .../runtime/rest/entities/ConfigInfo.java       |  66 ++
 .../runtime/rest/entities/ConfigInfos.java      | 102 +++
 .../runtime/rest/entities/ConfigKeyInfo.java    | 171 +++++
 .../runtime/rest/entities/ConfigValueInfo.java  | 106 +++
 .../runtime/rest/entities/ConnectorInfo.java    |   5 +-
 .../resources/ConnectorPluginsResource.java     |  49 ++
 .../rest/resources/ConnectorsResource.java      |  16 +-
 .../runtime/standalone/StandaloneHerder.java    |   4 +-
 .../connect/tools/VerifiableSinkConnector.java  |   6 +
 .../tools/VerifiableSourceConnector.java        |   6 +
 .../connect/runtime/AbstractHerderTest.java     |  10 +-
 .../kafka/connect/runtime/WorkerTest.java       |  12 +-
 .../resources/ConnectorPluginsResourceTest.java | 165 +++++
 .../rest/resources/ConnectorsResourceTest.java  |   4 +-
 30 files changed, 1836 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 321fc3f..4b84ba5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -543,6 +543,7 @@ project(':clients') {
     include "**/org/apache/kafka/common/*"
     include "**/org/apache/kafka/common/errors/*"
     include "**/org/apache/kafka/common/serialization/*"
+    include "**/org/apache/kafka/common/config/*"
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index b44f72c..f833d7e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -12,6 +12,13 @@
  */
 package org.apache.kafka.common.config;
 
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -20,13 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kafka.common.Configurable;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * A convenient base class for configurations to extend.
  * <p>
@@ -46,7 +46,7 @@ public class AbstractConfig {
     private final Map<String, Object> values;
 
     @SuppressWarnings("unchecked")
-    public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Boolean doLog) {
+    public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
         /* check that all the keys are really strings */
         for (Object key : originals.keySet())
             if (!(key instanceof String))
@@ -62,6 +62,12 @@ public class AbstractConfig {
         this(definition, originals, true);
     }
 
+    public AbstractConfig(Map<String, Object> parsedConfig) {
+        this.values = parsedConfig;
+        this.originals = new HashMap<>();
+        this.used = Collections.synchronizedSet(new HashSet<String>());
+    }
+
     protected Object get(String key) {
         if (!values.containsKey(key))
             throw new ConfigException(String.format("Unknown configuration '%s'", key));
@@ -94,7 +100,7 @@ public class AbstractConfig {
         return (List<String>) get(key);
     }
 
-    public boolean getBoolean(String key) {
+    public Boolean getBoolean(String key) {
         return (Boolean) get(key);
     }
 
@@ -125,7 +131,7 @@ public class AbstractConfig {
     /**
      * Get all the original settings, ensuring that all values are of type String.
      * @return the original settings
-     * @throw ClassCastException if any of the values are not strings
+     * @throws ClassCastException if any of the values are not strings
      */
     public Map<String, String> originalsStrings() {
         Map<String, String> copy = new RecordingMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/main/java/org/apache/kafka/common/config/Config.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/Config.java b/clients/src/main/java/org/apache/kafka/common/config/Config.java
new file mode 100644
index 0000000..ce5ee17
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/Config.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.config;
+
+import java.util.List;
+
+public class Config {
+    private List<ConfigValue> configValues;
+
+    public Config(List<ConfigValue> configValues) {
+        this.configValues = configValues;
+    }
+
+    public List<ConfigValue> configValues() {
+        return configValues;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 703eb7c..881cb0b 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -12,45 +12,66 @@
  */
 package org.apache.kafka.common.config;
 
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.Utils;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.common.utils.Utils;
-
 /**
- * This class is used for specifying the set of expected configurations, their type, their defaults, their
- * documentation, and any special validation logic used for checking the correctness of the values the user provides.
+ * This class is used for specifying the set of expected configurations. For each configuration, you can specify
+ * the name, the type, the default value, the documentation, the group information, the order in the group,
+ * the width of the configuration value and the name suitable for display in the UI.
+ *
+ * You can provide special validation logic used for single configuration validation by overriding {@link Validator}.
+ *
+ * Moreover, you can specify the dependents of a configuration. The valid values and visibility of a configuration
+ * may change according to the values of other configurations. You can override {@link Recommender} to get valid
+ * values and set visibility of a configuration given the current configuration values.
+ *
  * <p/>
- * Usage of this class looks something like this:
+ * To use the class:
  * <p/>
  * <pre>
  * ConfigDef defs = new ConfigDef();
- * defs.define(&quot;config_name&quot;, Type.STRING, &quot;default string value&quot;, &quot;This configuration is used for blah blah blah.&quot;);
- * defs.define(&quot;another_config_name&quot;, Type.INT, 42, Range.atLeast(0), &quot;More documentation on this config&quot;);
  *
- * Properties props = new Properties();
- * props.setProperty(&quot;config_name&quot;, &quot;some value&quot;);
+ * defs.define(&quot;config_with_default&quot;, Type.STRING, &quot;default string value&quot;, &quot;Configuration with default value.&quot;);
+ * defs.define(&quot;config_with_validator&quot;, Type.INT, 42, Range.atLeast(0), &quot;Configuration with user provided validator.&quot;);
+ * defs.define(&quot;config_with_dependents&quot;, Type.INT, &quot;Configuration with dependents.&quot;, &quot;group&quot;, 1, &quot;Config With Dependents&quot;, Arrays.asList(&quot;config_with_default&quot;, &quot;config_with_validator&quot;));
+ *
+ * Map&lt;String, String&gt; props = new HashMap&lt;&gt;();
+ * props.put(&quot;config_with_default&quot;, &quot;some value&quot;);
+ * props.put(&quot;config_with_dependents&quot;, &quot;some other value&quot;);
+ * // will return &quot;some value&quot;
  * Map&lt;String, Object&gt; configs = defs.parse(props);
+ * String someConfig = (String) configs.get(&quot;config_with_default&quot;);
+ * // will return default value of 42
+ * int anotherConfig = (Integer) configs.get(&quot;config_with_validator&quot;);
  *
- * String someConfig = (String) configs.get(&quot;config_name&quot;); // will return &quot;some value&quot;
- * int anotherConfig = (Integer) configs.get(&quot;another_config_name&quot;); // will return default value of 42
+ * To validate the full configuration, use:
+ * List&lt;Config&gt; configs = defs.validate(props);
+ * The {@link Config} contains updated configuration information given the current configuration values.
  * </pre>
  * <p/>
- * This class can be used stand-alone or in combination with {@link AbstractConfig} which provides some additional
+ * This class can be used standalone or in combination with {@link AbstractConfig} which provides some additional
  * functionality for accessing configs.
  */
 public class ConfigDef {
 
     public static final Object NO_DEFAULT_VALUE = new String("");
 
-    private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>();
+    private final Map<String, ConfigKey> configKeys = new HashMap<>();
+    private final List<String> groups = new LinkedList<>();
+    private Set<String> configsWithNoParent;
 
     /**
      * Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef}
@@ -63,26 +84,256 @@ public class ConfigDef {
 
     /**
      * Define a new configuration
-     *
-     * @param name          The name of the config parameter
-     * @param type          The type of the config
-     * @param defaultValue  The default value to use if this config isn't present
-     * @param validator     A validator to use in checking the correctness of the config
-     * @param importance    The importance of this config: is this something you will likely need to change.
-     * @param documentation The documentation string for the config
+     * @param name          the name of the config parameter
+     * @param type          the type of the config
+     * @param defaultValue  the default value to use if this config isn't present
+     * @param validator     the validator to use in checking the correctness of the config
+     * @param importance    the importance of this config
+     * @param documentation the documentation string for the config
+     * @param group         the group this config belongs to
+     * @param orderInGroup  the order of this config in the group
+     * @param width         the width of the config
+     * @param displayName   the name suitable for display
+     * @param dependents    the configurations that are dependents of this configuration
+     * @param recommender   the recommender provides valid values given the parent configuration values
      * @return This ConfigDef so you can chain calls
      */
-    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
-        if (configKeys.containsKey(name))
+    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
+                            String group, int orderInGroup, Width width, String displayName, List<String> dependents, Recommender recommender) {
+        if (configKeys.containsKey(name)) {
             throw new ConfigException("Configuration " + name + " is defined twice.");
+        }
+        if (group != null && !groups.contains(group)) {
+            groups.add(group);
+        }
         Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
-        configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation));
+        configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender));
         return this;
     }
 
     /**
+     * Define a new configuration with no custom recommender
+     * @param name          the name of the config parameter
+     * @param type          the type of the config
+     * @param defaultValue  the default value to use if this config isn't present
+     * @param validator     the validator to use in checking the correctness of the config
+     * @param importance    the importance of this config
+     * @param documentation the documentation string for the config
+     * @param group         the group this config belongs to
+     * @param orderInGroup  the order of this config in the group
+     * @param width         the width of the config
+     * @param displayName   the name suitable for display
+     * @param dependents    the configurations that are dependents of this configuration
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
+                            String group, int orderInGroup, Width width, String displayName, List<String> dependents) {
+        return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, null);
+    }
+
+    /**
+     * Define a new configuration with no dependents
+     * @param name          the name of the config parameter
+     * @param type          the type of the config
+     * @param defaultValue  the default value to use if this config isn't present
+     * @param validator     the validator to use in checking the correctness of the config
+     * @param importance    the importance of this config
+     * @param documentation the documentation string for the config
+     * @param group         the group this config belongs to
+     * @param orderInGroup  the order of this config in the group
+     * @param width         the width of the config
+     * @param displayName   the name suitable for display
+     * @param recommender   the recommender provides valid values given the parent configuration values
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
+                            String group, int orderInGroup, Width width, String displayName, Recommender recommender) {
+        return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender);
+    }
+
+    /**
+     * Define a new configuration with no dependents and no custom recommender
+     * @param name          the name of the config parameter
+     * @param type          the type of the config
+     * @param defaultValue  the default value to use if this config isn't present
+     * @param validator     the validator to use in checking the correctness of the config
+     * @param importance    the importance of this config
+     * @param documentation the documentation string for the config
+     * @param group         the group this config belongs to
+     * @param orderInGroup  the order of this config in the group
+     * @param width         the width of the config
+     * @param displayName   the name suitable for display
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
+                            String group, int orderInGroup, Width width, String displayName) {
+        return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList());
+    }
+
+    /**
+     * Define a new configuration with no special validation logic
+     * @param name          the name of the config parameter
+     * @param type          the type of the config
+     * @param defaultValue  the default value to use if this config isn't present
+     * @param importance    the importance of this config
+     * @param documentation the documentation string for the config
+     * @param group         the group this config belongs to
+     * @param orderInGroup  the order of this config in the group
+     * @param width         the width of the config
+     * @param displayName   the name suitable for display
+     * @param dependents    the configurations that are dependents of this configuration
+     * @param recommender   the recommender provides valid values given the parent configuration values
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation,
+                            String group, int orderInGroup, Width width, String displayName, List<String> dependents, Recommender recommender) {
+        return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender);
+    }
+
+    /**
+     * Define a new configuration with no special validation logic and no custom recommender
+     * @param name          the name of the config parameter
+     * @param type          the type of the config
+     * @param defaultValue  the default value to use if this config isn't present
+     * @param importance    the importance of this config
+     * @param documentation the documentation string for the config
+     * @param group         the group this config belongs to
+     * @param orderInGroup  the order of this config in the group
+     * @param width         the width of the config
+     * @param displayName   the name suitable for display
+     * @param dependents    the configurations that are dependents of this configuration
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation,
+                            String group, int orderInGroup, Width width, String displayName, List<String> dependents) {
+        return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, dependents, null);
+    }
+
+    /**
+     * Define a new configuration with no special validation logic and no dependents
+     * @param name          the name of the config parameter
+     * @param type          the type of the config
+     * @param defaultValue  the default value to use if this config isn't present
+     * @param importance    the importance of this config
+     * @param documentation the documentation string for the config
+     * @param group         the group this config belongs to
+     * @param orderInGroup  the order of this config in the group
+     * @param width         the width of the config
+     * @param displayName   the name suitable for display
+     * @param recommender   the recommender provides valid values given the parent configuration values
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation,
+                            String group, int orderInGroup, Width width, String displayName, Recommender recommender) {
+        return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender);
+    }
+
+    /**
+     * Define a new configuration with no special validation logic, no dependents and no custom recommender
+     * @param name          the name of the config parameter
+     * @param type          the type of the config
+     * @param defaultValue  the default value to use if this config isn't present
+     * @param importance    the importance of this config
+     * @param documentation the documentation string for the config
+     * @param group         the group this config belongs to
+     * @param orderInGroup  the order of this config in the group
+     * @param width         the width of the config
+     * @param displayName   the name suitable for display
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation,
+                            String group, int orderInGroup, Width width, String displayName) {
+        return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList());
+    }
+
+    /**
+     * Define a new configuration with no default value and no special validation logic
+     * @param name          the name of the config parameter
+     * @param type          the type of the config
+     * @param importance    the importance of this config
+     * @param documentation the documentation string for the config
+     * @param group         the group this config belongs to
+     * @param orderInGroup  the order of this config in the group
+     * @param width         the width of the config
+     * @param displayName   the name suitable for display
+     * @param dependents    the configurations that are dependents of this configuration
+     * @param recommender   the recommender provides valid values given the parent configuration value
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup,
+                            Width width, String displayName, List<String> dependents, Recommender recommender) {
+        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender);
+    }
+
+    /**
+     * Define a new configuration with no default value, no special validation logic and no custom recommender
+     * @param name          the name of the config parameter
+     * @param type          the type of the config
+     * @param importance    the importance of this config
+     * @param documentation the documentation string for the config
+     * @param group         the group this config belongs to
+     * @param orderInGroup  the order of this config in the group
+     * @param width         the width of the config
+     * @param displayName   the name suitable for display
+     * @param dependents    the configurations that are dependents of this configuration
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup,
+                            Width width, String displayName, List<String> dependents) {
+        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, dependents, null);
+    }
+
+    /**
+     * Define a new configuration with no default value, no special validation logic and no dependents
+     * @param name          the name of the config parameter
+     * @param type          the type of the config
+     * @param importance    the importance of this config
+     * @param documentation the documentation string for the config
+     * @param group         the group this config belongs to
+     * @param orderInGroup  the order of this config in the group
+     * @param width         the width of the config
+     * @param displayName   the name suitable for display
+     * @param recommender   the recommender provides valid values given the parent configuration value
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup,
+                            Width width, String displayName, Recommender recommender) {
+        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender);
+    }
+
+    /**
+     * Define a new configuration with no default value, no special validation logic, no dependents and no custom recommender
+     * @param name          the name of the config parameter
+     * @param type          the type of the config
+     * @param importance    the importance of this config
+     * @param documentation the documentation string for the config
+     * @param group         the group this config belongs to
+     * @param orderInGroup  the order of this config in the group
+     * @param width         the width of the config
+     * @param displayName   the name suitable for display
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup,
+                            Width width, String displayName) {
+        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList());
+    }
+
+    /**
+     * Define a new configuration with no group, no order in group, no width, no dependents and no custom recommender; the config name is used as the display name
+     * @param name          the name of the config parameter
+     * @param type          the type of the config
+     * @param defaultValue  the default value to use if this config isn't present
+     * @param validator     the validator to use in checking the correctness of the config
+     * @param importance    the importance of this config
+     * @param documentation the documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
+        return define(name, type, defaultValue, validator, importance, documentation, null, -1, Width.NONE, name);
+    }
+
+    /**
      * Define a new configuration with no special validation logic
-     *
      * @param name          The name of the config parameter
      * @param type          The type of the config
      * @param defaultValue  The default value to use if this config isn't present
@@ -96,7 +347,6 @@ public class ConfigDef {
 
     /**
      * Define a new configuration with no default value and no special validation logic
-     *
      * @param name          The name of the config parameter
      * @param type          The type of the config
      * @param importance    The importance of this config: is this something you will likely need to change.
@@ -108,6 +358,22 @@ public class ConfigDef {
     }
 
     /**
+     * Get the configuration keys
+     * @return a map containing all configuration keys
+     */
+    public Map<String, ConfigKey> configKeys() {
+        return configKeys;
+    }
+
+    /**
+     * Get the groups for the configuration
+     * @return a list of group names
+     */
+    public List<String> groups() {
+        return groups;
+    }
+
+    /**
      * Add standard SSL client configuration options.
      * @return this
      */
@@ -131,34 +397,188 @@ public class ConfigDef {
      * appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
      * programmatically constructed map.
      *
-     * @param props The configs to parse and validate
+     * @param props The configs to parse and validate.
      * @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
-     * the appropriate type (int, string, etc)
+     * the appropriate type (int, string, etc).
      */
     public Map<String, Object> parse(Map<?, ?> props) {
-        /* parse all known keys */
-        Map<String, Object> values = new HashMap<String, Object>();
+        // Check all configurations are defined
+        List<String> undefinedConfigKeys = undefinedDependentConfigs();
+        if (!undefinedConfigKeys.isEmpty()) {
+            String joined = Utils.join(undefinedConfigKeys, ",");
+            throw new ConfigException("Some configurations in are referred in the dependents, but not defined: " + joined);
+        }
+        // parse all known keys
+        Map<String, Object> values = new HashMap<>();
         for (ConfigKey key : configKeys.values()) {
             Object value;
             // props map contains setting - assign ConfigKey value
-            if (props.containsKey(key.name))
+            if (props.containsKey(key.name)) {
                 value = parseType(key.name, props.get(key.name), key.type);
-            // props map doesn't contain setting, the key is required because no default value specified - its an error
-            else if (key.defaultValue == NO_DEFAULT_VALUE)
+                // props map doesn't contain setting, the key is required because no default value specified - its an error
+            } else if (key.defaultValue == NO_DEFAULT_VALUE) {
                 throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
-            // otherwise assign setting its default value
-            else
+            } else {
+                // otherwise assign setting its default value
                 value = key.defaultValue;
-            if (key.validator != null)
+            }
+            if (key.validator != null) {
                 key.validator.ensureValid(key.name, value);
+            }
             values.put(key.name, value);
         }
         return values;
     }
 
     /**
+     * Validate the current configuration values with the configuration definition.
+     * @param props the current configuration values
+     * @return a list of {@link ConfigValue}, each containing the updated configuration information given
+     * the current configuration values.
+     */
+    public List<ConfigValue> validate(Map<String, String> props) {
+        Map<String, ConfigValue> configValues = new HashMap<>();
+        for (String name: configKeys.keySet()) {
+            configValues.put(name, new ConfigValue(name));
+        }
+
+        List<String> undefinedConfigKeys = undefinedDependentConfigs();
+        for (String undefinedConfigKey: undefinedConfigKeys) {
+            ConfigValue undefinedConfigValue = new ConfigValue(undefinedConfigKey);
+            undefinedConfigValue.addErrorMessage(undefinedConfigKey + " is referred in the dependents, but not defined.");
+            undefinedConfigValue.visible(false);
+            configValues.put(undefinedConfigKey, undefinedConfigValue);
+        }
+
+        Map<String, Object> parsed = parseForValidate(props, configValues);
+        return validate(parsed, configValues);
+    }
+
+    // package accessible for testing
+    Map<String, Object> parseForValidate(Map<String, String> props, Map<String, ConfigValue> configValues) {
+        Map<String, Object> parsed = new HashMap<>();
+        Set<String> configsWithNoParent = getConfigsWithNoParent();
+        for (String name: configsWithNoParent) {
+            parseForValidate(name, props, parsed, configValues);
+        }
+        return parsed;
+    }
+
+
+    private List<ConfigValue> validate(Map<String, Object> parsed, Map<String, ConfigValue> configValues) {
+        Set<String> configsWithNoParent = getConfigsWithNoParent();
+        for (String name: configsWithNoParent) {
+            validate(name, parsed, configValues);
+        }
+        return new LinkedList<>(configValues.values());
+    }
+
+    private List<String> undefinedDependentConfigs() {
+        Set<String> undefinedConfigKeys = new HashSet<>();
+        for (String configName: configKeys.keySet()) {
+            ConfigKey configKey = configKeys.get(configName);
+            List<String> dependents = configKey.dependents;
+            for (String dependent: dependents) {
+                if (!configKeys.containsKey(dependent)) {
+                    undefinedConfigKeys.add(dependent);
+                }
+            }
+        }
+        return new LinkedList<>(undefinedConfigKeys);
+    }
+
+    private Set<String> getConfigsWithNoParent() {
+        if (this.configsWithNoParent != null) {
+            return this.configsWithNoParent;
+        }
+        Set<String> configsWithParent = new HashSet<>();
+
+        for (ConfigKey configKey: configKeys.values()) {
+            List<String> dependents = configKey.dependents;
+            configsWithParent.addAll(dependents);
+        }
+
+        Set<String> configs = new HashSet<>(configKeys.keySet());
+        configs.removeAll(configsWithParent);
+        this.configsWithNoParent = configs;
+        return configs;
+    }
+
+    private void parseForValidate(String name, Map<String, String> props, Map<String, Object> parsed, Map<String, ConfigValue> configs) {
+        if (!configKeys.containsKey(name)) {
+            return;
+        }
+        ConfigKey key = configKeys.get(name);
+        ConfigValue config = configs.get(name);
+
+        Object value = null;
+        if (props.containsKey(key.name)) {
+            try {
+                value = parseType(key.name, props.get(key.name), key.type);
+            } catch (ConfigException e) {
+                config.addErrorMessage(e.getMessage());
+            }
+        } else if (key.defaultValue == NO_DEFAULT_VALUE) {
+            config.addErrorMessage("Missing required configuration \"" + key.name + "\" which has no default value.");
+        } else {
+            value = key.defaultValue;
+        }
+
+        if (key.validator != null) {
+            try {
+                key.validator.ensureValid(key.name, value);
+            } catch (ConfigException e) {
+                config.addErrorMessage(e.getMessage());
+            }
+        }
+        config.value(value);
+        parsed.put(name, value);
+        for (String dependent: key.dependents) {
+            parseForValidate(dependent, props, parsed, configs);
+        }
+    }
+
+    private void validate(String name, Map<String, Object> parsed, Map<String, ConfigValue> configs) {
+        if (!configKeys.containsKey(name)) {
+            return;
+        }
+        ConfigKey key = configKeys.get(name);
+        ConfigValue config = configs.get(name);
+        Object value = parsed.get(name);
+        List<Object> recommendedValues;
+        if (key.recommender != null) {
+            try {
+                recommendedValues = key.recommender.validValues(name, parsed);
+                List<Object> originalRecommendedValues = config.recommendedValues();
+
+                if (!originalRecommendedValues.isEmpty()) {
+                    Set<Object> originalRecommendedValueSet = new HashSet<>(originalRecommendedValues);
+                    Iterator<Object> it = recommendedValues.iterator();
+                    while (it.hasNext()) {
+                        Object o = it.next();
+                        if (!originalRecommendedValueSet.contains(o)) {
+                            it.remove();
+                        }
+                    }
+                }
+                config.recommendedValues(recommendedValues);
+                if (value != null && !recommendedValues.isEmpty() && !recommendedValues.contains(value)) {
+                    config.addErrorMessage("Invalid value for configuration " + key.name);
+                }
+                config.visible(key.recommender.visible(name, parsed));
+            } catch (ConfigException e) {
+                config.addErrorMessage(e.getMessage());
+            }
+        }
+
+        configs.put(name, config);
+        for (String dependent: key.dependents) {
+            validate(dependent, parsed, configs);
+        }
+    }
+
+    /**
      * Parse a value according to its expected type.
-     *
      * @param name  The config name
      * @param value The config value
      * @param type  The expected type
@@ -263,15 +683,56 @@ public class ConfigDef {
         BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD
     }
 
+    /**
+     * The importance level for a configuration
+     */
     public enum Importance {
         HIGH, MEDIUM, LOW
     }
 
     /**
-     * Validation logic the user may provide
+     * The width of a configuration value
+     */
+    public enum Width {
+        NONE, SHORT, MEDIUM, LONG
+    }
+
+    /**
+     * This is used by the {@link #validate(Map)} to get valid values for a configuration given the current
+     * configuration values in order to perform full configuration validation and visibility modification.
+     * In case that there are dependencies between configurations, the valid values and visibility
+     * for a configuration may change given the values of other configurations.
+     */
+    public interface Recommender {
+
+        /**
+         * The valid values for the configuration given the current configuration values.
+         * @param name The name of the configuration
+         * @param parsedConfig The parsed configuration values
+         * @return The list of valid values. To function properly, the returned objects should have the type
+         * defined for the configuration using the recommender.
+         */
+        List<Object> validValues(String name, Map<String, Object> parsedConfig);
+
+        /**
+         * Set the visibility of the configuration given the current configuration values.
+         * @param name The name of the configuration
+         * @param parsedConfig The parsed configuration values
+         * @return The visibility of the configuration
+         */
+        boolean visible(String name, Map<String, Object> parsedConfig);
+    }
+
+    /**
+     * Validation logic the user may provide to perform single configuration validation.
      */
     public interface Validator {
-        public void ensureValid(String name, Object o);
+        /**
+         * Perform single configuration validation.
+         * @param name The name of the configuration
+         * @param value The value of the configuration
+         */
+        void ensureValid(String name, Object value);
     }
 
     /**
@@ -345,16 +806,24 @@ public class ConfigDef {
         }
     }
 
-    private static class ConfigKey {
+    public static class ConfigKey {
         public final String name;
         public final Type type;
         public final String documentation;
         public final Object defaultValue;
         public final Validator validator;
         public final Importance importance;
-
-        public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
-            super();
+        public final String group;
+        public final int orderInGroup;
+        public final Width width;
+        public final String displayName;
+        public final List<String> dependents;
+        public final Recommender recommender;
+
+        public ConfigKey(String name, Type type, Object defaultValue, Validator validator,
+                         Importance importance, String documentation, String group,
+                         int orderInGroup, Width width, String displayName,
+                         List<String> dependents, Recommender recommender) {
             this.name = name;
             this.type = type;
             this.defaultValue = defaultValue;
@@ -363,34 +832,21 @@ public class ConfigDef {
             if (this.validator != null && this.hasDefault())
                 this.validator.ensureValid(name, defaultValue);
             this.documentation = documentation;
+            this.dependents = dependents;
+            this.group = group;
+            this.orderInGroup = orderInGroup;
+            this.width = width;
+            this.displayName = displayName;
+            this.recommender = recommender;
         }
 
         public boolean hasDefault() {
             return this.defaultValue != NO_DEFAULT_VALUE;
         }
-
     }
 
     public String toHtmlTable() {
-        // sort first required fields, then by importance, then name
-        List<ConfigDef.ConfigKey> configs = new ArrayList<ConfigDef.ConfigKey>(this.configKeys.values());
-        Collections.sort(configs, new Comparator<ConfigDef.ConfigKey>() {
-            public int compare(ConfigDef.ConfigKey k1, ConfigDef.ConfigKey k2) {
-                // first take anything with no default value (therefore required)
-                if (!k1.hasDefault() && k2.hasDefault())
-                    return -1;
-                else if (!k2.hasDefault() && k1.hasDefault())
-                    return 1;
-
-                // then sort by importance
-                int cmp = k1.importance.compareTo(k2.importance);
-                if (cmp == 0)
-                    // then sort in alphabetical order
-                    return k1.name.compareTo(k2.name);
-                else
-                    return cmp;
-            }
-        });
+        List<ConfigKey> configs = sortedConfigs();
         StringBuilder b = new StringBuilder();
         b.append("<table class=\"data-table\"><tbody>\n");
         b.append("<tr>\n");
@@ -434,4 +890,74 @@ public class ConfigDef {
         b.append("</tbody></table>");
         return b.toString();
     }
+
+    /**
+     * Get the configs formatted with reStructuredText, suitable for embedding in Sphinx
+     * documentation.
+     */
+    public String toRst() {
+        List<ConfigKey> configs = sortedConfigs();
+        StringBuilder b = new StringBuilder();
+
+        for (ConfigKey def : configs) {
+            b.append("``");
+            b.append(def.name);
+            b.append("``\n");
+            for (String docLine : def.documentation.split("\n")) {
+                if (docLine.length() == 0) {
+                    continue;
+                }
+                b.append("  ");
+                b.append(docLine);
+                b.append("\n\n");
+            }
+            b.append("  * Type: ");
+            b.append(def.type.toString().toLowerCase());
+            b.append("\n");
+            if (def.defaultValue != null) {
+                b.append("  * Default: ");
+                if (def.type == Type.STRING) {
+                    b.append("\"");
+                    b.append(def.defaultValue);
+                    b.append("\"");
+                } else {
+                    b.append(def.defaultValue);
+                }
+                b.append("\n");
+            }
+            b.append("  * Importance: ");
+            b.append(def.importance.toString().toLowerCase());
+            b.append("\n\n");
+        }
+        return b.toString();
+    }
+
+    /**
+     * Get a list of configs sorted into "natural" order: listing required fields first, then
+     * ordering by importance, and finally by name.
+     */
+    private List<ConfigKey> sortedConfigs() {
+        // sort first required fields, then by importance, then name
+        List<ConfigKey> configs = new ArrayList<>(this.configKeys.values());
+        Collections.sort(configs, new Comparator<ConfigKey>() {
+            public int compare(ConfigKey k1, ConfigKey k2) {
+                // first take anything with no default value
+                if (!k1.hasDefault() && k2.hasDefault()) {
+                    return -1;
+                } else if (!k2.hasDefault() && k1.hasDefault()) {
+                    return 1;
+                }
+
+                // then sort by importance
+                int cmp = k1.importance.compareTo(k2.importance);
+                if (cmp == 0) {
+                    // then sort in alphabetical order
+                    return k1.name.compareTo(k2.name);
+                } else {
+                    return cmp;
+                }
+            }
+        });
+        return configs;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java
new file mode 100644
index 0000000..c9a4a34
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.common.config;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+
+public class ConfigValue {
+
+    private String name;
+    private Object value;
+    private List<Object> recommendedValues;
+    private List<String> errorMessages;
+    private boolean visible;
+
+    public ConfigValue(String name) {
+        this(name, null, new LinkedList<Object>(), new LinkedList<String>());
+    }
+
+    public ConfigValue(String name, Object value, List<Object> recommendedValues, List<String> errorMessages) {
+        this.name = name;
+        this.value = value;
+        this.recommendedValues = recommendedValues;
+        this.errorMessages = errorMessages;
+        this.visible = true;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public Object value() {
+        return value;
+    }
+
+    public List<Object> recommendedValues() {
+        return recommendedValues;
+    }
+
+    public List<String> errorMessages() {
+        return errorMessages;
+    }
+
+    public boolean visible() {
+        return visible;
+    }
+
+    public void value(Object value) {
+        this.value = value;
+    }
+
+    public void recommendedValues(List<Object> recommendedValues) {
+        this.recommendedValues = recommendedValues;
+    }
+
+    public void addErrorMessage(String errorMessage) {
+        this.errorMessages.add(errorMessage);
+    }
+
+    public void visible(boolean visible) {
+        this.visible = visible;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConfigValue that = (ConfigValue) o;
+        return Objects.equals(name, that.name) &&
+               Objects.equals(value, that.value) &&
+               Objects.equals(recommendedValues, that.recommendedValues) &&
+               Objects.equals(errorMessages, that.errorMessages) &&
+               Objects.equals(visible, that.visible);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, value, recommendedValues, errorMessages, visible);
+    }
+
+    @Override
+    public String toString() {
+        StringBuffer sb = new StringBuffer();
+        sb.append("[")
+            .append(name)
+            .append(",")
+            .append(value)
+            .append(",")
+            .append(recommendedValues)
+            .append(",")
+            .append(errorMessages)
+            .append(",")
+            .append(visible)
+            .append("]");
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index fa0370b..022fb6b 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -12,22 +12,27 @@
  */
 package org.apache.kafka.common.config;
 
-import static java.util.Arrays.asList;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
 import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Validator;
 import org.apache.kafka.common.config.ConfigDef.Range;
-import org.apache.kafka.common.config.ConfigDef.ValidString;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.ValidString;
+import org.apache.kafka.common.config.ConfigDef.Validator;
+import org.apache.kafka.common.config.ConfigDef.Width;
 import org.apache.kafka.common.config.types.Password;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 public class ConfigDefTest {
 
     @Test
@@ -156,7 +161,8 @@ public class ConfigDefTest {
         final String key = "enum_test";
 
         ConfigDef def = new ConfigDef();
-        def.define(key, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ValidString.in("ONE", "TWO", "THREE"), Importance.HIGH, "docs");
+        def.define(key, Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
+                   ValidString.in("ONE", "TWO", "THREE"), Importance.HIGH, "docs");
 
         Properties props = new Properties();
         props.put(key, "ONE");
@@ -164,6 +170,180 @@ public class ConfigDefTest {
         assertEquals("ONE", vals.get(key));
     }
 
+    @Test
+    public void testGroupInference() {
+        List<String> expected1 = Arrays.asList("group1", "group2");
+        ConfigDef def1 = new ConfigDef()
+            .define("a", Type.INT, Importance.HIGH, "docs", "group1", 1, Width.SHORT, "a")
+            .define("b", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "b")
+            .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c");
+
+        assertEquals(expected1, def1.groups());
+
+        List<String> expected2 = Arrays.asList("group2", "group1");
+        ConfigDef def2 = new ConfigDef()
+            .define("a", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "a")
+            .define("b", Type.INT, Importance.HIGH, "docs", "group2", 2, Width.SHORT, "b")
+            .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c");
+
+        assertEquals(expected2, def2.groups());
+    }
+
+    @Test
+    public void testParseForValidate() {
+        Map<String, Object> expectedParsed = new HashMap<>();
+        expectedParsed.put("a", 1);
+        expectedParsed.put("b", null);
+        expectedParsed.put("c", null);
+        expectedParsed.put("d", 10);
+
+        Map<String, ConfigValue> expected = new HashMap<>();
+        String errorMessageB = "Missing required configuration \"b\" which has no default value.";
+        String errorMessageC = "Missing required configuration \"c\" which has no default value.";
+        ConfigValue configA = new ConfigValue("a", 1, Collections.<Object>emptyList(), Collections.<String>emptyList());
+        ConfigValue configB = new ConfigValue("b", null, Collections.<Object>emptyList(), Arrays.asList(errorMessageB, errorMessageB));
+        ConfigValue configC = new ConfigValue("c", null, Collections.<Object>emptyList(), Arrays.asList(errorMessageC));
+        ConfigValue configD = new ConfigValue("d", 10, Collections.<Object>emptyList(), Collections.<String>emptyList());
+        expected.put("a", configA);
+        expected.put("b", configB);
+        expected.put("c", configC);
+        expected.put("d", configD);
+
+        ConfigDef def = new ConfigDef()
+            .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false))
+            .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true))
+            .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true))
+            .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", Arrays.asList("b"), new IntegerRecommender(false));
+
+        Map<String, String> props = new HashMap<>();
+        props.put("a", "1");
+        props.put("d", "10");
+
+        Map<String, ConfigValue> configValues = new HashMap<>();
+
+        for (String name: def.configKeys().keySet()) {
+            configValues.put(name, new ConfigValue(name));
+        }
+
+        Map<String, Object> parsed = def.parseForValidate(props, configValues);
+
+        assertEquals(expectedParsed, parsed);
+        assertEquals(expected, configValues);
+    }
+
+    @Test
+    public void testValidate() {
+        Map<String, ConfigValue> expected = new HashMap<>();
+        String errorMessageB = "Missing required configuration \"b\" which has no default value.";
+        String errorMessageC = "Missing required configuration \"c\" which has no default value.";
+        String errorMessageD = "Invalid value for configuration d";
+
+        ConfigValue configA = new ConfigValue("a", 1, Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList());
+        ConfigValue configB = new ConfigValue("b", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageB, errorMessageB));
+        ConfigValue configC = new ConfigValue("c", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageC));
+        ConfigValue configD = new ConfigValue("d", 10, Arrays.<Object>asList(1, 2, 3), Arrays.asList(errorMessageD));
+
+        expected.put("a", configA);
+        expected.put("b", configB);
+        expected.put("c", configC);
+        expected.put("d", configD);
+
+        ConfigDef def = new ConfigDef()
+            .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false))
+            .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true))
+            .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true))
+            .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", Arrays.asList("b"), new IntegerRecommender(false));
+
+        Map<String, String> props = new HashMap<>();
+        props.put("a", "1");
+        props.put("d", "10");
+
+        List<ConfigValue> configs = def.validate(props);
+        for (ConfigValue config : configs) {
+            String name = config.name();
+            ConfigValue expectedConfig = expected.get(name);
+            assertEquals(expectedConfig, config);
+        }
+    }
+
+    @Test
+    public void testValidateMissingConfigKey() {
+        Map<String, ConfigValue> expected = new HashMap<>();
+        String errorMessageB = "Missing required configuration \"b\" which has no default value.";
+        String errorMessageC = "Missing required configuration \"c\" which has no default value.";
+        String errorMessageD = "d is referred in the dependents, but not defined.";
+
+        ConfigValue configA = new ConfigValue("a", 1, Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList());
+        ConfigValue configB = new ConfigValue("b", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageB));
+        ConfigValue configC = new ConfigValue("c", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageC));
+        ConfigValue configD = new ConfigValue("d", null, Collections.emptyList(), Arrays.asList(errorMessageD));
+        configD.visible(false);
+
+        expected.put("a", configA);
+        expected.put("b", configB);
+        expected.put("c", configC);
+        expected.put("d", configD);
+
+        ConfigDef def = new ConfigDef()
+            .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c", "d"), new IntegerRecommender(false))
+            .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true))
+            .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true));
+
+        Map<String, String> props = new HashMap<>();
+        props.put("a", "1");
+
+        List<ConfigValue> configs = def.validate(props);
+        for (ConfigValue config: configs) {
+            String name = config.name();
+            ConfigValue expectedConfig = expected.get(name);
+            assertEquals(expectedConfig, config);
+        }
+    }
+
+    @Test
+    public void testValidateCannotParse() {
+        Map<String, ConfigValue> expected = new HashMap<>();
+        String errorMessageB = "Invalid value non_integer for configuration a: Not a number of type INT";
+        ConfigValue configA = new ConfigValue("a", null, Collections.emptyList(), Arrays.asList(errorMessageB));
+        expected.put("a", configA);
+
+        ConfigDef def = new ConfigDef().define("a", Type.INT, Importance.HIGH, "docs");
+        Map<String, String> props = new HashMap<>();
+        props.put("a", "non_integer");
+
+        List<ConfigValue> configs = def.validate(props);
+        for (ConfigValue config: configs) {
+            String name = config.name();
+            ConfigValue expectedConfig = expected.get(name);
+            assertEquals(expectedConfig, config);
+        }
+    }
+
+    private static class IntegerRecommender implements ConfigDef.Recommender {
+
+        private boolean hasParent;
+
+        public IntegerRecommender(boolean hasParent) {
+            this.hasParent = hasParent;
+        }
+
+        @Override
+        public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
+            List<Object> values = new LinkedList<>();
+            if (!hasParent) {
+                values.addAll(Arrays.asList(1, 2, 3));
+            } else {
+                values.addAll(Arrays.asList(4, 5));
+            }
+            return values;
+        }
+
+        @Override
+        public boolean visible(String name, Map<String, Object> parsedConfig) {
+            return true;
+        }
+    }
+
     private void testValidators(Type type, Validator validator, Object defaultVal, Object[] okValues, Object[] badValues) {
         ConfigDef def = new ConfigDef().define("name", type, defaultVal, validator, Importance.HIGH, "docs");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
index 934cdbd..1370156 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java
@@ -18,6 +18,9 @@
 package org.apache.kafka.connect.connector;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigValue;
 
 import java.util.List;
 import java.util.Map;
@@ -121,4 +124,22 @@ public abstract class Connector {
      * Stop this connector.
      */
     public abstract void stop();
+
+    /**
+     * Validate the connector configuration values against configuration definitions.
+     * @param connectorConfigs the provided configuration values
+     * @return a Config wrapping the list of ConfigValue results; each ConfigValue contains the updated
+     * configuration information given the current configuration values.
+     */
+    public Config validate(Map<String, String> connectorConfigs) {
+        ConfigDef configDef = config();
+        List<ConfigValue> configValues = configDef.validate(connectorConfigs);
+        return new Config(configValues);
+    }
+
+    /**
+     * Define the configuration for the connector.
+     * @return The ConfigDef for this connector.
+     */
+    public abstract ConfigDef config();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
----------------------------------------------------------------------
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
index 7ea1de2..0517b66 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.connect.connector;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.Test;
 
@@ -43,6 +44,7 @@ public class ConnectorReconfigurationTest {
     }
 
     private static class TestConnector extends Connector {
+
         private boolean stopException;
         private int order = 0;
         public int stopOrder = -1;
@@ -78,5 +80,10 @@ public class ConnectorReconfigurationTest {
             if (stopException)
                 throw new ConnectException("error");
         }
+
+        @Override
+        public ConfigDef config() {
+            return new ConfigDef();
+        }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
index a73153f..d423313 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.connect.file;
 
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -31,7 +34,10 @@ import java.util.Map;
  * sink modes via its 'mode' setting.
  */
 public class FileStreamSinkConnector extends SinkConnector {
+
     public static final String FILE_CONFIG = "file";
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Destination filename.");
 
     private String filename;
 
@@ -66,4 +72,9 @@ public class FileStreamSinkConnector extends SinkConnector {
     public void stop() {
         // Nothing to do since FileStreamSinkConnector has no background monitoring.
     }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
index 843e999..4fb33b7 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.connect.file;
 
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -35,6 +38,10 @@ public class FileStreamSourceConnector extends SourceConnector {
     public static final String TOPIC_CONFIG = "topic";
     public static final String FILE_CONFIG = "file";
 
+    private static final ConfigDef CONFIG_DEF = new ConfigDef()
+        .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
+        .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
+
     private String filename;
     private String topic;
 
@@ -74,4 +81,9 @@ public class FileStreamSourceConnector extends SourceConnector {
     public void stop() {
         // Nothing to do since FileStreamSourceConnector has no background monitoring.
     }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index ca85d87..8d83644 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -16,7 +16,16 @@
  **/
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.common.config.Config;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.ConfigKey;
+import org.apache.kafka.common.config.ConfigValue;
+import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -27,7 +36,11 @@ import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions
@@ -52,10 +65,14 @@ import java.util.List;
  */
 public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
 
+    protected final Worker worker;
     protected final StatusBackingStore statusBackingStore;
     private final String workerId;
 
-    public AbstractHerder(StatusBackingStore statusBackingStore, String workerId) {
+    protected Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
+
+    public AbstractHerder(Worker worker, StatusBackingStore statusBackingStore, String workerId) {
+        this.worker = worker;
         this.statusBackingStore = statusBackingStore;
         this.workerId = workerId;
     }
@@ -143,6 +160,95 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
                 status.workerId(), status.trace());
     }
 
+
+    @Override
+    public ConfigInfos validateConfigs(String connType, Map<String, String> connectorConfig) {
+        ConfigDef connectorConfigDef = ConnectorConfig.configDef();
+        List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig);
+        ConfigInfos result = generateResult(connType, connectorConfigDef.configKeys(), connectorConfigValues, Collections.<String>emptyList());
+
+        if (result.errorCount() != 0) {
+            return result;
+        }
+
+        Connector connector = getConnector(connType);
+
+        Config config = connector.validate(connectorConfig);
+        ConfigDef configDef = connector.config();
+        Map<String, ConfigKey> configKeys = configDef.configKeys();
+        List<ConfigValue> configValues = config.configValues();
+
+        Map<String, ConfigKey> resultConfigKeys = new HashMap<>(configKeys);
+        resultConfigKeys.putAll(connectorConfigDef.configKeys());
+        configValues.addAll(connectorConfigValues);
+
+        List<String> allGroups = new LinkedList<>(connectorConfigDef.groups());
+        List<String> groups = configDef.groups();
+        allGroups.addAll(groups);
+
+        return generateResult(connType, resultConfigKeys, configValues, allGroups);
+    }
+
+    // public for testing
+    public static ConfigInfos generateResult(String connType, Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, List<String> groups) {
+        int errorCount = 0;
+        List<ConfigInfo> configInfoList = new LinkedList<>();
+
+        Map<String, ConfigValue> configValueMap = new HashMap<>();
+        for (ConfigValue configValue: configValues) {
+            String configName = configValue.name();
+            configValueMap.put(configName, configValue);
+            if (!configKeys.containsKey(configName)) {
+                configValue.addErrorMessage("Configuration is not defined: " + configName);
+                configInfoList.add(new ConfigInfo(null, convertConfigValue(configValue)));
+            }
+        }
+
+        for (String configName: configKeys.keySet()) {
+            ConfigKeyInfo configKeyInfo = convertConfigKey(configKeys.get(configName));
+            ConfigValueInfo configValueInfo = null;
+            if (configValueMap.containsKey(configName)) {
+                ConfigValue configValue = configValueMap.get(configName);
+                configValueInfo = convertConfigValue(configValue);
+                errorCount += configValue.errorMessages().size();
+            }
+            configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo));
+        }
+        return new ConfigInfos(connType, errorCount, groups, configInfoList);
+    }
+
+    private static ConfigKeyInfo convertConfigKey(ConfigKey configKey) {
+        String name = configKey.name;
+        String type = configKey.type.name();
+        Object defaultValue = configKey.defaultValue;
+        boolean required = false;
+        if (defaultValue == ConfigDef.NO_DEFAULT_VALUE) {
+            required = true;
+        }
+        String importance = configKey.importance.name();
+        String documentation = configKey.documentation;
+        String group = configKey.group;
+        int orderInGroup = configKey.orderInGroup;
+        String width = configKey.width.name();
+        String displayName = configKey.displayName;
+        List<String> dependents = configKey.dependents;
+        return new ConfigKeyInfo(name, type, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents);
+    }
+
+    private static ConfigValueInfo convertConfigValue(ConfigValue configValue) {
+        return new ConfigValueInfo(configValue.name(), configValue.value(), configValue.recommendedValues(), configValue.errorMessages(), configValue.visible());
+    }
+
+    private Connector getConnector(String connType) {
+        if (tempConnectors.containsKey(connType)) {
+            return tempConnectors.get(connType);
+        } else {
+            Connector connector = worker.getConnector(connType);
+            tempConnectors.put(connType, connector);
+            return connector;
+        }
+    }
+
     private String trace(Throwable t) {
         ByteArrayOutputStream output = new ByteArrayOutputStream();
         t.printStackTrace(new PrintStream(output));
@@ -152,5 +258,4 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
             return null;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 4824acd..e21faf6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Width;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -37,32 +38,41 @@ import java.util.Map;
  * </p>
  */
 public class ConnectorConfig extends AbstractConfig {
+    private static final String COMMON_GROUP = "Common";
 
     public static final String NAME_CONFIG = "name";
     private static final String NAME_DOC = "Globally unique name to use for this connector.";
+    private static final String NAME_DISPLAY = "Connector name";
 
     public static final String CONNECTOR_CLASS_CONFIG = "connector.class";
     private static final String CONNECTOR_CLASS_DOC =
                     "Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. " +
                     "If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name, " +
                     " or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter";
+    private static final String CONNECTOR_CLASS_DISPLAY = "Connector class";
 
     public static final String TASKS_MAX_CONFIG = "tasks.max";
     private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
     public static final int TASKS_MAX_DEFAULT = 1;
+    private static final String TASK_MAX_DISPLAY = "Tasks max";
 
     public static final String TOPICS_CONFIG = "topics";
     private static final String TOPICS_DOC = "";
     public static final String TOPICS_DEFAULT = "";
+    private static final String TOPICS_DISPLAY = "Topics";
 
     private static ConfigDef config;
 
     static {
         config = new ConfigDef()
-                .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC)
-                .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC)
-                .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC)
-                .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
+                .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
+                .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
+                .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
+                .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, Width.LONG, TOPICS_DISPLAY);
+    }
+
+    public static ConfigDef configDef() {
+        return config;
     }
 
     public ConnectorConfig() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 95c7700..3ea4a81 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@@ -127,6 +128,12 @@ public interface Herder {
      */
     ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id);
 
+    /**
+     * Validate the provided connector config values against the configuration definition.
+     * @param connType the connector class name or alias
+     * @param connectorConfig the provided connector config values
+     */
+    ConfigInfos validateConfigs(String connType, Map<String, String> connectorConfig);
 
     class Created<T> {
         private final boolean created;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index aa57493..1a9ff11 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -189,6 +189,11 @@ public class Worker {
         return SinkConnector.class.isAssignableFrom(workerConnector.delegate.getClass());
     }
 
+    public Connector getConnector(String connType) {
+        Class<? extends Connector> connectorClass = getConnectorClass(connType);
+        return instantiateConnector(connectorClass);
+    }
+
     @SuppressWarnings("unchecked")
     private Class<? extends Connector> getConnectorClass(String connectorAlias) {
         // Avoid the classpath scan if the full class name was provided

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 16b950b..2fc8297 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -84,7 +84,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
 
-    private final Worker worker;
     private final KafkaConfigStorage configStorage;
     private ClusterConfigState configState;
     private final Time time;
@@ -130,9 +129,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                       WorkerGroupMember member,
                       String restUrl,
                       Time time) {
-        super(statusBackingStore, workerId);
+        super(worker, statusBackingStore, workerId);
 
-        this.worker = worker;
         if (configStorage != null) {
             // For testing. Assume configuration has already been performed
             this.configStorage = configStorage;
@@ -551,6 +549,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         return generation;
     }
 
+
     // Should only be called from work thread, so synchronization should not be needed
     private boolean isLeader() {
         return assignment != null && member.memberId().equals(assignment.leader());
@@ -701,7 +700,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
         ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName);
         worker.startConnector(connConfig, ctx, this);
-
         // Immediately request configuration since this could be a brand new connector. However, also only update those
         // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
         // just restoring an existing connector.

http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index dbac58f..7e4279a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -20,12 +20,14 @@ package org.apache.kafka.connect.runtime.rest;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
 import org.apache.kafka.connect.runtime.rest.resources.RootResource;
 import org.eclipse.jetty.server.Connector;
@@ -44,8 +46,6 @@ import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -55,6 +55,9 @@ import java.net.URL;
 import java.util.List;
 import java.util.Map;
 
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+
 /**
  * Embedded server for the REST API that provides the control plane for Kafka Connect workers.
  */
@@ -95,6 +98,7 @@ public class RestServer {
 
         resourceConfig.register(RootResource.class);
         resourceConfig.register(new ConnectorsResource(herder));
+        resourceConfig.register(new ConnectorPluginsResource(herder));
 
         resourceConfig.register(ConnectExceptionMapper.class);