You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/13 11:54:56 UTC
[camel] 04/04: CAMEL-16493: Kamelet EIP
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch kamelet-eip
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2a03c829aa84303f89beb906ed9e3966fb202e28
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Apr 13 13:53:46 2021 +0200
CAMEL-16493: Kamelet EIP
---
.../camel/component/kamelet/KameletComponent.java | 22 +++++++--------
.../camel/component/kamelet/KameletProcessor.java | 4 +--
.../camel/component/kamelet/KameletProducer.java | 31 +++++++++++++++++-----
.../component/kamelet/KameletAggregateTest.java | 3 ++-
.../camel/component/kamelet/KameletBasicTest.java | 2 --
.../component/kamelet/KameletConsumeOnlyTest.java | 2 --
.../component/kamelet/KameletEipAggregateTest.java | 2 --
7 files changed, 40 insertions(+), 26 deletions(-)
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 7dbb94e..7f3aaeb 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -51,12 +51,12 @@ import static org.apache.camel.component.kamelet.Kamelet.PARAM_TEMPLATE_ID;
public class KameletComponent extends DefaultComponent {
private static final Logger LOGGER = LoggerFactory.getLogger(KameletComponent.class);
- // active consumers
- private final Map<String, KameletConsumer> consumers = new HashMap<>();
private final LifecycleHandler lifecycleHandler = new LifecycleHandler();
- // TODO:
- private final Map<String, Processor> callbacks = new ConcurrentHashMap<>();
+ // active consumers
+ private final Map<String, KameletConsumer> consumers = new HashMap<>();
+ // active kamelet EIPs
+ private final Map<String, Processor> kameletEips = new ConcurrentHashMap<>();
// counter that is used for producers to keep track if any consumer was added/removed since they last checked
// this is used for optimization to avoid each producer to get consumer for each message processed
@@ -77,16 +77,16 @@ public class KameletComponent extends DefaultComponent {
public KameletComponent() {
}
- public void pushCallback(String key, Processor callback) {
- callbacks.put(key, callback);
+ public void addKameletEip(String key, Processor callback) {
+ kameletEips.put(key, callback);
}
- public Processor popCallback(String key) {
- return callbacks.remove(key);
+ public Processor removeKameletEip(String key) {
+ return kameletEips.remove(key);
}
- public Processor getCallback(String key) {
- return callbacks.get(key);
+ public Processor getKameletEip(String key) {
+ return kameletEips.get(key);
}
@Override
@@ -327,7 +327,7 @@ public class KameletComponent extends DefaultComponent {
ServiceHelper.stopAndShutdownService(consumers);
consumers.clear();
- callbacks.clear();
+ kameletEips.clear();
super.doShutdown();
}
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java
index 699d108..70e2f2d 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java
@@ -124,7 +124,7 @@ public class KameletProcessor extends AsyncProcessorSupport
}
ServiceHelper.buildService(processor, producer);
- component.pushCallback(producer.getKey(), processor);
+ component.addKameletEip(producer.getKey(), processor);
}
@Override
@@ -146,6 +146,6 @@ public class KameletProcessor extends AsyncProcessorSupport
protected void doShutdown() throws Exception {
ServiceHelper.stopAndShutdownServices(processor, producer);
- component.popCallback(producer.getKey());
+ component.removeKameletEip(producer.getKey());
}
}
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
index d76937e..8762f85 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
@@ -19,7 +19,9 @@ package org.apache.camel.component.kamelet;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.apache.camel.Route;
import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.ExchangeHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,16 +83,33 @@ final class KameletProducer extends DefaultAsyncProducer {
callback.done(true);
return true;
} else {
+ // the kamelet producer has multiple purposes at this point
+ // it is capable of linking the kamelet component with the kamelet EIP
+ // to ensure the EIP and the component are wired together with their
+ // kamelet:source and kamelet:sink endpoints so when calling the sink
+ // then we continue processing the EIP child processors
+
+ // if no EIP is in use, then its _just_ a regular camel component
+ // with producer and consumers linked together via the component
+
if (sink) {
- // need to execute the callback from the waiting
- AsyncProcessor parked = (AsyncProcessor) component.getCallback(key);
- if (parked != null) {
- return parked.process(exchange, callback);
+ // when calling a kamelet:sink then lookup any waiting processor
+ // from the Kamelet EIP to continue routing
+ AsyncProcessor eip = (AsyncProcessor) component.getKameletEip(key);
+ if (eip != null) {
+ return eip.process(exchange, callback);
} else {
- callback.done(true);
- return true;
+ // if the current route is from a kamelet source then we should
+ // break out as otherwise we would end up calling ourselves again
+ Route route = ExchangeHelper.getRoute(exchange);
+ boolean source = route != null && route.getConsumer() instanceof KameletConsumer;
+ if (source) {
+ callback.done(true);
+ return true;
+ }
}
}
+ // kamelet producer that calls its kamelet consumer to process the incoming exchange
return consumer.getAsyncProcessor().process(exchange, callback);
}
} catch (Exception e) {
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java
index cb27980..4f23055 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletAggregateTest.java
@@ -24,7 +24,7 @@ import org.apache.http.annotation.Obsolete;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-@Disabled("Should use Kamelet EIP")
+@Disabled
public class KameletAggregateTest extends CamelTestSupport {
@Test
@@ -62,6 +62,7 @@ public class KameletAggregateTest extends CamelTestSupport {
.end();
from("direct:start")
+ // this is not possible, you must use kamelet EIP instead
.to("kamelet:my-aggregate?count=5")
.to("log:info")
.to("mock:result");
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
index 52f1239..0920161 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
@@ -22,7 +22,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit5.CamelTestSupport;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -38,7 +37,6 @@ public class KameletBasicTest extends CamelTestSupport {
}
@Test
- @Disabled
public void canConsumeFromKamelet() {
assertThat(
consumer.receiveBody("kamelet:tick", Integer.class)).isEqualTo(1);
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumeOnlyTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumeOnlyTest.java
index 15c5d2c..065e7bb 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumeOnlyTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletConsumeOnlyTest.java
@@ -20,12 +20,10 @@ import org.apache.camel.Exchange;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit5.CamelTestSupport;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
-@Disabled
public class KameletConsumeOnlyTest extends CamelTestSupport {
@Test
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java
index 2aa911e..ae1eb8f 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletEipAggregateTest.java
@@ -20,7 +20,6 @@ import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.AggregationStrategies;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.test.junit5.CamelTestSupport;
-import org.apache.http.annotation.Obsolete;
import org.junit.jupiter.api.Test;
public class KameletEipAggregateTest extends CamelTestSupport {
@@ -44,7 +43,6 @@ public class KameletEipAggregateTest extends CamelTestSupport {
//
// **********************************************
- @Obsolete
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override