You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/14 11:13:09 UTC

[camel-k-runtime] 02/11: kamelets: create a camel-kamelet component #375

This is an automated email from the ASF dual-hosted git repository.

lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit c7143ee0a7fcbe2b75ff52c014c7bf4cad8bb4cd
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Jul 29 16:54:00 2020 +0200

    kamelets: create a camel-kamelet component #375
---
 .../camel/component/kamelet/KameletComponent.java  | 91 +++++++++++++++++++++-
 .../camel/component/kamelet/KameletEndpoint.java   | 58 +++++++-------
 ...t.java => KameletAddAfterCamelStartedTest.java} | 12 +--
 .../camel/component/kamelet/KameletTest.java       |  2 +-
 4 files changed, 126 insertions(+), 37 deletions(-)

diff --git a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index f5c6f6e..83898a2 100644
--- a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -16,13 +16,21 @@
  */
 package org.apache.camel.component.kamelet;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.CamelEvent;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.support.EventNotifierSupport;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.StringHelper;
 
 @Component(Kamelet.SCHEME)
@@ -35,6 +43,11 @@ public class KameletComponent extends DefaultComponent {
         super(context);
     }
 
+    // temporary holder for kamelet endpoints created during startup, as we need to defer
+    // creating routes from templates until camel context has finished loading all routes and whatnot
+    private final List<KameletEndpoint> endpoints = new ArrayList<>();
+    private volatile RouteTemplateEventNotifier notifier;
+
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         final String templateId = extractTemplateId(remaining);
@@ -42,7 +55,7 @@ public class KameletComponent extends DefaultComponent {
 
         //
         // The properties for the kamelets are determined by global properties
-        // and local endpoint parametes,
+        // and local endpoint parameters,
         //
         // Global parameters are loaded in the following order:
         //
@@ -104,4 +117,80 @@ public class KameletComponent extends DefaultComponent {
 
         return properties;
     }
+
+    @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+
+        if (!getCamelContext().isRunAllowed()) {
+            // set up event listener which must be started to get triggered during initialization of camel context
+            notifier = new RouteTemplateEventNotifier(this);
+            ServiceHelper.startService(notifier);
+            getCamelContext().getManagementStrategy().addEventNotifier(notifier);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (notifier != null) {
+            ServiceHelper.stopService(notifier);
+            getCamelContext().getManagementStrategy().removeEventNotifier(notifier);
+            notifier = null;
+        }
+        super.doStop();
+    }
+
+    void onEndpointAdd(KameletEndpoint endpoint) {
+        if (notifier == null) {
+            try {
+                addRouteFromTemplate(endpoint);
+            } catch (Exception e) {
+                throw RuntimeCamelException.wrapRuntimeException(e);
+            }
+        } else {
+            // remember endpoints as we defer adding routes for them till later
+            this.endpoints.add(endpoint);
+        }
+    }
+
+    void addRouteFromTemplate(KameletEndpoint endpoint) throws Exception {
+        ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class);
+        String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
+        RouteDefinition def = context.getRouteDefinition(id);
+        if (!def.isPrepared()) {
+            List<RouteDefinition> list = new ArrayList<>(1);
+            list.add(def);
+            context.startRouteDefinitions(list);
+        }
+    }
+
+    private static class RouteTemplateEventNotifier extends EventNotifierSupport {
+
+        private final KameletComponent component;
+
+        public RouteTemplateEventNotifier(KameletComponent component) {
+            this.component = component;
+        }
+
+        @Override
+        public void notify(CamelEvent event) throws Exception {
+            for (KameletEndpoint endpoint : component.endpoints) {
+                component.addRouteFromTemplate(endpoint);
+            }
+            component.endpoints.clear();
+            // we were only needed during initializing/starting up camel, so remove after use
+            ServiceHelper.stopService(this);
+            component.getCamelContext().getManagementStrategy().removeEventNotifier(this);
+            component.notifier = null;
+        }
+
+        @Override
+        public boolean isEnabled(CamelEvent event) {
+            // we only care about this event during startup as it's triggered when
+            // all route and route template definitions have been added and prepared
+            // so this allows us to hook into the right moment
+            return event instanceof CamelEvent.CamelContextInitializedEvent;
+        }
+
+    }
 }
diff --git a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index 22cd543..7609647 100644
--- a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.kamelet;
 
 import java.util.Map;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProducer;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -26,9 +28,9 @@ import org.apache.camel.Producer;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.DefaultEndpoint;
