You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/14 11:13:09 UTC
[camel-k-runtime] 02/11: kamelets: create a camel-kamelet component
#375
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit c7143ee0a7fcbe2b75ff52c014c7bf4cad8bb4cd
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jul 29 16:54:00 2020 +0200
kamelets: create a camel-kamelet component #375
---
.../camel/component/kamelet/KameletComponent.java | 91 +++++++++++++++++++++-
.../camel/component/kamelet/KameletEndpoint.java | 58 +++++++-------
...t.java => KameletAddAfterCamelStartedTest.java} | 12 +--
.../camel/component/kamelet/KameletTest.java | 2 +-
4 files changed, 126 insertions(+), 37 deletions(-)
diff --git a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index f5c6f6e..83898a2 100644
--- a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -16,13 +16,21 @@
*/
package org.apache.camel.component.kamelet;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.support.EventNotifierSupport;
+import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.StringHelper;
@Component(Kamelet.SCHEME)
@@ -35,6 +43,11 @@ public class KameletComponent extends DefaultComponent {
super(context);
}
+ // use as temporary to keep track of created kamelet endpoints during startup as we need to defer
+ // create routes from templates until camel context has finished loading all routes and whatnot
+ private final List<KameletEndpoint> endpoints = new ArrayList<>();
+ private volatile RouteTemplateEventNotifier notifier;
+
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
final String templateId = extractTemplateId(remaining);
@@ -42,7 +55,7 @@ public class KameletComponent extends DefaultComponent {
//
// The properties for the kamelets are determined by global properties
- // and local endpoint parametes,
+ // and local endpoint parameters,
//
// Global parameters are loaded in the following order:
//
@@ -104,4 +117,80 @@ public class KameletComponent extends DefaultComponent {
return properties;
}
+
+ @Override
+ protected void doInit() throws Exception {
+ super.doInit();
+
+ if (!getCamelContext().isRunAllowed()) {
+ // setup event listener which must be started to get triggered during initialization of camel context
+ notifier = new RouteTemplateEventNotifier(this);
+ ServiceHelper.startService(notifier);
+ getCamelContext().getManagementStrategy().addEventNotifier(notifier);
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (notifier != null) {
+ ServiceHelper.stopService(notifier);
+ getCamelContext().getManagementStrategy().removeEventNotifier(notifier);
+ notifier = null;
+ }
+ super.doStop();
+ }
+
+ void onEndpointAdd(KameletEndpoint endpoint) {
+ if (notifier == null) {
+ try {
+ addRouteFromTemplate(endpoint);
+ } catch (Exception e) {
+ throw RuntimeCamelException.wrapRuntimeException(e);
+ }
+ } else {
+ // remember endpoints as we defer adding routes for them till later
+ this.endpoints.add(endpoint);
+ }
+ }
+
+ void addRouteFromTemplate(KameletEndpoint endpoint) throws Exception {
+ ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class);
+ String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
+ RouteDefinition def = context.getRouteDefinition(id);
+ if (!def.isPrepared()) {
+ List<RouteDefinition> list = new ArrayList<>(1);
+ list.add(def);
+ context.startRouteDefinitions(list);
+ }
+ }
+
+ private static class RouteTemplateEventNotifier extends EventNotifierSupport {
+
+ private final KameletComponent component;
+
+ public RouteTemplateEventNotifier(KameletComponent component) {
+ this.component = component;
+ }
+
+ @Override
+ public void notify(CamelEvent event) throws Exception {
+ for (KameletEndpoint endpoint : component.endpoints) {
+ component.addRouteFromTemplate(endpoint);
+ }
+ component.endpoints.clear();
+ // we were only needed during initializing/starting up camel, so remove after use
+ ServiceHelper.stopService(this);
+ component.getCamelContext().getManagementStrategy().removeEventNotifier(this);
+ component.notifier = null;
+ }
+
+ @Override
+ public boolean isEnabled(CamelEvent event) {
+ // we only care about this event during startup as its triggered when
+ // all route and route template definitions have been added and prepared
+ // so this allows us to hook into the right moment
+ return event instanceof CamelEvent.CamelContextInitializedEvent;
+ }
+
+ }
}
diff --git a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index 22cd543..7609647 100644
--- a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.kamelet;
import java.util.Map;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProducer;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -26,9 +28,9 @@ import org.apache.camel.Producer;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.DefaultEndpoint;
-import org.apache.camel.support.DefaultProducer;
import org.apache.camel.support.service.ServiceHelper;
@UriEndpoint(
@@ -64,6 +66,11 @@ public class KameletEndpoint extends DefaultEndpoint {
this.kameletUri = "direct:" + routeId;
}
+ @Override
+ public KameletComponent getComponent() {
+ return (KameletComponent) super.getComponent();
+ }
+
public String getTemplateId() {
return templateId;
}
@@ -72,6 +79,10 @@ public class KameletEndpoint extends DefaultEndpoint {
return routeId;
}
+ public Map<String, Object> getKameletProperties() {
+ return kameletProperties;
+ }
+
@Override
public Producer createProducer() throws Exception {
return new KameletProducer();
@@ -81,21 +92,14 @@ public class KameletEndpoint extends DefaultEndpoint {
public Consumer createConsumer(Processor processor) throws Exception {
Consumer answer = new KemeletConsumer(processor);
configureConsumer(answer);
-
return answer;
}
@Override
- protected void doStart() throws Exception {
- try {
- // Add a route to the camel context from the given template
- // TODO: add validation (requires: https://issues.apache.org/jira/browse/CAMEL-15312)
- getCamelContext().addRouteFromTemplate(routeId, templateId, kameletProperties);
- } catch (Exception e) {
- throw new IllegalArgumentException(e);
- }
-
- super.doStart();
+ protected void doInit() throws Exception {
+ super.doInit();
+ // only need to add during init phase
+ getComponent().onEndpointAdd(this);
}
// *********************************
@@ -117,52 +121,46 @@ public class KameletEndpoint extends DefaultEndpoint {
endpoint = getCamelContext().getEndpoint(kameletUri);
consumer = endpoint.createConsumer(getProcessor());
- ServiceHelper.startService(endpoint);
- ServiceHelper.startService(consumer);
-
+ ServiceHelper.startService(endpoint, consumer);
super.doStart();
}
@Override
protected void doStop() throws Exception {
- ServiceHelper.stopService(endpoint);
- ServiceHelper.stopService(consumer);
-
+ ServiceHelper.stopService(consumer, endpoint);
super.doStop();
}
}
- private class KameletProducer extends DefaultProducer {
+ private class KameletProducer extends DefaultAsyncProducer {
private volatile Endpoint endpoint;
- private volatile Producer producer;
+ private volatile AsyncProducer producer;
public KameletProducer() {
super(KameletEndpoint.this);
}
@Override
- public void process(Exchange exchange) throws Exception {
+ public boolean process(Exchange exchange, AsyncCallback callback) {
if (producer != null) {
- producer.process(exchange);
+ return producer.process(exchange, callback);
+ } else {
+ callback.done(true);
+ return true;
}
}
@Override
protected void doStart() throws Exception {
endpoint = getCamelContext().getEndpoint(kameletUri);
- producer = endpoint.createProducer();
-
- ServiceHelper.startService(endpoint);
- ServiceHelper.startService(producer);
-
+ producer = endpoint.createAsyncProducer();
+ ServiceHelper.startService(endpoint, producer);
super.doStart();
}
@Override
protected void doStop() throws Exception {
- ServiceHelper.stopService(endpoint);
- ServiceHelper.stopService(producer);
-
+ ServiceHelper.stopService(producer, endpoint);
super.doStop();
}
}
diff --git a/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java b/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAddAfterCamelStartedTest.java
similarity index 91%
copy from camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java
copy to camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAddAfterCamelStartedTest.java
index 8f95084..fdc9dc6 100644
--- a/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java
+++ b/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAddAfterCamelStartedTest.java
@@ -25,10 +25,11 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import static org.assertj.core.api.Assertions.assertThat;
-public class KameletTest {
- private static final Logger LOGGER = LoggerFactory.getLogger(KameletTest.class);
+public class KameletAddAfterCamelStartedTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KameletAddAfterCamelStartedTest.class);
@Test
public void test() throws Exception {
@@ -53,18 +54,19 @@ public class KameletTest {
.build();
*/
+ // start camel here and add routes with kamelts later
+ context.start();
+
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
// routes
from("direct:template")
- .to("kamelet:setBody/test?bodyValue=bv")
+ .toF("kamelet:setBody/test?bodyValue=%s", body)
.to("log:1");
}
});
- context.start();
-
assertThat(
context.createFluentProducerTemplate().to("direct:template").withBody("test").request(String.class)
).isEqualTo(body);
diff --git a/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java b/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java
index 8f95084..32634a0 100644
--- a/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java
+++ b/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java
@@ -58,7 +58,7 @@ public class KameletTest {
public void configure() throws Exception {
// routes
from("direct:template")
- .to("kamelet:setBody/test?bodyValue=bv")
+ .toF("kamelet:setBody/test?bodyValue=%s", body)
.to("log:1");
}
});