You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/05/12 01:18:30 UTC

kafka git commit: KAFKA-4343: Expose Connector type in REST API (KIP-151)

Repository: kafka
Updated Branches:
  refs/heads/trunk 27107ee34 -> b6effcbba


KAFKA-4343: Expose Connector type in REST API (KIP-151)

https://cwiki.apache.org/confluence/display/KAFKA/KIP-151+Expose+Connector+type+in+REST+API

Author: dan norwood <no...@confluent.io>

Reviewers: Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #2960 from norwood/KIP-151


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6effcbb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6effcbb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6effcbb

Branch: refs/heads/trunk
Commit: b6effcbba50eaa3abf91e24f0ae83abb53881add
Parents: 27107ee
Author: dan norwood <no...@confluent.io>
Authored: Thu May 11 18:16:25 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu May 11 18:16:25 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/PluginDiscovery.java  |   4 +-
 .../rest/entities/ConnectorPluginInfo.java      |  77 ++++++++++--
 .../runtime/rest/entities/ConnectorType.java    |  52 ++++++++
 .../rest/entities/ConnectorTypeTest.java        |  44 +++++++
 .../resources/ConnectorPluginsResourceTest.java | 126 +++++++++++++++++--
 5 files changed, 283 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b6effcbb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
index d31ce6d..482139a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
@@ -78,14 +78,14 @@ public class PluginDiscovery {
         final List<ConnectorPluginInfo> connectorPlugins = new ArrayList<>(connectorClasses.size());
         for (Class<? extends Connector> connectorClass : connectorClasses) {
             if (isConcrete(connectorClass)) {
-                connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName()));
+                connectorPlugins.add(new ConnectorPluginInfo(connectorClass));
             }
         }
 
         Collections.sort(connectorPlugins, new Comparator<ConnectorPluginInfo>() {
             @Override
             public int compare(ConnectorPluginInfo a, ConnectorPluginInfo b) {
-                return a.clazz().compareTo(b.clazz());
+                return a.className().compareTo(b.className());
             }
         });
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6effcbb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
index 47e05bc..ff3c30d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
@@ -19,20 +19,74 @@ package org.apache.kafka.connect.runtime.rest.entities;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import org.apache.kafka.connect.connector.Connector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class ConnectorPluginInfo {
 
-    private String clazz;
+    private static final Logger log = LoggerFactory.getLogger(ConnectorPluginInfo.class);
+
+    private static final Map<Class<? extends Connector>, String>
+        VERSIONS = new ConcurrentHashMap<>();
+
+    private String className;
+    private ConnectorType type;
+    private String version;
 
     @JsonCreator
-    public ConnectorPluginInfo(@JsonProperty("class") String clazz) {
-        this.clazz = clazz;
+    public ConnectorPluginInfo(
+        @JsonProperty("class") String className,
+        @JsonProperty("type") ConnectorType type,
+        @JsonProperty("version") String version
+    ) {
+        this.className = className;
+        this.type = type;
+        this.version = version;
+    }
+
+    public ConnectorPluginInfo(Class<? extends Connector> klass) {
+        this(klass.getCanonicalName(), ConnectorType.from(klass), getVersion(klass));
+    }
+
+    private static String getVersion(Class<? extends Connector> klass) {
+        if (!VERSIONS.containsKey(klass)) {
+            synchronized (VERSIONS) {
+                if (!VERSIONS.containsKey(klass)) {
+                    try {
+                        VERSIONS.put(klass, klass.newInstance().version());
+                    } catch (
+                        ExceptionInInitializerError
+                            | InstantiationException
+                            | IllegalAccessException
+                            | SecurityException e
+                    ) {
+                        log.warn("Unable to instantiate connector", e);
+                        VERSIONS.put(klass, "unknown");
+                    }
+                }
+            }
+        }
+        return VERSIONS.get(klass);
     }
 
     @JsonProperty("class")
-    public String clazz() {
-        return clazz;
+    public String className() {
+        return className;
+    }
+
+    @JsonProperty("type")
+    public ConnectorType type() {
+        return type;
+    }
+
+    @JsonProperty("version")
+    public String version() {
+        return version;
     }
 
     @Override
@@ -44,16 +98,23 @@ public class ConnectorPluginInfo {
             return false;
         }
         ConnectorPluginInfo that = (ConnectorPluginInfo) o;
-        return Objects.equals(clazz, that.clazz);
+        return Objects.equals(className, that.className) &&
+               type == that.type &&
+               Objects.equals(version, that.version);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(clazz);
+        return Objects.hash(className, type, version);
     }
 
     @Override
     public String toString() {
-        return clazz;
+        final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{");
+        sb.append("className='").append(className).append('\'');
+        sb.append(", type=").append(type);
+        sb.append(", version='").append(version).append('\'');
+        sb.append('}');
+        return sb.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6effcbb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java
new file mode 100644
index 0000000..292a1ee
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.Locale;
+
+public enum ConnectorType {
+    SOURCE, SINK, UNKNOWN;
+
+    public static ConnectorType from(Class<? extends Connector> clazz) {
+        if (SinkConnector.class.isAssignableFrom(clazz)) {
+            return SINK;
+        }
+        if (SourceConnector.class.isAssignableFrom(clazz)) {
+            return SOURCE;
+        }
+
+        return UNKNOWN;
+    }
+
+    @Override
+    @JsonValue
+    public String toString() {
+        return super.toString().toLowerCase(Locale.ROOT);
+    }
+
+    @JsonCreator
+    public static ConnectorType forValue(String value) {
+        return ConnectorType.valueOf(value.toUpperCase(Locale.ROOT));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6effcbb/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java
new file mode 100644
index 0000000..cd07bd8
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ConnectorTypeTest {
+
+    @Test
+    public void testToStringIsLowerCase() {
+        for (ConnectorType ct : ConnectorType.values()) {
+            String shouldBeLower = ct.toString();
+            assertFalse(shouldBeLower.isEmpty());
+            for (Character c : shouldBeLower.toCharArray()) {
+                assertTrue(Character.isLowerCase(c));
+            }
+        }
+    }
+
+    @Test
+    public void testForValue() {
+        for (ConnectorType ct : ConnectorType.values()) {
+            assertEquals(ct, ConnectorType.forValue(ct.toString()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b6effcbb/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 7ba6fd2..966098c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime.rest.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
@@ -36,6 +37,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.tools.MockConnector;
@@ -55,6 +57,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -358,18 +361,121 @@ public class ConnectorPluginsResourceTest {
     @Test
     public void testListConnectorPlugins() {
         Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins());
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class.getCanonicalName())));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SourceConnector.class.getCanonicalName())));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class.getCanonicalName())));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class.getCanonicalName())));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class.getCanonicalName())));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class.getCanonicalName())));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class.getCanonicalName())));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class.getCanonicalName())));
-        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SchemaSourceConnector.class.getCanonicalName())));
-        assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class.getCanonicalName())));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class)));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SourceConnector.class)));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class)));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class)));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class)));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class)));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class)));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class)));
+        assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SchemaSourceConnector.class)));
+        assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class)));
     }
 
