You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/09/30 09:20:18 UTC
[camel-k-runtime] 01/03: kamelet source/sink component #490
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 4dede2ab1d61551b675b6bde82dae236511ac07e
Author: Luca Burgazzoli <lb...@gmail.com>
AuthorDate: Mon Sep 28 11:56:22 2020 +0200
kamelet source/sink component #490
---
components/camel-kamelet/pom.xml | 9 +-
.../apache/camel/component/kamelet/Kamelet.java | 32 ++---
.../camel/component/kamelet/KameletComponent.java | 143 ++++++++++++++-----
.../camel/component/kamelet/KameletConsumer.java | 74 ++++++++++
.../camel/component/kamelet/KameletEndpoint.java | 157 ++++++++++++---------
.../camel/component/kamelet/KameletProducer.java | 70 +++++++++
.../camel/component/kamelet/KameletBasicTest.java | 4 +-
.../component/kamelet/KameletPropertiesTest.java | 2 +-
.../camel/component/kamelet/KameletRouteTest.java | 23 +--
.../component/kamelet/KameletValidationTest.java | 2 +-
.../src/test/resources/log4j2-test.xml | 2 -
11 files changed, 361 insertions(+), 157 deletions(-)
diff --git a/components/camel-kamelet/pom.xml b/components/camel-kamelet/pom.xml
index 21232ac..426c1f7 100644
--- a/components/camel-kamelet/pom.xml
+++ b/components/camel-kamelet/pom.xml
@@ -40,10 +40,6 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-core-engine</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.camel</groupId>
- <artifactId>camel-direct</artifactId>
- </dependency>
<!-- ****************************** -->
<!-- -->
@@ -78,6 +74,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-direct</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-test-junit5</artifactId>
<scope>test</scope>
</dependency>
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
index d9ac81b..689c1ed 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
@@ -17,24 +17,19 @@
package org.apache.camel.component.kamelet;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Predicate;
import org.apache.camel.CamelContext;
-import org.apache.camel.model.ModelCamelContext;
-import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.PropertiesComponent;
import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public final class Kamelet {
public static final String PROPERTIES_PREFIX = "camel.kamelet.";
public static final String SCHEME = "kamelet";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(Kamelet.class);
+ public static final String SOURCE_ID = "source";
+ public static final String SINK_ID = "sink";
private Kamelet() {
}
@@ -43,20 +38,11 @@ public final class Kamelet {
return item -> item.startsWith(prefix);
}
- public static void createRouteForEndpoint(KameletEndpoint endpoint) throws Exception {
- LOGGER.debug("Creating route from template {}", endpoint.getTemplateId());
-
- ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class);
- String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
- RouteDefinition def = context.getRouteDefinition(id);
- if (!def.isPrepared()) {
- context.startRouteDefinitions(List.of(def));
+ public static String extractTemplateId(CamelContext context, String remaining) {
+ if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) {
+ return context.resolvePropertyPlaceholders("{{templateId}}");
}
- LOGGER.debug("Route {} created from template {}", id, endpoint.getTemplateId());
- }
-
- public static String extractTemplateId(CamelContext context, String remaining) {
String answer = StringHelper.before(remaining, "/");
if (answer == null) {
answer = remaining;
@@ -65,7 +51,11 @@ public final class Kamelet {
return answer;
}
- public static String extractRouteId(CamelContext context, String remaining) {
+ public static String extractRouteId(CamelContext context, String remaining) {
+ if (SOURCE_ID.equals(remaining) || SINK_ID.equals(remaining)) {
+ return context.resolvePropertyPlaceholders("{{routeId}}");
+ }
+
String answer = StringHelper.after(remaining, "/");
if (answer == null) {
answer = extractTemplateId(context, remaining) + "-" + context.getUuidGenerator().generateUuid();
@@ -74,7 +64,7 @@ public final class Kamelet {
return answer;
}
- public static Map<String, Object> extractKameletProperties(CamelContext context, String... elements) {
+ public static Map<String, Object> extractKameletProperties(CamelContext context, String... elements) {
PropertiesComponent pc = context.getPropertiesComponent();
Map<String, Object> properties = new HashMap<>();
String prefix = Kamelet.PROPERTIES_PREFIX;
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 7a97d9c..91a2514 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -19,64 +19,120 @@ package org.apache.camel.component.kamelet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.VetoCamelContextStartException;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
import org.apache.camel.support.LifecycleStrategySupport;
+import org.apache.camel.support.service.ServiceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * The Kamelet Component provides support for interacting with <a href="https://knative.dev">Knative</a>.
+ * The Kamelet Component provides support for materializing route templates.
*/
@Component(Kamelet.SCHEME)
public class KameletComponent extends DefaultComponent {
- private final LifecycleHandler lifecycleHandler;
+ private static final Logger LOGGER = LoggerFactory.getLogger(KameletComponent.class);
- public KameletComponent() {
- this(null);
- }
+ private final Map<String, KameletConsumer> consumers;
+ private final LifecycleHandler lifecycleHandler;
- public KameletComponent(CamelContext context) {
- super(context);
+ @Metadata(label = "producer", defaultValue = "true")
+ private boolean block = true;
+ @Metadata(label = "producer", defaultValue = "30000")
+ private long timeout = 30000L;
+ public KameletComponent() {
this.lifecycleHandler = new LifecycleHandler();
+ this.consumers = new ConcurrentHashMap<>();
}
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining);
final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining);
+ final String newUri = "kamelet:" + templateId + "/" + routeId;
+
+ final KameletEndpoint endpoint;
- //
- // The properties for the kamelets are determined by global properties
- // and local endpoint parameters,
- //
- // Global parameters are loaded in the following order:
- //
- // camel.kamelet." + templateId
- // camel.kamelet." + templateId + "." routeId
- //
- Map<String, Object> kameletProperties = Kamelet.extractKameletProperties(getCamelContext(), templateId, routeId);
- kameletProperties.putAll(parameters);
- kameletProperties.put("templateId", templateId);
- kameletProperties.put("routeId", routeId);
-
- // Remaining parameter should be related to the route and to avoid the
- // parameters validation to fail, we need to clear the parameters map.
- parameters.clear();
-
- KameletEndpoint endpoint = new KameletEndpoint(uri, this, templateId, routeId, kameletProperties);
-
- // No parameters are expected here.
- setProperties(endpoint, parameters);
+ if (!Kamelet.SOURCE_ID.equals(remaining) && !Kamelet.SINK_ID.equals(remaining)) {
+ endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers) {
+ @Override
+ protected void doInit() throws Exception {
+ super.doInit();
+ lifecycleHandler.track(this);
+ }
+ };
+
+ // forward component properties
+ endpoint.setBlock(block);
+ endpoint.setTimeout(timeout);
+
+ // set endpoint specific properties
+ setProperties(endpoint, parameters);
+
+ //
+ // The properties for the kamelets are determined by global properties
+ // and local endpoint parameters,
+ //
+ // Global parameters are loaded in the following order:
+ //
+ // camel.kamelet." + templateId
+ // camel.kamelet." + templateId + "." routeId
+ //
+ Map<String, Object> kameletProperties = Kamelet.extractKameletProperties(getCamelContext(), templateId, routeId);
+ kameletProperties.putAll(parameters);
+ kameletProperties.put("templateId", templateId);
+ kameletProperties.put("routeId", routeId);
+
+ // set kamelet specific properties
+ endpoint.setKameletProperties(kameletProperties);
+ } else {
+ endpoint = new KameletEndpoint(newUri, this, templateId, routeId, consumers);
+
+ // forward component properties
+ endpoint.setBlock(block);
+ endpoint.setTimeout(timeout);
+
+ // set endpoint specific properties
+ setProperties(endpoint, parameters);
+ }
return endpoint;
}
+ public boolean isBlock() {
+ return block;
+ }
+
+ /**
+ * If sending a message to a kamelet endpoint which has no active consumer, then we can tell the producer to block
+ * and wait for the consumer to become active.
+ */
+ public void setBlock(boolean block) {
+ this.block = block;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * The timeout value to use if block is enabled.
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
@Override
protected void doInit() throws Exception {
getCamelContext().addLifecycleStrategy(lifecycleHandler);
@@ -91,11 +147,11 @@ public class KameletComponent extends DefaultComponent {
@Override
protected void doStop() throws Exception {
getCamelContext().getLifecycleStrategies().remove(lifecycleHandler);
- super.doStop();
- }
- void onEndpointAdd(KameletEndpoint endpoint) {
- lifecycleHandler.track(endpoint);
+ ServiceHelper.stopService(consumers.values());
+ consumers.clear();
+
+ super.doStop();
}
/*
@@ -118,9 +174,11 @@ public class KameletComponent extends DefaultComponent {
@Override
public void onContextInitialized(CamelContext context) throws VetoCamelContextStartException {
if (!this.initialized.compareAndExchange(false, true)) {
+ ModelCamelContext mcc = context.adapt(ModelCamelContext.class);
+
for (KameletEndpoint endpoint : endpoints) {
try {
- Kamelet.createRouteForEndpoint(endpoint);
+ createRouteForEndpoint(endpoint);
} catch (Exception e) {
throw new VetoCamelContextStartException("Failure creating route from template: " + endpoint.getTemplateId(), e, context);
}
@@ -137,13 +195,28 @@ public class KameletComponent extends DefaultComponent {
public void track(KameletEndpoint endpoint) {
if (this.initialized.get()) {
try {
- Kamelet.createRouteForEndpoint(endpoint);
+ createRouteForEndpoint(endpoint);
} catch (Exception e) {
throw RuntimeCamelException.wrapRuntimeException(e);
}
} else {
+ LOGGER.debug("Tracking route template={} and id={}", endpoint.getTemplateId(), endpoint.getRouteId());
this.endpoints.add(endpoint);
}
}
+
+ public static void createRouteForEndpoint(KameletEndpoint endpoint) throws Exception {
+ LOGGER.debug("Creating route from template={} and id={}", endpoint.getTemplateId(), endpoint.getRouteId());
+
+ final ModelCamelContext context = endpoint.getCamelContext().adapt(ModelCamelContext.class);
+ final String id = context.addRouteFromTemplate(endpoint.getRouteId(), endpoint.getTemplateId(), endpoint.getKameletProperties());
+ final RouteDefinition def = context.getRouteDefinition(id);
+
+ if (!def.isPrepared()) {
+ context.startRouteDefinitions(List.of(def));
+ }
+
+ LOGGER.debug("Route with id={} created from template={}", id, endpoint.getTemplateId());
+ }
}
}
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
new file mode 100644
index 0000000..c99d56c
--- /dev/null
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.Suspendable;
+import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.support.DefaultConsumer;
+
+final class KameletConsumer extends DefaultConsumer implements ShutdownAware, Suspendable {
+ public KameletConsumer(KameletEndpoint endpoint, Processor processor) {
+ super(endpoint, processor);
+ }
+
+ @Override
+ public KameletEndpoint getEndpoint() {
+ return (KameletEndpoint)super.getEndpoint();
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ getEndpoint().addConsumer(this);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ getEndpoint().removeConsumer(this);
+ }
+
+ @Override
+ protected void doSuspend() throws Exception {
+ getEndpoint().removeConsumer(this);
+ }
+
+ @Override
+ protected void doResume() throws Exception {
+ getEndpoint().addConsumer(this);
+ }
+
+ @Override
+ public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+ // defer stopping on shutdown as we want kamelet consumers to keep running
+ // in case some other routes depend on this consumer to run, so that
+ // in-flight exchanges can complete
+ return true;
+ }
+
+ @Override
+ public int getPendingExchangesSize() {
+ // return 0 as we do not have an internal memory queue with a variable
+ // size of inflight messages.
+ return 0;
+ }
+
+ @Override
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
+ // noop
+ }
+}
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index 3209c6d..80d8e10 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -17,23 +17,21 @@
package org.apache.camel.component.kamelet;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProducer;
import org.apache.camel.Consumer;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
-import org.apache.camel.support.DefaultAsyncProducer;
-import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.DefaultEndpoint;
-import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@UriEndpoint(
firstVersion = "3.5.0",
@@ -43,34 +41,67 @@ import org.apache.camel.util.ObjectHelper;
lenientProperties = true,
label = "camel-k")
public class KameletEndpoint extends DefaultEndpoint {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KameletEndpoint.class);
+
@Metadata(required = true)
@UriPath(description = "The Route Template ID")
private final String templateId;
-
@Metadata(required = false)
@UriPath(description = "The Route ID", defaultValueNote = "The ID will be auto-generated if not provided")
private final String routeId;
+ @UriParam(label = "producer", defaultValue = "true")
+ private boolean block = true;
+ @UriParam(label = "producer", defaultValue = "30000")
+ private long timeout = 30000L;
+ @UriParam(label = "producer", defaultValue = "true")
+
private final Map<String, Object> kameletProperties;
- private final String kameletUri;
+ private final Map<String, KameletConsumer> consumers;
+ private final String key;
public KameletEndpoint(
String uri,
KameletComponent component,
String templateId,
String routeId,
- Map<String, Object> kameletProperties) {
+ Map<String, KameletConsumer> consumers) {
super(uri, component);
ObjectHelper.notNull(templateId, "template id");
ObjectHelper.notNull(routeId, "route id");
- ObjectHelper.notNull(kameletProperties, "kamelet properties");
this.templateId = templateId;
this.routeId = routeId;
- this.kameletProperties = Collections.unmodifiableMap(kameletProperties);
- this.kameletUri = "direct:" + routeId;
+ this.key = templateId + "/" + routeId;
+ this.kameletProperties = new HashMap<>();
+ this.consumers = consumers;
+ }
+
+ public boolean isBlock() {
+ return block;
+ }
+
+ /**
+ * If sending a message to a kamelet endpoint which has no active consumer, then we can tell the producer to block
+ * and wait for the consumer to become active.
+ */
+ public void setBlock(boolean block) {
+ this.block = block;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * The timeout value to use if block is enabled.
+ *
+ * @param timeout the timeout value
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
}
@Override
@@ -83,6 +114,11 @@ public class KameletEndpoint extends DefaultEndpoint {
return true;
}
+ @Override
+ public boolean isSingleton() {
+ return true;
+ }
+
public String getTemplateId() {
return templateId;
}
@@ -91,88 +127,71 @@ public class KameletEndpoint extends DefaultEndpoint {
return routeId;
}
+ public void setKameletProperties(Map<String, Object> kameletProperties) {
+ if (kameletProperties != null) {
+ this.kameletProperties.clear();
+ this.kameletProperties.putAll(kameletProperties);
+ }
+ }
+
public Map<String, Object> getKameletProperties() {
- return kameletProperties;
+ return Collections.unmodifiableMap(kameletProperties);
}
@Override
public Producer createProducer() throws Exception {
- return new KameletProducer();
+ return new KameletProducer(this);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- Consumer answer = new KemeletConsumer(processor);
+ Consumer answer = new KameletConsumer(this, processor);
configureConsumer(answer);
return answer;
}
- @Override
- protected void doInit() throws Exception {
- super.doInit();
- getComponent().onEndpointAdd(this);
- }
-
// *********************************
//
// Helpers
//
// *********************************
- private class KemeletConsumer extends DefaultConsumer {
- private volatile Endpoint endpoint;
- private volatile Consumer consumer;
-
- public KemeletConsumer(Processor processor) {
- super(KameletEndpoint.this, processor);
- }
-
- @Override
- protected void doStart() throws Exception {
- endpoint = getCamelContext().getEndpoint(kameletUri);
- consumer = endpoint.createConsumer(getProcessor());
-
- ServiceHelper.startService(endpoint, consumer);
- super.doStart();
- }
-
- @Override
- protected void doStop() throws Exception {
- ServiceHelper.stopService(consumer, endpoint);
- super.doStop();
+ void addConsumer(KameletConsumer consumer) {
+ synchronized (consumers) {
+ if (consumers.putIfAbsent(key, consumer) != null) {
+ throw new IllegalArgumentException(
+ "Cannot add a 2nd consumer to the same endpoint. Endpoint " + this + " only allows one consumer.");
+ }
+ consumers.notifyAll();
}
}
- private class KameletProducer extends DefaultAsyncProducer {
- private volatile Endpoint endpoint;
- private volatile AsyncProducer producer;
-
- public KameletProducer() {
- super(KameletEndpoint.this);
+ void removeConsumer(KameletConsumer consumer) {
+ synchronized (consumers) {
+ consumers.remove(key, consumer);
+ consumers.notifyAll();
}
+ }
- @Override
- public boolean process(Exchange exchange, AsyncCallback callback) {
- if (producer != null) {
- return producer.process(exchange, callback);
- } else {
- callback.done(true);
- return true;
+ KameletConsumer getConsumer() throws InterruptedException {
+ synchronized (consumers) {
+ KameletConsumer answer = consumers.get(key);
+ if (answer == null && block) {
+ StopWatch watch = new StopWatch();
+ for (; ; ) {
+ answer =consumers.get(key);
+ if (answer != null) {
+ break;
+ }
+ long rem = timeout - watch.taken();
+ if (rem <= 0) {
+ break;
+ }
+ consumers.wait(rem);
+ }
}
- }
-
- @Override
- protected void doStart() throws Exception {
- endpoint = getCamelContext().getEndpoint(kameletUri);
- producer = endpoint.createAsyncProducer();
- ServiceHelper.startService(endpoint, producer);
- super.doStart();
- }
- @Override
- protected void doStop() throws Exception {
- ServiceHelper.stopService(producer, endpoint);
- super.doStop();
+ return answer;
}
}
}
\ No newline at end of file
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
new file mode 100644
index 0000000..9e6d86d
--- /dev/null
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.DefaultAsyncProducer;
+
+final class KameletProducer extends DefaultAsyncProducer {
+ public KameletProducer(KameletEndpoint endpoint) {
+ super(endpoint);
+ }
+
+ @Override
+ public KameletEndpoint getEndpoint() {
+ return (KameletEndpoint)super.getEndpoint();
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ final KameletConsumer consumer = getEndpoint().getConsumer();
+
+ if (consumer != null) {
+ consumer.getProcessor().process(exchange);
+ } else {
+ exchange.setException(
+ new CamelExchangeException(
+ "No consumers available on endpoint: " + getEndpoint(), exchange)
+ );
+ }
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ try {
+ final KameletConsumer consumer = getEndpoint().getConsumer();;
+
+ if (consumer != null) {
+ return consumer.getAsyncProcessor().process(exchange, callback);
+ } else {
+ exchange.setException(
+ new CamelExchangeException(
+ "No consumers available on endpoint: " + getEndpoint(), exchange)
+ );
+
+ callback.done(true);
+ return true;
+ }
+ } catch (Exception e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+ }
+}
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
index e023e07..5826f21 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletBasicTest.java
@@ -78,13 +78,13 @@ public class KameletBasicTest extends CamelTestSupport {
public void configure() throws Exception {
routeTemplate("setBody")
.templateParameter("bodyValue")
- .from("direct:{{routeId}}")
+ .from("kamelet:source")
.setBody().constant("{{bodyValue}}");
routeTemplate("tick")
.from("timer:{{routeId}}?repeatCount=1&delay=-1")
.setBody().exchangeProperty(Exchange.TIMER_COUNTER)
- .to("direct:{{routeId}}");
+ .to("kamelet:sink");
from("direct:templateEmbedded")
.toF("kamelet:setBody/embedded?bodyValue=embedded");
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java
index d33a15b..67d6ff5 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletPropertiesTest.java
@@ -78,7 +78,7 @@ public class KameletPropertiesTest extends CamelTestSupport {
// template
routeTemplate("setBody")
.templateParameter("bodyValue")
- .from("direct:{{routeId}}")
+ .from("kamelet:source")
.setBody().constant("{{bodyValue}}");
}
};
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java
index 8aed313..7e8c345 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletRouteTest.java
@@ -18,16 +18,13 @@ package org.apache.camel.component.kamelet;
import java.util.UUID;
-import org.apache.camel.CamelExecutionException;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.direct.DirectConsumerNotAvailableException;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.http.annotation.Obsolete;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
public class KameletRouteTest extends CamelTestSupport {
@Test
@@ -48,15 +45,6 @@ public class KameletRouteTest extends CamelTestSupport {
).isEqualTo("b-a-" + body);
}
- @Test
- public void testFailure() {
- String body = UUID.randomUUID().toString();
-
- assertThatExceptionOfType(CamelExecutionException.class)
- .isThrownBy(() -> fluentTemplate.toF("direct:fail").withBody(body).request(String.class))
- .withCauseExactlyInstanceOf(DirectConsumerNotAvailableException.class);
- }
-
// **********************************************
//
// test set-up
@@ -70,12 +58,7 @@ public class KameletRouteTest extends CamelTestSupport {
public void configure() throws Exception {
routeTemplate("echo")
.templateParameter("prefix")
- .from("direct:{{routeId}}")
- .setBody().simple("{{prefix}}-${body}");
-
- routeTemplate("echo-fail")
- .templateParameter("prefix")
- .from("direct:#property:routeId")
+ .from("kamelet:source")
.setBody().simple("{{prefix}}-${body}");
from("direct:single")
@@ -86,10 +69,6 @@ public class KameletRouteTest extends CamelTestSupport {
.to("kamelet:echo/1?prefix=a")
.to("kamelet:echo/2?prefix=b")
.log("${body}");
-
- from("direct:fail")
- .to("kamelet:echo-fail?prefix=a")
- .log("${body}");
}
};
}
diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java
index e16cf10..f35c8e3 100644
--- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java
+++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletValidationTest.java
@@ -34,7 +34,7 @@ public class KameletValidationTest {
public void configure() throws Exception {
routeTemplate("setBody")
.templateParameter("bodyValue")
- .from("direct:{{routeId}}")
+ .from("kamelet:source")
.setBody().constant("{{bodyValue}}");
from("direct:start")
diff --git a/components/camel-kamelet/src/test/resources/log4j2-test.xml b/components/camel-kamelet/src/test/resources/log4j2-test.xml
index d5df1ad..8ce15f1 100644
--- a/components/camel-kamelet/src/test/resources/log4j2-test.xml
+++ b/components/camel-kamelet/src/test/resources/log4j2-test.xml
@@ -32,9 +32,7 @@
<Logger name="org.apache.camel.component.kamelet" level="TRACE"/>
<Root level="INFO">
- <!--
<AppenderRef ref="STDOUT"/>
- -->
<AppenderRef ref="FILE"/>
</Root>
</Loggers>