You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/06/21 00:48:37 UTC

kafka git commit: KAFKA-5472: Eliminated duplicate group names when validating connector results

Repository: kafka
Updated Branches:
  refs/heads/trunk 84de7f175 -> de982ba3f


KAFKA-5472: Eliminated duplicate group names when validating connector results

Kafka Connect was returning duplicate group names in the REST API response when validating connector configurations. This change removes the duplicates by collecting group names in an insertion-ordered set, and preserves the insertion order of the config keys held by `ConfigDef` so that the `ConfigValue` results are returned in that same order.

This is a blocker and should be merged to 0.11.0.

Author: Randall Hauch <rh...@gmail.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #3379 from rhauch/KAFKA-5472


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/de982ba3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/de982ba3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/de982ba3

Branch: refs/heads/trunk
Commit: de982ba3fbf99664f0aaa5aa4b72af8fd1881232
Parents: 84de7f1
Author: Randall Hauch <rh...@gmail.com>
Authored: Tue Jun 20 17:48:32 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Jun 20 17:48:32 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/config/ConfigDef.java    |  5 +++--
 .../apache/kafka/connect/runtime/AbstractHerder.java | 15 +++++++++------
 2 files changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/de982ba3/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 8197b1f..2514e4f 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -26,6 +26,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
@@ -82,13 +83,13 @@ public class ConfigDef {
     private Set<String> configsWithNoParent;
 
     public ConfigDef() {
-        configKeys = new HashMap<>();
+        configKeys = new LinkedHashMap<>();
         groups = new LinkedList<>();
         configsWithNoParent = null;
     }
 
     public ConfigDef(ConfigDef base) {
-        configKeys = new HashMap<>(base.configKeys);
+        configKeys = new LinkedHashMap<>(base.configKeys);
         groups = new LinkedList<>(base.groups);
         configsWithNoParent = base.configsWithNoParent == null ? null : new HashSet<>(base.configsWithNoParent);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/de982ba3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 6293b01..cfb8ae0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -46,9 +46,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -232,11 +235,11 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
             throw new BadRequestException("Connector config " + connectorConfig + " contains no connector type");
 
         List<ConfigValue> configValues = new ArrayList<>();
-        Map<String, ConfigKey> configKeys = new HashMap<>();
-        List<String> allGroups = new ArrayList<>();
+        Map<String, ConfigKey> configKeys = new LinkedHashMap<>();
+        Set<String> allGroups = new LinkedHashSet<>();
 
         Connector connector = getConnector(connType);
-        ClassLoader savedLoader = worker.getPlugins().compareAndSwapLoaders(connector);
+        ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
         try {
             // do basic connector validation (name, connector type, etc.)
             ConfigDef basicConfigDef = (connector instanceof SourceConnector)
@@ -271,10 +274,10 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
             configKeys.putAll(configDef.configKeys());
             allGroups.addAll(configDef.groups());
             configValues.addAll(config.configValues());
-            return generateResult(connType, configKeys, configValues, allGroups);
+            return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
         } catch (ConfigException e) {
             // Basic validation must have failed. Return the result.
-            return generateResult(connType, configKeys, configValues, allGroups);
+            return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
         } finally {
             Plugins.compareAndSwapLoaders(savedLoader);
         }
@@ -353,7 +356,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         if (tempConnectors.containsKey(connType)) {
             return tempConnectors.get(connType);
         } else {
-            Connector connector = worker.getPlugins().newConnector(connType);
+            Connector connector = plugins().newConnector(connType);
             tempConnectors.put(connType, connector);
             return connector;
         }