+    @Test
+    public void testConnectorPluginsIncludesTypeAndVersionInformation()
+        throws IOException {
+        ConnectorPluginInfo sinkInfo = new ConnectorPluginInfo(TestSinkConnector.class);
+        ConnectorPluginInfo sourceInfo = new ConnectorPluginInfo(TestSourceConnector.class);
+        ConnectorPluginInfo unkownInfo =
+            new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class);
+        assertEquals(ConnectorType.SINK, sinkInfo.type());
+        assertEquals(ConnectorType.SOURCE, sourceInfo.type());
+        assertEquals(ConnectorType.UNKNOWN, unkownInfo.type());
+        assertEquals(TestSinkConnector.VERSION, sinkInfo.version());
+        assertEquals(TestSourceConnector.VERSION, sourceInfo.version());
+
+        final ObjectMapper objectMapper = new ObjectMapper();
+        String serializedSink = objectMapper.writeValueAsString(ConnectorType.SINK);
+        String serializedSource = objectMapper.writeValueAsString(ConnectorType.SOURCE);
+        String serializedUnknown = objectMapper.writeValueAsString(ConnectorType.UNKNOWN);
+        assertTrue(serializedSink.contains("sink"));
+        assertTrue(serializedSource.contains("source"));
+        assertTrue(serializedUnknown.contains("unknown"));
+        assertEquals(
+            ConnectorType.SINK,
+            objectMapper.readValue(serializedSink, ConnectorType.class)
+        );
+        assertEquals(
+            ConnectorType.SOURCE,
+            objectMapper.readValue(serializedSource, ConnectorType.class)
+        );
+        assertEquals(
+            ConnectorType.UNKNOWN,
+            objectMapper.readValue(serializedUnknown, ConnectorType.class)
+        );
+    }
+
+    public static class TestSinkConnector extends SinkConnector {
+
+        static final String VERSION = "some great version";
+
+        @Override
+        public String version() {
+            return VERSION;
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+
+        }
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return null;
+        }
+
+        @Override
+        public List<Map<String, String>> taskConfigs(int maxTasks) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+
+        }
+
+        @Override
+        public ConfigDef config() {
+            return null;
+        }
+    }
+
+    public static class TestSourceConnector extends SourceConnector {
+
+        static final String VERSION = "an entirely different version";
+
+        @Override
+        public String version() {
+            return VERSION;
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+
+        }
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return null;
+        }
+
+        @Override
+        public List<Map<String, String>> taskConfigs(int maxTasks) {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+
+        }
+
+        @Override
+        public ConfigDef config() {
+            return null;
+        }
+    }
 
     /* Name here needs to be unique as we are testing the aliasing mechanism */
     public static class ConnectorPluginsResourceTestConnector extends Connector {