-import org.apache.camel.support.DefaultProducer;
 import org.apache.camel.support.service.ServiceHelper;
 
 @UriEndpoint(
@@ -64,6 +66,11 @@ public class KameletEndpoint extends DefaultEndpoint {
         this.kameletUri = "direct:" + routeId;
     }
 
+    @Override
+    public KameletComponent getComponent() {
+        return (KameletComponent) super.getComponent();
+    }
+
     public String getTemplateId() {
         return templateId;
     }
@@ -72,6 +79,10 @@ public class KameletEndpoint extends DefaultEndpoint {
         return routeId;
     }
 
+    public Map<String, Object> getKameletProperties() {
+        return kameletProperties;
+    }
+
     @Override
     public Producer createProducer() throws Exception {
         return new KameletProducer();
@@ -81,21 +92,14 @@ public class KameletEndpoint extends DefaultEndpoint {
     public Consumer createConsumer(Processor processor) throws Exception {
         Consumer answer = new KemeletConsumer(processor);
         configureConsumer(answer);
-
         return answer;
     }
 
     @Override
-    protected void doStart() throws Exception {
-        try {
-            // Add a route to the camel context from the given template
-            // TODO: add validation (requires: https://issues.apache.org/jira/browse/CAMEL-15312)
-            getCamelContext().addRouteFromTemplate(routeId, templateId, kameletProperties);
-        } catch (Exception e) {
-            throw new IllegalArgumentException(e);
-        }
-
-        super.doStart();
+    protected void doInit() throws Exception {
+        super.doInit();
+        // only need to add during init phase
+        getComponent().onEndpointAdd(this);
     }
 
     // *********************************
@@ -117,52 +121,46 @@ public class KameletEndpoint extends DefaultEndpoint {
             endpoint = getCamelContext().getEndpoint(kameletUri);
             consumer = endpoint.createConsumer(getProcessor());
 
-            ServiceHelper.startService(endpoint);
-            ServiceHelper.startService(consumer);
-
+            ServiceHelper.startService(endpoint, consumer);
             super.doStart();
         }
 
         @Override
         protected void doStop() throws Exception {
-            ServiceHelper.stopService(endpoint);
-            ServiceHelper.stopService(consumer);
-
+            ServiceHelper.stopService(consumer, endpoint);
             super.doStop();
         }
     }
 
-    private class KameletProducer extends DefaultProducer {
+    private class KameletProducer extends DefaultAsyncProducer {
         private volatile Endpoint endpoint;
-        private volatile Producer producer;
+        private volatile AsyncProducer producer;
 
         public KameletProducer() {
             super(KameletEndpoint.this);
         }
 
         @Override
-        public void process(Exchange exchange) throws Exception {
+        public boolean process(Exchange exchange, AsyncCallback callback) {
             if (producer != null) {
-                producer.process(exchange);
+                return producer.process(exchange, callback);
+            } else {
+                callback.done(true);
+                return true;
             }
         }
 
         @Override
         protected void doStart() throws Exception {
             endpoint = getCamelContext().getEndpoint(kameletUri);
-            producer = endpoint.createProducer();
-
-            ServiceHelper.startService(endpoint);
-            ServiceHelper.startService(producer);
-
+            producer = endpoint.createAsyncProducer();
+            ServiceHelper.startService(endpoint, producer);
             super.doStart();
         }
 
         @Override
         protected void doStop() throws Exception {
-            ServiceHelper.stopService(endpoint);
-            ServiceHelper.stopService(producer);
-
+            ServiceHelper.stopService(producer, endpoint);
             super.doStop();
         }
     }
diff --git a/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java b/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAddAfterCamelStartedTest.java
similarity index 91%
copy from camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java
copy to camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAddAfterCamelStartedTest.java
index 8f95084..fdc9dc6 100644
--- a/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java
+++ b/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAddAfterCamelStartedTest.java
@@ -25,10 +25,11 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 import static org.assertj.core.api.Assertions.assertThat;
 
-public class KameletTest {
-    private static final Logger LOGGER = LoggerFactory.getLogger(KameletTest.class);
+public class KameletAddAfterCamelStartedTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KameletAddAfterCamelStartedTest.class);
 
     @Test
     public void test() throws Exception {
@@ -53,18 +54,19 @@ public class KameletTest {
             .build();
          */
 
+        // start camel here and add routes with kamelets later
+        context.start();
+
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 // routes
                 from("direct:template")
-                    .to("kamelet:setBody/test?bodyValue=bv")
+                    .toF("kamelet:setBody/test?bodyValue=%s", body)
                     .to("log:1");
             }
         });
 
-        context.start();
-
         assertThat(
             context.createFluentProducerTemplate().to("direct:template").withBody("test").request(String.class)
         ).isEqualTo(body);
diff --git a/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java b/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java
index 8f95084..32634a0 100644
--- a/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java
+++ b/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTest.java
@@ -58,7 +58,7 @@ public class KameletTest {
             public void configure() throws Exception {
                 // routes
                 from("direct:template")
-                    .to("kamelet:setBody/test?bodyValue=bv")
+                    .toF("kamelet:setBody/test?bodyValue=%s", body)
                     .to("log:1");
             }
         });