You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/13 11:54:56 UTC

[camel] 04/04: CAMEL-16493: Kamelet EIP

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kamelet-eip
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2a03c829aa84303f89beb906ed9e3966fb202e28
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Apr 13 13:53:46 2021 +0200

    CAMEL-16493: Kamelet EIP
---
 .../camel/component/kamelet/KameletComponent.java  | 22 +++++++--------
 .../camel/component/kamelet/KameletProcessor.java  |  4 +--
 .../camel/component/kamelet/KameletProducer.java   | 31 +++++++++++++++++-----
 .../component/kamelet/KameletAggregateTest.java    |  3 ++-
 .../camel/component/kamelet/KameletBasicTest.java  |  2 --
 .../component/kamelet/KameletConsumeOnlyTest.java  |  2 --
 .../component/kamelet/KameletEipAggregateTest.java |  2 --
 7 files changed, 40 insertions(+), 26 deletions(-)

diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 7dbb94e..7f3aaeb 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -51,12 +51,12 @@ import static org.apache.camel.component.kamelet.Kamelet.PARAM_TEMPLATE_ID;
 public class KameletComponent extends DefaultComponent {
     private static final Logger LOGGER = LoggerFactory.getLogger(KameletComponent.class);
 
-    // active consumers
-    private final Map<String, KameletConsumer> consumers = new HashMap<>();
     private final LifecycleHandler lifecycleHandler = new LifecycleHandler();
 
-    // TODO:
-    private final Map<String, Processor> callbacks = new ConcurrentHashMap<>();
+    // active consumers
+    private final Map<String, KameletConsumer> consumers = new HashMap<>();
+    // active kamelet EIPs
+    private final Map<String, Processor> kameletEips = new ConcurrentHashMap<>();
 
     // counter that is used for producers to keep track if any consumer was added/removed since they last checked
     // this is used for optimization to avoid each producer to get consumer for each message processed
@@ -77,16 +77,16 @@ public class KameletComponent extends DefaultComponent {
     public KameletComponent() {
     }
 
-    public void pushCallback(String key, Processor callback) {
-        callbacks.put(key, callback);
+    public void addKameletEip(String key, Processor callback) {
+        kameletEips.put(key, callback);
     }
 
-    public Processor popCallback(String key) {
-        return callbacks.remove(key);
+    public Processor removeKameletEip(String key) {
+        return kameletEips.remove(key);
     }
 
-    public Processor getCallback(String key) {
-        return callbacks.get(key);
+    public Processor getKameletEip(String key) {
+        return kameletEips.get(key);
     }
 
     @Override
@@ -327,7 +327,7 @@ public class KameletComponent extends DefaultComponent {
 
         ServiceHelper.stopAndShutdownService(consumers);
         consumers.clear();
-        callbacks.clear();
+        kameletEips.clear();
         super.doShutdown();
     }
 
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java
index 699d108..70e2f2d 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java
@@ -124,7 +124,7 @@ public class KameletProcessor extends AsyncProcessorSupport
         }
         ServiceHelper.buildService(processor, producer);
 
-        component.pushCallback(producer.getKey(), processor);
+        component.addKameletEip(producer.getKey(), processor);
     }
 
     @Override
@@ -146,6 +146,6 @@ public class KameletProcessor extends AsyncProcessorSupport
     protected void doShutdown() throws Exception {
         ServiceHelper.stopAndShutdownServices(processor, producer);
 
-        component.popCallback(producer.getKey());
+        component.removeKameletEip(producer.getKey());
     }
 }
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
index d76937e..8762f85 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
@@ -19,7 +19,9 @@ package org.apache.camel.component.kamelet;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
+import org.apache.camel.Route;
 import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.ExchangeHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,16 +83,33 @@ final class KameletProducer extends DefaultAsyncProducer {
                 callback.done(true);
                 return true;
             } else {
+                // the kamelet producer has multiple purposes at this point
+                // it is capable of linking the kamelet component with the kamelet EIP
+                // to ensure the EIP and the component are wired together with their
+                // kamelet:source and kamelet:sink endpoints so when calling the sink
+                // then we continue processing the EIP child processors
+
+                // if no EIP is in use, then its _just_ a regular camel component
+                // with producer and consumers linked together via the component
+
                 if (sink) {
-                    // need to execute the callback from the waiting
-                    AsyncProcessor parked = (AsyncProcessor) component.getCallback(key);
-                    if (parked != null) {
-                        return parked.process(exchange, callback);
+                    // when calling a kamelet:sink then lookup any waiting processor
+                    // from the Kamelet EIP to continue routing
+                    AsyncProcessor eip = (AsyncProcessor) component.getKameletEip(key);
+                    if (eip != null) {
+                        return eip.process(exchange, callback);
                     } else {
-                        callback.done(true);
-                        return true;
+                        // if the current route is from a kamelet source then we should
+                        // break out as otherwise we would end up calling ourselves again
+                        Route route = ExchangeHelper.getRoute(exchange);
+                        boolean source = route != null && route.getConsumer() instanceof KameletConsumer;
+                        if (source) {
+                            callback.done(true);
+                            return true;
+                        }
                     }
                 }
+                // kamelet producer that calls its kamelet consumer to process the incoming exchange
                 return consumer.getAsyncProcessor().process(exchange, callback);
             }
         } catch (Exception e) {
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java
index cb27980..4f23055 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java
@@ -24,7 +24,7 @@ import org.apache.http.annotation.Obsolete;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
-@Disabled("Should use Kamelet EIP")
+@Disabled
 public class KameletAggregateTest extends CamelTestSupport {
 
     @Test
@@ -62,6 +62,7 @@ public class KameletAggregateTest extends CamelTestSupport {
                     .end();
 
                 from("direct:start")
+                        // this is not possible, you must use kamelet EIP instead
                         .to("kamelet:my-aggregate?count=5")
                         .to("log:info")
                         .to("mock:result");
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
index 52f1239..0920161 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
@@ -22,7 +22,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit5.CamelTestSupport;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -38,7 +37,6 @@ public class KameletBasicTest extends CamelTestSupport {
     }
 
     @Test
-    @Disabled
     public void canConsumeFromKamelet() {
         assertThat(
                 consumer.receiveBody("kamelet:tick", Integer.class)).isEqualTo(1);
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumeOnlyTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumeOnlyTest.java
index 15c5d2c..065e7bb 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumeOnlyTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumeOnlyTest.java
@@ -20,12 +20,10 @@ import org.apache.camel.Exchange;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit5.CamelTestSupport;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-@Disabled
 public class KameletConsumeOnlyTest extends CamelTestSupport {
 
     @Test
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java
index 2aa911e..ae1eb8f 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java
@@ -20,7 +20,6 @@ import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.AggregationStrategies;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit5.CamelTestSupport;
-import org.apache.http.annotation.Obsolete;
 import org.junit.jupiter.api.Test;
 
 public class KameletEipAggregateTest extends CamelTestSupport {
@@ -44,7 +43,6 @@ public class KameletEipAggregateTest extends CamelTestSupport {
     //
     // **********************************************
 
-    @Obsolete
     protected RoutesBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             @Override