You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/07/30 09:15:05 UTC

[camel-k-runtime] 05/08: kamelets: create a camel-kamelet component #375

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kamelets-claus
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 2084696ce2da589a94d526721fdc4834da08ff10
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Jul 30 10:33:15 2020 +0200

    kamelets: create a camel-kamelet component #375
---
 .../camel/component/kamelet/KameletComponent.java  | 87 ++++++++++++++++++++++
 .../camel/component/kamelet/KameletEndpoint.java   | 40 +++-------
 .../kamelet/KameletAddAfterCamelStartedTest.java   | 76 +++++++++++++++++++
 3 files changed, 174 insertions(+), 29 deletions(-)

diff --git a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 05a8abc..6088410 100644
--- a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -16,13 +16,21 @@
  */
 package org.apache.camel.component.kamelet;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.CamelEvent;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.support.EventNotifierSupport;
+import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.StringHelper;
 
 @Component(Kamelet.SCHEME)
@@ -35,6 +43,9 @@ public class KameletComponent extends DefaultComponent {
         super(context);
     }
 
+    private volatile RouteTemplateEventNotifier notifier;
+    private final List<KameletEndpoint> endpoints = new ArrayList<>();
+
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
         final String templateId = extractTemplateId(remaining);
@@ -104,4 +115,80 @@ public class KameletComponent extends DefaultComponent {
 
         return properties;
     }
+
+    @Override
+    protected void doInit() throws Exception {
+        super.doInit();
+
+        if (!getCamelContext().isRunAllowed()) {
+            // setup event listener which must be started to get triggered during initialization of camel context
+            notifier = new RouteTemplateEventNotifier(this);
+            ServiceHelper.startService(notifier);
+            getCamelContext().getManagementStrategy().addEventNotifier(notifier);
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (notifier != null) {
+            ServiceHelper.stopService(notifier);
+            getCamelContext().getManagementStrategy().removeEventNotifier(notifier);
+            notifier = null;
+        }
+        super.doStop();
+    }
+
+    void onEndpointAdd(KameletEndpoint endpoint) {
+        if (notifier == null) {
+            try {
+                addRouteFromTemplate(endpoint);
+            } catch (Exception e) {
+                throw RuntimeCamelException.wrapRuntimeException(e);
+            }
+        } else {
+            // remember endpoints as we defer adding routes for them till later
+            this.endpoints.add(endpoint);
+        }
+    }
+
+    void addRouteFromTemplate(KameletEndpoint endpoint) throws Exception {
+        ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class);
+        String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
+        RouteDefinition def = context.getRouteDefinition(id);
+        if (!def.isPrepared()) {
+            List<RouteDefinition> list = new ArrayList<>(1);
+            list.add(def);
+            context.startRouteDefinitions(list);
+        }
+    }
+
+    private static class RouteTemplateEventNotifier extends EventNotifierSupport {
+
+        private final KameletComponent component;
+
+        public RouteTemplateEventNotifier(KameletComponent component) {
+            this.component = component;
+        }
+
+        @Override
+        public void notify(CamelEvent event) throws Exception {
+            for (KameletEndpoint endpoint : component.endpoints) {
+                component.addRouteFromTemplate(endpoint);
+            }
+            component.endpoints.clear();
+            // we were only needed during initializing/starting up camel, so remove after use
+            ServiceHelper.stopService(this);
+            component.getCamelContext().getManagementStrategy().removeEventNotifier(this);
+            component.notifier = null;
+        }
+
+        @Override
+        public boolean isEnabled(CamelEvent event) {
+            // we only care about this event during startup as its triggered when
+            // all route and route template definitions have been added and prepared
+            // so this allows us to hook into the right moment
+            return event instanceof CamelEvent.CamelContextInitializedEvent;
+        }
+
+    }
 }
diff --git a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index 4300ea6..7609647 100644
--- a/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.kamelet;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.camel.AsyncCallback;
@@ -27,18 +25,12 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.event.CamelContextInitializedEvent;
-import org.apache.camel.model.ModelCamelContext;
-import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.spi.CamelEvent;
-import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.DefaultEndpoint;
-import org.apache.camel.support.EventNotifierSupport;
 import org.apache.camel.support.service.ServiceHelper;
 
 @UriEndpoint(
@@ -74,6 +66,11 @@ public class KameletEndpoint extends DefaultEndpoint {
         this.kameletUri = "direct:" + routeId;
     }
 
+    @Override
+    public KameletComponent getComponent() {
+        return (KameletComponent) super.getComponent();
+    }
+
     public String getTemplateId() {
         return templateId;
     }
@@ -82,6 +79,10 @@ public class KameletEndpoint extends DefaultEndpoint {
         return routeId;
     }
 
+    public Map<String, Object> getKameletProperties() {
+        return kameletProperties;
+    }
+
     @Override
     public Producer createProducer() throws Exception {
         return new KameletProducer();
@@ -97,27 +98,8 @@ public class KameletEndpoint extends DefaultEndpoint {
     @Override
     protected void doInit() throws Exception {
         super.doInit();
-
-        // TODO: lets find a nicer way to do this
-        EventNotifier notifier = new EventNotifierSupport() {
-            @Override
-            public void notify(CamelEvent event) throws Exception {
-                String id = getCamelContext().addRouteFromTemplate(routeId, templateId, kameletProperties);
-                List<RouteDefinition> list = new ArrayList<>(1);
-                list.add(getCamelContext().adapt(ModelCamelContext.class).getRouteDefinition(id));
-                getCamelContext().adapt(ModelCamelContext.class).startRouteDefinitions(list);
-                // no longer needed so we can remove ourselves
-                getCamelContext().getManagementStrategy().removeEventNotifier(this);
-            }
-
-            @Override
-            public boolean isEnabled(CamelEvent event) {
-                return event instanceof CamelContextInitializedEvent;
-            }
-        };
-
-        ServiceHelper.startService(notifier);
-        getCamelContext().getManagementStrategy().addEventNotifier(notifier);
+        // only need to add during init phase
+        getComponent().onEndpointAdd(this);
     }
 
     // *********************************
diff --git a/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAddAfterCamelStartedTest.java b/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAddAfterCamelStartedTest.java
new file mode 100644
index 0000000..fdc9dc6
--- /dev/null
+++ b/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAddAfterCamelStartedTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import java.util.UUID;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class KameletAddAfterCamelStartedTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KameletAddAfterCamelStartedTest.class);
+
+    @Test
+    public void test() throws Exception {
+        String body = UUID.randomUUID().toString();
+
+        CamelContext context = new DefaultCamelContext();
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                routeTemplate("setBody")
+                    .templateParameter("bodyValue")
+                    .from("direct:{{routeId}}")
+                    .setBody().constant("{{bodyValue}}");
+            }
+        });
+
+        /*
+        context.addRouteFromTemplate("setBody")
+            .routeId("test")
+            .parameter("routeId", "test")
+            .parameter("bodyValue", body)
+            .build();
+         */
+
+        // start camel here and add routes with kamelts later
+        context.start();
+
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // routes
+                from("direct:template")
+                    .toF("kamelet:setBody/test?bodyValue=%s", body)
+                    .to("log:1");
+            }
+        });
+
+        assertThat(
+            context.createFluentProducerTemplate().to("direct:template").withBody("test").request(String.class)
+        ).isEqualTo(body);
+
+        context.stop();
+    }
+}