You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/30 09:20:20 UTC
[camel-k-runtime] 03/03: kamelet source/sink component #490
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 979288ecc735e8613a39ca5d627d2f288fb68c2f
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Mon Sep 28 20:59:58 2020 +0200
kamelet source/sink component #490
---
.../src/main/resources/application.properties | 2 +-
.../src/test/resources/routes/set-body.yaml | 2 +-
.../src/test/resources/routes/to-upper.yaml | 2 +-
.../apache/camel/component/kamelet/kamelet.json | 2 +-
.../apache/camel/component/kamelet/Kamelet.java | 97 ++++++++++++-
.../camel/component/kamelet/KameletComponent.java | 154 +++++++--------------
.../camel/component/kamelet/KameletEndpoint.java | 7 +-
.../camel/component/kamelet/KameletProducer.java | 2 +-
.../src/test/resources/log4j2-test.xml | 2 +
9 files changed, 151 insertions(+), 119 deletions(-)
diff --git a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties
index 3ce5493..5b8f41a 100644
--- a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties
+++ b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/main/resources/application.properties
@@ -18,6 +18,6 @@
#
# Quarkus
#
-quarkus.log.console.enable = false
+quarkus.log.console.enable = true
quarkus.banner.enabled = false
diff --git a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml
index 55347e9..c311da1 100644
--- a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml
+++ b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/set-body.yaml
@@ -16,7 +16,7 @@
#
- from:
- uri: "direct:{{routeId}}"
+ uri: "kamelet:source"
steps:
- set-body:
constant: "{{bodyValue}}"
\ No newline at end of file
diff --git a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml
index 74105f1..ba51838 100644
--- a/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml
+++ b/camel-k-quarkus/camel-k-quarkus-itests/camel-k-quarkus-itests-kamelet/src/test/resources/routes/to-upper.yaml
@@ -16,7 +16,7 @@
#
- from:
- uri: "direct:{{routeId}}"
+ uri: "kamelet:source"
steps:
- set-body:
constant: "{{message}}"
diff --git a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
index a66f1ca..4dd132f 100644
--- a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
+++ b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
@@ -6,7 +6,7 @@
"description": "The Kamelet Component provides support for interacting with Knative",
"deprecated": false,
"firstVersion": "3.5.0",
- "label": "camel-k",
+ "label": "core",
"javaType": "org.apache.camel.component.kamelet.KameletComponent",
"supportLevel": "Preview",
"groupId": "org.apache.camel.k",
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
index 689c1ed..14ecc43 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
@@ -17,19 +17,32 @@
package org.apache.camel.component.kamelet;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
+import java.util.StringJoiner;
import java.util.function.Predicate;
import org.apache.camel.CamelContext;
+import org.apache.camel.model.FromDefinition;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.RouteTemplateDefinition;
+import org.apache.camel.model.RouteTemplateParameterDefinition;
+import org.apache.camel.model.ToDefinition;
import org.apache.camel.spi.PropertiesComponent;
+import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.util.StringHelper;
+import static org.apache.camel.model.ProcessorDefinitionHelper.filterTypeInOutputs;
+
public final class Kamelet {
public static final String PROPERTIES_PREFIX = "camel.kamelet.";
public static final String SCHEME = "kamelet";
public static final String SOURCE_ID = "source";
public static final String SINK_ID = "sink";
+ public static final String PARAM_ROUTE_ID = "routeId";
+ public static final String PARAM_TEMPLATE_ID = "templateId";
private Kamelet() {
}
@@ -38,9 +51,14 @@ public final class Kamelet {
return item -> item.startsWith(prefix);
}
- public static String extractTemplateId(CamelContext context, String remaining) {
+ public static String extractTemplateId(CamelContext context, String remaining, Map<String, Object> parameters) {
+ Object param = parameters.get(PARAM_TEMPLATE_ID);
+ if (param != null) {
+ return CamelContextHelper.mandatoryConvertTo(context, String.class, param);
+ }
+
if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) {
- return context.resolvePropertyPlaceholders("{{templateId}}");
+ return context.resolvePropertyPlaceholders("{{" + PARAM_TEMPLATE_ID + "}}");
}
String answer = StringHelper.before(remaining, "/");
@@ -51,14 +69,19 @@ public final class Kamelet {
return answer;
}
- public static String extractRouteId(CamelContext context, String remaining) {
+ public static String extractRouteId(CamelContext context, String remaining, Map<String, Object> parameters) {
+ Object param = parameters.get(PARAM_ROUTE_ID);
+ if (param != null) {
+ return CamelContextHelper.mandatoryConvertTo(context, String.class, param);
+ }
+
if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) {
- return context.resolvePropertyPlaceholders("{{routeId}}");
+ return context.resolvePropertyPlaceholders("{{" + PARAM_ROUTE_ID + "}}");
}
String answer = StringHelper.after(remaining, "/");
if (answer == null) {
- answer = extractTemplateId(context, remaining) + "-" + context.getUuidGenerator().generateUuid();
+ answer = extractTemplateId(context, remaining, parameters) + "-" + context.getUuidGenerator().generateUuid();
}
return answer;
@@ -84,4 +107,68 @@ public final class Kamelet {
return properties;
}
+
+    /**
+     * Creates a route definition from the route template identified by {@code routeTemplateId}
+     * and registers it in the route definitions of the given context.
+     *
+     * <p>Template default values are applied first and are then overridden by the supplied
+     * {@code parameters}. Any {@code kamelet:source} input and {@code kamelet:sink} output of the
+     * template is rewritten to carry the route id, so that those endpoints are unique per
+     * materialized route.
+     *
+     * @param context         the context holding the route templates and receiving the new route definition
+     * @param routeId         the id to assign to the new route; when {@code null} the id is left untouched
+     * @param routeTemplateId the id of the route template to materialize
+     * @param parameters      the user supplied template parameters, may be {@code null}
+     * @return the id of the route definition that was added
+     * @throws IllegalArgumentException if no template with the given id exists or if a template
+     *                                  parameter without a default value has not been provided
+     */
+    public static String addRouteFromTemplate(ModelCamelContext context, String routeId, String routeTemplateId, Map<String, Object> parameters) throws Exception {
+        RouteTemplateDefinition target = null;
+        for (RouteTemplateDefinition def : context.getRouteTemplateDefinitions()) {
+            if (routeTemplateId.equals(def.getId())) {
+                target = def;
+                break;
+            }
+        }
+        if (target == null) {
+            throw new IllegalArgumentException("Cannot find RouteTemplate with id " + routeTemplateId);
+        }
+
+        StringJoiner templatesBuilder = new StringJoiner(", ");
+        final Map<String, Object> prop = new HashMap<>();
+        // include default values first from the template (and validate that we have inputs for all required parameters)
+        if (target.getTemplateParameters() != null) {
+            for (RouteTemplateParameterDefinition temp : target.getTemplateParameters()) {
+                if (temp.getDefaultValue() != null) {
+                    prop.put(temp.getName(), temp.getDefaultValue());
+                } else if (parameters == null || !parameters.containsKey(temp.getName())) {
+                    // required parameter (no default value) that was not provided as input;
+                    // a null parameter map means that nothing was provided at all
+                    templatesBuilder.add(temp.getName());
+                }
+            }
+        }
+        if (templatesBuilder.length() > 0) {
+            throw new IllegalArgumentException(
+                "Route template " + routeTemplateId + " the following mandatory parameters must be provided: "
+                    + templatesBuilder.toString());
+        }
+        // then override with user parameters
+        if (parameters != null) {
+            prop.putAll(parameters);
+        }
+
+        RouteDefinition def = target.asRouteDefinition();
+        // must make deep copy of input
+        def.setInput(null);
+        def.setInput(new FromDefinition(target.getRoute().getInput().getEndpointUri()));
+        if (routeId != null) {
+            def.setId(routeId);
+        }
+        // must make sure the source and sink endpoints are unique by appending the route id before we create the route from the template
+        final String inputUri = def.getInput().getEndpointUri();
+        if (inputUri.startsWith("kamelet:source") || inputUri.startsWith("kamelet://source")) {
+            def.getInput().setUri("kamelet:source?" + PARAM_ROUTE_ID + "=" + routeId);
+        }
+        Iterator<ToDefinition> it = filterTypeInOutputs(def.getOutputs(), ToDefinition.class);
+        while (it.hasNext()) {
+            ToDefinition to = it.next();
+            if (to.getEndpointUri().startsWith("kamelet:sink") || to.getEndpointUri().startsWith("kamelet://sink")) {
+                to.setUri("kamelet:sink?" + PARAM_ROUTE_ID + "=" + routeId);
+            }
+        }
+
+        def.setTemplateParameters(prop);
+        // drop any previous definition for this route before registering the new one
+        context.removeRouteDefinition(def);
+        context.getRouteDefinitions().add(def);
+
+        return def.getId();
+    }
+
}
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 1cc92b2..9696c90 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -17,12 +17,9 @@
package org.apache.camel.component.kamelet;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,14 +27,8 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.VetoCamelContextStartException;
-import org.apache.camel.model.EndpointRequiredDefinition;
-import org.apache.camel.model.FromDefinition;
import org.apache.camel.model.ModelCamelContext;
-import org.apache.camel.model.ProcessorDefinitionHelper;
import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.model.RouteTemplateDefinition;
-import org.apache.camel.model.RouteTemplateParameterDefinition;
-import org.apache.camel.model.ToDefinition;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.PropertiesComponent;
import org.apache.camel.spi.annotations.Component;
@@ -47,8 +38,9 @@ import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import static org.apache.camel.model.ProcessorDefinitionHelper.filterTypeInOutputs;
+import static org.apache.camel.component.kamelet.Kamelet.PARAM_ROUTE_ID;
+import static org.apache.camel.component.kamelet.Kamelet.PARAM_TEMPLATE_ID;
+import static org.apache.camel.component.kamelet.Kamelet.addRouteFromTemplate;
/**
* The Kamelet Component provides support for materializing routes templates.
@@ -71,23 +63,53 @@ public class KameletComponent extends DefaultComponent {
}
@Override
- public Endpoint createEndpoint(String uri) throws Exception {
- return super.createEndpoint(uri);
- }
-
- @Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
- final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining);
- final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining);
- final String newUri = "kamelet:" + templateId + "/" + routeId;
+ final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining, parameters);
+ final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining, parameters);
+
+ parameters.remove(PARAM_TEMPLATE_ID);
+ parameters.remove(PARAM_ROUTE_ID);
final KameletEndpoint endpoint;
- if (!Kamelet.SOURCE_ID.equals(remaining) && !Kamelet.SINK_ID.equals(remaining)) {
- endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers) {
+ if (Kamelet.SOURCE_ID.equals(remaining) || Kamelet.SINK_ID.equals(remaining)) {
+ //
+ // if remaining is either `source` or `sink` then it is a virtual
+ // endpoint that is used inside the kamelet definition to mark it
+ // as in/out endpoint.
+ //
+ // The following snippet defines a template which will act as a
+ // consumer for this Kamelet:
+ //
+ // from("kamelet:source")
+ // .to("log:info")
+ //
+ // The following snippet defines a template which will act as a
+ // producer for this Kamelet:
+ //
+ // from("telegram:bots")
+ // .to("kamelet:sink")
+ //
+ // Note that at the moment, there's no enforcement around `source`
+ // and `sink` to be defined on the right side (producer or consumer)
+ //
+ endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers);
+
+ // forward component properties
+ endpoint.setBlock(block);
+ endpoint.setTimeout(timeout);
+
+ // set endpoint specific properties
+ setProperties(endpoint, parameters);
+ } else {
+ endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers) {
@Override
protected void doInit() throws Exception {
super.doInit();
+ //
+ // since this is the real kamelet, then we need to hand it
+ // over to the tracker.
+ //
lifecycleHandler.track(this);
}
};
@@ -110,20 +132,11 @@ public class KameletComponent extends DefaultComponent {
//
Map<String, Object> kameletProperties = Kamelet.extractKameletProperties(getCamelContext(), templateId, routeId);
kameletProperties.putAll(parameters);
- kameletProperties.put("templateId", templateId);
- kameletProperties.put("routeId", routeId);
+ kameletProperties.put(PARAM_TEMPLATE_ID, templateId);
+ kameletProperties.put(PARAM_ROUTE_ID, routeId);
// set kamelet specific properties
endpoint.setKameletProperties(kameletProperties);
- } else {
- endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers);
-
- // forward component properties
- endpoint.setBlock(block);
- endpoint.setTimeout(timeout);
-
- // set endpoint specific properties
- setProperties(endpoint, parameters);
}
return endpoint;
@@ -193,8 +206,6 @@ public class KameletComponent extends DefaultComponent {
@Override
public void onContextInitialized(CamelContext context) throws VetoCamelContextStartException {
if (!this.initialized.compareAndExchange(false, true)) {
- ModelCamelContext mcc = context.adapt(ModelCamelContext.class);
-
for (KameletEndpoint endpoint : endpoints) {
try {
createRouteForEndpoint(endpoint);
@@ -233,12 +244,14 @@ public class KameletComponent extends DefaultComponent {
if (!def.isPrepared()) {
// when starting the route that was created from the template
- // then we must provide the route id as local properties to the properties component
- // as this route id is used internal by kamelets when they are starting
+ // then we must provide the route id as local properties to the
+ // properties component as this route id is used internally by
+ // kamelets when they are starting
PropertiesComponent pc = context.getPropertiesComponent();
try {
Properties prop = new Properties();
- prop.put("routeId", id);
+ prop.put(PARAM_TEMPLATE_ID, endpoint.getTemplateId());
+ prop.put(PARAM_ROUTE_ID, id);
pc.setLocalProperties(prop);
context.startRouteDefinitions(List.of(def));
} finally {
@@ -248,72 +261,5 @@ public class KameletComponent extends DefaultComponent {
LOGGER.debug("Route with id={} created from template={}", id, endpoint.getTemplateId());
}
-
- private static String addRouteFromTemplate(final ModelCamelContext context, final String routeId, final String routeTemplateId, final Map<String, Object> parameters)
- throws Exception {
- RouteTemplateDefinition target = null;
- for (RouteTemplateDefinition def : context.getRouteTemplateDefinitions()) {
- if (routeTemplateId.equals(def.getId())) {
- target = def;
- break;
- }
- }
- if (target == null) {
- throw new IllegalArgumentException("Cannot find RouteTemplate with id " + routeTemplateId);
- }
-
- StringJoiner templatesBuilder = new StringJoiner(", ");
- final Map<String, Object> prop = new HashMap();
- // include default values first from the template (and validate that we have inputs for all required parameters)
- if (target.getTemplateParameters() != null) {
- for (RouteTemplateParameterDefinition temp : target.getTemplateParameters()) {
- if (temp.getDefaultValue() != null) {
- prop.put(temp.getName(), temp.getDefaultValue());
- } else {
- // this is a required parameter do we have that as input
- if (!parameters.containsKey(temp.getName())) {
- templatesBuilder.add(temp.getName());
- }
- }
- }
- }
- if (templatesBuilder.length() > 0) {
- throw new IllegalArgumentException(
- "Route template " + routeTemplateId + " the following mandatory parameters must be provided: "
- + templatesBuilder.toString());
- }
- // then override with user parameters
- if (parameters != null) {
- prop.putAll(parameters);
- }
-
- RouteDefinition def = target.asRouteDefinition();
- // must make deep copy of input
- def.setInput(null);
- def.setInput(new FromDefinition(target.getRoute().getInput().getEndpointUri()));
- if (routeId != null) {
- def.setId(routeId);
- }
- // must make the source and simk endpoints are unique by appending the route id before we create the route from the template
- if (def.getInput().getEndpointUri().startsWith("kamelet:source")) {
- def.getInput().setUri("kamelet:source?routeId=" + routeId);
- }
- Iterator<ToDefinition> it = filterTypeInOutputs(def.getOutputs(), ToDefinition.class);
- while (it.hasNext()) {
- ToDefinition to = it.next();
- if (to.getEndpointUri().startsWith("kamelet:sink")) {
- // TODO: must make deep copy
- to.setUri("kamelet:sink?routeId=" + routeId);
- }
- }
-
-
- def.setTemplateParameters(prop);
- context.removeRouteDefinition(def);
- context.getRouteDefinitions().add(def);
-
- return def.getId();
- }
-
}
}
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index 2d6e883..c3760f3 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -20,6 +20,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.camel.Category;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -30,8 +31,6 @@ import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@UriEndpoint(
firstVersion = "3.5.0",
@@ -39,10 +38,8 @@ import org.slf4j.LoggerFactory;
syntax = "kamelet:templateId/routeId",
title = "Kamelet",
lenientProperties = true,
- label = "camel-k")
+ category = Category.CORE)
public class KameletEndpoint extends DefaultEndpoint {
- private static final Logger LOGGER = LoggerFactory.getLogger(KameletEndpoint.class);
-
@Metadata(required = true)
@UriPath(description = "The Route Template ID")
private final String templateId;
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
index 9e6d86d..10bd42c 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
@@ -48,7 +48,7 @@ final class KameletProducer extends DefaultAsyncProducer {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
- final KameletConsumer consumer = getEndpoint().getConsumer();;
+ final KameletConsumer consumer = getEndpoint().getConsumer();
if (consumer != null) {
return consumer.getAsyncProcessor().process(exchange, callback);
diff --git a/components/camel-kamelet/src/test/resources/log4j2-test.xml b/components/camel-kamelet/src/test/resources/log4j2-test.xml
index 8ce15f1..d5df1ad 100644
--- a/components/camel-kamelet/src/test/resources/log4j2-test.xml
+++ b/components/camel-kamelet/src/test/resources/log4j2-test.xml
@@ -32,7 +32,9 @@
<Logger name="org.apache.camel.component.kamelet" level="TRACE"/>
<Root level="INFO">
+ <!--
<AppenderRef ref="STDOUT"/>
+ -->
<AppenderRef ref="FILE"/>
</Root>
</Loggers>