You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/05/12 01:18:30 UTC
kafka git commit: KAFKA-4343: Expose Connector type in REST API (KIP-151)
Repository: kafka
Updated Branches:
refs/heads/trunk 27107ee34 -> b6effcbba
KAFKA-4343: Expose Connector type in REST API (KIP-151)
https://cwiki.apache.org/confluence/display/KAFKA/KIP-151+Expose+Connector+type+in+REST+API
Author: dan norwood <no...@confluent.io>
Reviewers: Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #2960 from norwood/KIP-151
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6effcbb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6effcbb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6effcbb
Branch: refs/heads/trunk
Commit: b6effcbba50eaa3abf91e24f0ae83abb53881add
Parents: 27107ee
Author: dan norwood <no...@confluent.io>
Authored: Thu May 11 18:16:25 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu May 11 18:16:25 2017 -0700
----------------------------------------------------------------------
.../kafka/connect/runtime/PluginDiscovery.java | 4 +-
.../rest/entities/ConnectorPluginInfo.java | 77 ++++++++++--
.../runtime/rest/entities/ConnectorType.java | 52 ++++++++
.../rest/entities/ConnectorTypeTest.java | 44 +++++++
.../resources/ConnectorPluginsResourceTest.java | 126 +++++++++++++++++--
5 files changed, 283 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6effcbb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
index d31ce6d..482139a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
@@ -78,14 +78,14 @@ public class PluginDiscovery {
final List<ConnectorPluginInfo> connectorPlugins = new ArrayList<>(connectorClasses.size());
for (Class<? extends Connector> connectorClass : connectorClasses) {
if (isConcrete(connectorClass)) {
- connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName()));
+ connectorPlugins.add(new ConnectorPluginInfo(connectorClass));
}
}
Collections.sort(connectorPlugins, new Comparator<ConnectorPluginInfo>() {
@Override
public int compare(ConnectorPluginInfo a, ConnectorPluginInfo b) {
- return a.clazz().compareTo(b.clazz());
+ return a.className().compareTo(b.className());
}
});
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6effcbb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
index 47e05bc..ff3c30d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
@@ -19,20 +19,74 @@ package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.connect.connector.Connector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
public class ConnectorPluginInfo {
- private String clazz;
+ private static final Logger log = LoggerFactory.getLogger(ConnectorPluginInfo.class);
+
+ private static final Map<Class<? extends Connector>, String>
+ VERSIONS = new ConcurrentHashMap<>();
+
+ private String className;
+ private ConnectorType type;
+ private String version;
@JsonCreator
- public ConnectorPluginInfo(@JsonProperty("class") String clazz) {
- this.clazz = clazz;
+ public ConnectorPluginInfo(
+ @JsonProperty("class") String className,
+ @JsonProperty("type") ConnectorType type,
+ @JsonProperty("version") String version
+ ) {
+ this.className = className;
+ this.type = type;
+ this.version = version;
+ }
+
+ public ConnectorPluginInfo(Class<? extends Connector> klass) {
+ this(klass.getCanonicalName(), ConnectorType.from(klass), getVersion(klass));
+ }
+
+ private static String getVersion(Class<? extends Connector> klass) {
+ if (!VERSIONS.containsKey(klass)) {
+ synchronized (VERSIONS) {
+ if (!VERSIONS.containsKey(klass)) {
+ try {
+ VERSIONS.put(klass, klass.newInstance().version());
+ } catch (
+ ExceptionInInitializerError
+ | InstantiationException
+ | IllegalAccessException
+ | SecurityException e
+ ) {
+ log.warn("Unable to instantiate connector", e);
+ VERSIONS.put(klass, "unknown");
+ }
+ }
+ }
+ }
+ return VERSIONS.get(klass);
}
@JsonProperty("class")
- public String clazz() {
- return clazz;
+ public String className() {
+ return className;
+ }
+
+ @JsonProperty("type")
+ public ConnectorType type() {
+ return type;
+ }
+
+ @JsonProperty("version")
+ public String version() {
+ return version;
}
@Override
@@ -44,16 +98,23 @@ public class ConnectorPluginInfo {
return false;
}
ConnectorPluginInfo that = (ConnectorPluginInfo) o;
- return Objects.equals(clazz, that.clazz);
+ return Objects.equals(className, that.className) &&
+ type == that.type &&
+ Objects.equals(version, that.version);
}
@Override
public int hashCode() {
- return Objects.hash(clazz);
+ return Objects.hash(className, type, version);
}
@Override
public String toString() {
- return clazz;
+ final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{");
+ sb.append("className='").append(className).append('\'');
+ sb.append(", type=").append(type);
+ sb.append(", version='").append(version).append('\'');
+ sb.append('}');
+ return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6effcbb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java
new file mode 100644
index 0000000..292a1ee
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.source.SourceConnector;
+
+import java.util.Locale;
+
+public enum ConnectorType {
+ SOURCE, SINK, UNKNOWN;
+
+ public static ConnectorType from(Class<? extends Connector> clazz) {
+ if (SinkConnector.class.isAssignableFrom(clazz)) {
+ return SINK;
+ }
+ if (SourceConnector.class.isAssignableFrom(clazz)) {
+ return SOURCE;
+ }
+
+ return UNKNOWN;
+ }
+
+ @Override
+ @JsonValue
+ public String toString() {
+ return super.toString().toLowerCase(Locale.ROOT);
+ }
+
+ @JsonCreator
+ public static ConnectorType forValue(String value) {
+ return ConnectorType.valueOf(value.toUpperCase(Locale.ROOT));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6effcbb/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java
new file mode 100644
index 0000000..cd07bd8
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorTypeTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ConnectorTypeTest {
+
+ @Test
+ public void testToStringIsLowerCase() {
+ for (ConnectorType ct : ConnectorType.values()) {
+ String shouldBeLower = ct.toString();
+ assertFalse(shouldBeLower.isEmpty());
+ for (Character c : shouldBeLower.toCharArray()) {
+ assertTrue(Character.isLowerCase(c));
+ }
+ }
+ }
+
+ @Test
+ public void testForValue() {
+ for (ConnectorType ct : ConnectorType.values()) {
+ assertEquals(ct, ConnectorType.forValue(ct.toString()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b6effcbb/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 7ba6fd2..966098c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.rest.resources;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
@@ -36,6 +37,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.tools.MockConnector;
@@ -55,6 +57,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -358,18 +361,121 @@ public class ConnectorPluginsResourceTest {
@Test
public void testListConnectorPlugins() {
Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins());
- assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class.getCanonicalName())));
- assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SourceConnector.class.getCanonicalName())));
- assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class.getCanonicalName())));
- assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class.getCanonicalName())));
- assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class.getCanonicalName())));
- assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class.getCanonicalName())));
- assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class.getCanonicalName())));
- assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class.getCanonicalName())));
- assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SchemaSourceConnector.class.getCanonicalName())));
- assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class.getCanonicalName())));
+ assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(Connector.class)));
+ assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SourceConnector.class)));
+ assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SinkConnector.class)));
+ assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSourceConnector.class)));
+ assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(VerifiableSinkConnector.class)));
+ assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSourceConnector.class)));
+ assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockSinkConnector.class)));
+ assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(MockConnector.class)));
+ assertFalse(connectorPlugins.contains(new ConnectorPluginInfo(SchemaSourceConnector.class)));
+ assertTrue(connectorPlugins.contains(new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class)));
}
+ @Test
+ public void testConnectorPluginsIncludesTypeAndVersionInformation()
+ throws IOException {
+ ConnectorPluginInfo sinkInfo = new ConnectorPluginInfo(TestSinkConnector.class);
+ ConnectorPluginInfo sourceInfo = new ConnectorPluginInfo(TestSourceConnector.class);
+ ConnectorPluginInfo unkownInfo =
+ new ConnectorPluginInfo(ConnectorPluginsResourceTestConnector.class);
+ assertEquals(ConnectorType.SINK, sinkInfo.type());
+ assertEquals(ConnectorType.SOURCE, sourceInfo.type());
+ assertEquals(ConnectorType.UNKNOWN, unkownInfo.type());
+ assertEquals(TestSinkConnector.VERSION, sinkInfo.version());
+ assertEquals(TestSourceConnector.VERSION, sourceInfo.version());
+
+ final ObjectMapper objectMapper = new ObjectMapper();
+ String serializedSink = objectMapper.writeValueAsString(ConnectorType.SINK);
+ String serializedSource = objectMapper.writeValueAsString(ConnectorType.SOURCE);
+ String serializedUnknown = objectMapper.writeValueAsString(ConnectorType.UNKNOWN);
+ assertTrue(serializedSink.contains("sink"));
+ assertTrue(serializedSource.contains("source"));
+ assertTrue(serializedUnknown.contains("unknown"));
+ assertEquals(
+ ConnectorType.SINK,
+ objectMapper.readValue(serializedSink, ConnectorType.class)
+ );
+ assertEquals(
+ ConnectorType.SOURCE,
+ objectMapper.readValue(serializedSource, ConnectorType.class)
+ );
+ assertEquals(
+ ConnectorType.UNKNOWN,
+ objectMapper.readValue(serializedUnknown, ConnectorType.class)
+ );
+ }
+
+ public static class TestSinkConnector extends SinkConnector {
+
+ static final String VERSION = "some great version";
+
+ @Override
+ public String version() {
+ return VERSION;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public ConfigDef config() {
+ return null;
+ }
+ }
+
+ public static class TestSourceConnector extends SourceConnector {
+
+ static final String VERSION = "an entirely different version";
+
+ @Override
+ public String version() {
+ return VERSION;
+ }
+
+ @Override
+ public void start(Map<String, String> props) {
+
+ }
+
+ @Override
+ public Class<? extends Task> taskClass() {
+ return null;
+ }
+
+ @Override
+ public List<Map<String, String>> taskConfigs(int maxTasks) {
+ return null;
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public ConfigDef config() {
+ return null;
+ }
+ }
/* Name here needs to be unique as we are testing the aliasing mechanism */
public static class ConnectorPluginsResourceTestConnector extends Connector {