You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/10/12 16:24:34 UTC
[camel-kafka-connector] 02/05: Provide API to add connector in
Catalog #573
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit e282796af175ed6cd57d548caa5fcd1ef114dad7
Author: Aurélien Pupier <ap...@redhat.com>
AuthorDate: Mon Oct 12 14:57:48 2020 +0200
Provide API to add connector in Catalog #573
- removed getConnectorAsJson from public API . I'm not sure that it is
very useful now that we can have the Java model and add a connector. it
is also simplifying the API to add a connector
Signed-off-by: Aurélien Pupier <ap...@redhat.com>
---
.../catalog/CamelKafkaConnectorCatalog.java | 28 ++++++++++++----
.../catalog/CamelKafkaConnectorCatalogTest.java | 37 ++++++++++++++++++++--
2 files changed, 56 insertions(+), 9 deletions(-)
diff --git a/camel-kafka-connector-catalog/src/main/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalog.java b/camel-kafka-connector-catalog/src/main/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalog.java
index b9718b5..65d910f 100644
--- a/camel-kafka-connector-catalog/src/main/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalog.java
+++ b/camel-kafka-connector-catalog/src/main/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalog.java
@@ -39,8 +39,8 @@ import org.slf4j.LoggerFactory;
public class CamelKafkaConnectorCatalog {
- static List<String> connectorsName = new ArrayList<String>();
- static Map<String, CamelKafkaConnectorModel> connectorsModel = new HashMap<String, CamelKafkaConnectorModel>();
+ static List<String> connectorsName = new ArrayList<>();
+ static Map<String, CamelKafkaConnectorModel> connectorsModel = new HashMap<>();
private static final Logger LOG = LoggerFactory.getLogger(CamelKafkaConnectorCatalog.class);
private static final String CONNECTORS_DIR = "connectors";
private static final String DESCRIPTORS_DIR = "descriptors";
@@ -83,7 +83,7 @@ public class CamelKafkaConnectorCatalog {
}
}
- public String getConnectorAsJson(String connectorName) {
+ private String loadConnectorAsJsonFromEmbeddedCatalog(String connectorName) {
String result = null;
try (InputStream connectorModelInputSream = CamelKafkaConnectorCatalog.class.getResourceAsStream(File.separator + CONNECTORS_DIR + File.separator + connectorName + ".json")) {
result = new BufferedReader(new InputStreamReader(connectorModelInputSream, StandardCharsets.UTF_8))
@@ -97,8 +97,12 @@ public class CamelKafkaConnectorCatalog {
}
private CamelKafkaConnectorModel getConnectorModel(String connectorName) {
+ String json = loadConnectorAsJsonFromEmbeddedCatalog(connectorName);
+ return createModel(json);
+ }
+
+ private CamelKafkaConnectorModel createModel(String json) {
CamelKafkaConnectorModel model = new CamelKafkaConnectorModel();
- String json = getConnectorAsJson(connectorName);
JsonObject obj = JsonMapper.deserialize(json);
JsonObject wrapper = (JsonObject)obj.get("connector");
model.setConnectorClass((String)wrapper.get("class"));
@@ -106,12 +110,12 @@ public class CamelKafkaConnectorCatalog {
model.setGroupId((String)wrapper.get("groupId"));
model.setType((String)wrapper.get("type"));
model.setVersion((String)wrapper.get("version"));
- model.setOptions((List<CamelKafkaConnectorOptionModel>)getConnectorOptionModel(obj));
+ model.setOptions(getConnectorOptionModel(obj));
return model;
}
private List<CamelKafkaConnectorOptionModel> getConnectorOptionModel(JsonObject obj) {
- List<CamelKafkaConnectorOptionModel> model = new ArrayList<CamelKafkaConnectorOptionModel>();
+ List<CamelKafkaConnectorOptionModel> model = new ArrayList<>();
JsonObject wrapper = (JsonObject)obj.get("properties");
Set<String> options = wrapper.keySet();
for (String string : options) {
@@ -133,4 +137,16 @@ public class CamelKafkaConnectorCatalog {
public Map<String, CamelKafkaConnectorModel> getConnectorsModel() {
return connectorsModel;
}
+
+ /**
+ * Register a new Connector definition in the catalog.
+ * If it already exists, the previous one is overwritten.
+ *
+ * @param connectorName - the connector name
+ * @param connectorDefinitionAsJson - the definition of the connector provided as a String with Json format
+ */
+ public void addConnector(String connectorName, String connectorDefinitionAsJson) {
+ connectorsName.add(connectorName);
+ connectorsModel.put(connectorName, createModel(connectorDefinitionAsJson));
+ }
}
diff --git a/camel-kafka-connector-catalog/src/test/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java b/camel-kafka-connector-catalog/src/test/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java
index 38dd791..446bc38 100644
--- a/camel-kafka-connector-catalog/src/test/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java
+++ b/camel-kafka-connector-catalog/src/test/java/org/apache/camel/kafkaconnector/catalog/CamelKafkaConnectorCatalogTest.java
@@ -16,16 +16,16 @@
*/
package org.apache.camel.kafkaconnector.catalog;
+import static org.junit.jupiter.api.Assertions.*;
+
import java.util.List;
import java.util.Map;
import org.apache.camel.kafkaconnector.model.CamelKafkaConnectorModel;
+import org.apache.camel.kafkaconnector.model.CamelKafkaConnectorOptionModel;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
class CamelKafkaConnectorCatalogTest {
static CamelKafkaConnectorCatalog catalog;
@@ -54,4 +54,35 @@ class CamelKafkaConnectorCatalogTest {
assertEquals("camel.sink.endpoint.autoCreateBucket", model.getOptions().get(2).getName());
}
+ @Test
+ void testAddConnector() throws Exception {
+ String connectorName = "my-test-connector";
+ catalog.addConnector(connectorName, "{\n"
+ + " \"connector\": {\n"
+ + " \"class\": \"org.apache.camel.kafkaconnector.my-test-connector.TestDemoConnector\",\n"
+ + " \"artifactId\": \"camel-my-test-connector-kafka-connector\",\n"
+ + " \"groupId\": \"org.apache.camel.kafkaconnector\",\n"
+ + " \"id\": \"my-test-connector\",\n"
+ + " \"type\": \"sink\",\n"
+ + " \"version\": \"0.6.0-SNAPSHOT\"\n"
+ + " },\n"
+ + " \"properties\": {\n"
+ + " \"camel.component.my-test-connector.demo\": {\n"
+ + " \"name\": \"camel.component.my-test-connector.demo\",\n"
+ + " \"description\": \"A demo description of the component\",\n"
+ + " \"defaultValue\": \"\\\"false\\\"\",\n"
+ + " \"priority\": \"MEDIUM\"\n"
+ + " }\n"
+ + " }\n"
+ + "}\n");
+
+ assertTrue(catalog.getConnectorsName().contains(connectorName));
+ assertNotNull(catalog.getConnectorsModel().get(connectorName));
+ CamelKafkaConnectorOptionModel camelKafkaConnectorOptionModel = catalog.getConnectorsModel().get(connectorName).getOptions().get(0);
+ assertEquals("\"false\"", camelKafkaConnectorOptionModel.getDefaultValue());
+ assertEquals("camel.component.my-test-connector.demo", camelKafkaConnectorOptionModel.getName());
+ assertEquals("MEDIUM", camelKafkaConnectorOptionModel.getPriority());
+ assertEquals("A demo description of the component", camelKafkaConnectorOptionModel.getDescription());
+ }
+
}