You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2020/11/12 22:15:45 UTC
[camel-k-runtime] 01/02: Fixes #544: Kamelet component - optimize
as we did for direct component
This is an automated email from the ASF dual-hosted git repository.
lburgazzoli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git
commit 758cda80af3cd1ccf719bb03540c669ee88b0090
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Nov 12 19:06:41 2020 +0100
Fixes #544: Kamelet component - optimize as we did for direct component
---
.../kamelet/KameletEndpointConfigurer.java | 5 ++
.../apache/camel/component/kamelet/kamelet.json | 3 +-
.../camel/component/kamelet/KameletComponent.java | 73 +++++++++++++++---
.../camel/component/kamelet/KameletConsumer.java | 19 +++--
.../KameletConsumerNotAvailableException.java | 27 +++++++
.../camel/component/kamelet/KameletEndpoint.java | 89 +++++++---------------
.../camel/component/kamelet/KameletProducer.java | 70 +++++++++++------
7 files changed, 184 insertions(+), 102 deletions(-)
diff --git a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java
index dc0964d..cdbc56e 100644
--- a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java
+++ b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java
@@ -24,6 +24,7 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme
map.put("exceptionHandler", org.apache.camel.spi.ExceptionHandler.class);
map.put("exchangePattern", org.apache.camel.ExchangePattern.class);
map.put("block", boolean.class);
+ map.put("failIfNoConsumers", boolean.class);
map.put("kameletProperties", java.util.Map.class);
map.put("lazyStartProducer", boolean.class);
map.put("timeout", long.class);
@@ -45,6 +46,8 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme
case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true;
case "exchangepattern":
case "exchangePattern": target.setExchangePattern(property(camelContext, org.apache.camel.ExchangePattern.class, value)); return true;
+ case "failifnoconsumers":
+ case "failIfNoConsumers": target.setFailIfNoConsumers(property(camelContext, boolean.class, value)); return true;
case "kameletproperties":
case "kameletProperties": target.setKameletProperties(property(camelContext, java.util.Map.class, value)); return true;
case "lazystartproducer":
@@ -73,6 +76,8 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme
case "exceptionHandler": return target.getExceptionHandler();
case "exchangepattern":
case "exchangePattern": return target.getExchangePattern();
+ case "failifnoconsumers":
+ case "failIfNoConsumers": return target.isFailIfNoConsumers();
case "kameletproperties":
case "kameletProperties": return target.getKameletProperties();
case "lazystartproducer":
diff --git a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
index eff6911..91854be 100644
--- a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
+++ b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
@@ -35,7 +35,8 @@
"exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with [...]
"exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
"block": { "kind": "parameter", "displayName": "Block", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": true, "description": "If sending a message to a direct endpoint which has no active consumer, then we can tell the producer to block and wait for the consumer to become active." },
- "kameletProperties": { "kind": "parameter", "displayName": "Kamelet Properties", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "deprecated": false, "secret": false, "defaultValue": "true", "description": "Custom properties for kamelet" },
+ "failIfNoConsumers": { "kind": "parameter", "displayName": "Fail If No Consumers", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": true, "description": "Whether the producer should fail by throwing an exception, when sending to a kamelet endpoint with no active consumers." },
+ "kameletProperties": { "kind": "parameter", "displayName": "Kamelet Properties", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "deprecated": false, "secret": false, "description": "Custom properties for kamelet" },
"lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the [...]
"timeout": { "kind": "parameter", "displayName": "Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": 30000, "description": "The timeout value to use if block is enabled." },
"basicPropertyBinding": { "kind": "parameter", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" },
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 7f243af..271b261 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.kamelet;
import java.net.URI;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -38,6 +39,7 @@ import org.apache.camel.support.DefaultComponent;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.support.LifecycleStrategySupport;
import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.StopWatch;
import org.apache.camel.util.URISupport;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
import org.slf4j.Logger;
@@ -54,8 +56,15 @@ import static org.apache.camel.component.kamelet.Kamelet.addRouteFromTemplate;
public class KameletComponent extends DefaultComponent {
private static final Logger LOGGER = LoggerFactory.getLogger(KameletComponent.class);
- private final Map<String, KameletConsumer> consumers;
- private final LifecycleHandler lifecycleHandler;
+ // active consumers
+ private final Map<String, KameletConsumer> consumers = new HashMap<>();
+ // counter that is used for producers to keep track if any consumer was added/removed since they last checked
+ // this is used for optimization to avoid each producer to get consumer for each message processed
+ // (locking via synchronized, and then lookup in the map as the cost)
+ // consumers and producers are only added/removed during startup/shutdown or if routes is manually controlled
+ private volatile int stateCounter;
+
+ private final LifecycleHandler lifecycleHandler = new LifecycleHandler();
@Metadata(label = "producer", defaultValue = "true")
private boolean block = true;
@@ -63,8 +72,6 @@ public class KameletComponent extends DefaultComponent {
private long timeout = 30000L;
public KameletComponent() {
- this.lifecycleHandler = new LifecycleHandler();
- this.consumers = new ConcurrentHashMap<>();
}
@Override
@@ -194,7 +201,7 @@ public class KameletComponent extends DefaultComponent {
// Note that at the moment, there's no enforcement around `source`
// and `sink' to be defined on the right side (producer or consumer)
//
- endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers);
+ endpoint = new KameletEndpoint(uri, this, templateId, routeId);
// forward component properties
endpoint.setBlock(block);
@@ -203,7 +210,7 @@ public class KameletComponent extends DefaultComponent {
// set endpoint specific properties
setProperties(endpoint, parameters);
} else {
- endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers) {
+ endpoint = new KameletEndpoint(uri, this, templateId, routeId) {
@Override
protected void doInit() throws Exception {
super.doInit();
@@ -266,6 +273,53 @@ public class KameletComponent extends DefaultComponent {
this.timeout = timeout;
}
+ int getStateCounter() {
+ return stateCounter;
+ }
+
+ public void addConsumer(String key, KameletConsumer consumer) {
+ synchronized (consumers) {
+ if (consumers.putIfAbsent(key, consumer) != null) {
+ throw new IllegalArgumentException(
+ "Cannot add a 2nd consumer to the same endpoint: " + key
+ + ". KameletEndpoint only allows one consumer.");
+ }
+ // state changed so inc counter
+ stateCounter++;
+ consumers.notifyAll();
+ }
+ }
+
+ public void removeConsumer(String key, KameletConsumer consumer) {
+ synchronized (consumers) {
+ consumers.remove(key, consumer);
+ // state changed so inc counter
+ stateCounter++;
+ consumers.notifyAll();
+ }
+ }
+
+ protected KameletConsumer getConsumer(String key, boolean block, long timeout) throws InterruptedException {
+ synchronized (consumers) {
+ KameletConsumer answer = consumers.get(key);
+ if (answer == null && block) {
+ StopWatch watch = new StopWatch();
+ for (;;) {
+ answer = consumers.get(key);
+ if (answer != null) {
+ break;
+ }
+ long rem = timeout - watch.taken();
+ if (rem <= 0) {
+ break;
+ }
+ consumers.wait(rem);
+ }
+ }
+ return answer;
+ }
+ }
+
@Override
protected void doInit() throws Exception {
getCamelContext().addLifecycleStrategy(lifecycleHandler);
@@ -278,13 +332,12 @@ public class KameletComponent extends DefaultComponent {
}
@Override
- protected void doStop() throws Exception {
+ protected void doShutdown() throws Exception {
getCamelContext().getLifecycleStrategies().remove(lifecycleHandler);
- ServiceHelper.stopService(consumers.values());
+ ServiceHelper.stopAndShutdownService(consumers);
consumers.clear();
-
- super.doStop();
+ super.doShutdown();
}
/*
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
index c99d56c..36123de 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
@@ -23,8 +23,14 @@ import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.support.DefaultConsumer;
final class KameletConsumer extends DefaultConsumer implements ShutdownAware, Suspendable {
- public KameletConsumer(KameletEndpoint endpoint, Processor processor) {
+
+ private final KameletComponent component;
+ private final String key;
+
+ public KameletConsumer(KameletEndpoint endpoint, Processor processor, String key) {
super(endpoint, processor);
+ this.component = endpoint.getComponent();
+ this.key = key;
}
@Override
@@ -34,22 +40,25 @@ final class KameletConsumer extends DefaultConsumer implements ShutdownAware, Su
@Override
protected void doStart() throws Exception {
- getEndpoint().addConsumer(this);
+ super.doStart();
+ component.addConsumer(key, this);
}
@Override
protected void doStop() throws Exception {
- getEndpoint().removeConsumer(this);
+ component.removeConsumer(key, this);
+ super.doStop();
}
@Override
protected void doSuspend() throws Exception {
- getEndpoint().removeConsumer(this);
+ component.removeConsumer(key, this);
}
@Override
protected void doResume() throws Exception {
- getEndpoint().addConsumer(this);
+ // resume by using the start logic
+ component.addConsumer(key, this);
}
@Override
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumerNotAvailableException.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumerNotAvailableException.java
new file mode 100644
index 0000000..44f1f4e
--- /dev/null
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumerNotAvailableException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+
+public class KameletConsumerNotAvailableException extends CamelExchangeException {
+
+ public KameletConsumerNotAvailableException(String message, Exchange exchange) {
+ super(message, exchange);
+ }
+}
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index c3760f3..415fba7 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -30,20 +30,22 @@ import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.StopWatch;
@UriEndpoint(
- firstVersion = "3.5.0",
- scheme = "kamelet",
- syntax = "kamelet:templateId/routeId",
- title = "Kamelet",
- lenientProperties = true,
- category = Category.CORE)
+ firstVersion = "3.5.0",
+ scheme = "kamelet",
+ syntax = "kamelet:templateId/routeId",
+ title = "Kamelet",
+ lenientProperties = true,
+ category = Category.CORE)
public class KameletEndpoint extends DefaultEndpoint {
+
+ private final String key;
+
@Metadata(required = true)
@UriPath(description = "The Route Template ID")
private final String templateId;
- @Metadata(required = false)
+ @Metadata
@UriPath(description = "The Route ID", defaultValueNote = "The ID will be auto-generated if not provided")
private final String routeId;
@@ -51,18 +53,16 @@ public class KameletEndpoint extends DefaultEndpoint {
private boolean block = true;
@UriParam(label = "producer", defaultValue = "30000")
private long timeout = 30000L;
- @UriParam(label = "producer", defaultValue = "true")
-
+ @UriParam(label = "producer")
private final Map<String, Object> kameletProperties;
- private final Map<String, KameletConsumer> consumers;
- private final String key;
+ @UriParam(label = "producer", defaultValue = "true")
+ private boolean failIfNoConsumers = true;
public KameletEndpoint(
String uri,
KameletComponent component,
String templateId,
- String routeId,
- Map<String, KameletConsumer> consumers) {
+ String routeId) {
super(uri, component);
@@ -73,7 +73,6 @@ public class KameletEndpoint extends DefaultEndpoint {
this.routeId = routeId;
this.key = templateId + "/" + routeId;
this.kameletProperties = new HashMap<>();
- this.consumers = consumers;
}
public boolean isBlock() {
@@ -101,6 +100,18 @@ public class KameletEndpoint extends DefaultEndpoint {
this.timeout = timeout;
}
+ public boolean isFailIfNoConsumers() {
+ return failIfNoConsumers;
+ }
+
+ /**
+ * Whether the producer should fail by throwing an exception, when sending to a kamelet endpoint with no active
+ * consumers.
+ */
+ public void setFailIfNoConsumers(boolean failIfNoConsumers) {
+ this.failIfNoConsumers = failIfNoConsumers;
+ }
+
@Override
public KameletComponent getComponent() {
return (KameletComponent) super.getComponent();
@@ -140,58 +151,14 @@ public class KameletEndpoint extends DefaultEndpoint {
@Override
public Producer createProducer() throws Exception {
- return new KameletProducer(this);
+ return new KameletProducer(this, key);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- Consumer answer = new KameletConsumer(this, processor);
+ Consumer answer = new KameletConsumer(this, processor, key);
configureConsumer(answer);
return answer;
}
- // *********************************
- //
- // Helpers
- //
- // *********************************
-
- void addConsumer(KameletConsumer consumer) {
- synchronized (consumers) {
- if (consumers.putIfAbsent(key, consumer) != null) {
- throw new IllegalArgumentException(
- "Cannot add a 2nd consumer to the same endpoint. Endpoint " + this + " only allows one consumer.");
- }
- consumers.notifyAll();
- }
- }
-
- void removeConsumer(KameletConsumer consumer) {
- synchronized (consumers) {
- consumers.remove(key, consumer);
- consumers.notifyAll();
- }
- }
-
- KameletConsumer getConsumer() throws InterruptedException {
- synchronized (consumers) {
- KameletConsumer answer = consumers.get(key);
- if (answer == null && block) {
- StopWatch watch = new StopWatch();
- for (; ; ) {
- answer =consumers.get(key);
- if (answer != null) {
- break;
- }
- long rem = timeout - watch.taken();
- if (rem <= 0) {
- break;
- }
- consumers.wait(rem);
- }
- }
-
- return answer;
- }
- }
}
\ No newline at end of file
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
index 10bd42c..726c22d 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
@@ -17,49 +17,68 @@
package org.apache.camel.component.kamelet;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.support.DefaultAsyncProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
final class KameletProducer extends DefaultAsyncProducer {
- public KameletProducer(KameletEndpoint endpoint) {
- super(endpoint);
- }
- @Override
- public KameletEndpoint getEndpoint() {
- return (KameletEndpoint)super.getEndpoint();
+ private static final Logger LOG = LoggerFactory.getLogger(KameletProducer.class);
+
+ private volatile KameletConsumer consumer;
+ private int stateCounter;
+
+ private final KameletEndpoint endpoint;
+ private final KameletComponent component;
+ private final String key;
+ private final boolean block;
+ private final long timeout;
+
+ public KameletProducer(KameletEndpoint endpoint, String key) {
+ super(endpoint);
+ this.endpoint = endpoint;
+ this.component = endpoint.getComponent();
+ this.key = key;
+ this.block = endpoint.isBlock();
+ this.timeout = endpoint.getTimeout();
}
@Override
public void process(Exchange exchange) throws Exception {
- final KameletConsumer consumer = getEndpoint().getConsumer();
-
- if (consumer != null) {
- consumer.getProcessor().process(exchange);
+ if (consumer == null || stateCounter != component.getStateCounter()) {
+ stateCounter = component.getStateCounter();
+ consumer = component.getConsumer(key, block, timeout);
+ }
+ if (consumer == null) {
+ if (endpoint.isFailIfNoConsumers()) {
+ throw new KameletConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
+ } else {
+ LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint);
+ }
} else {
- exchange.setException(
- new CamelExchangeException(
- "No consumers available on endpoint: " + getEndpoint(), exchange)
- );
+ consumer.getProcessor().process(exchange);
}
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
- final KameletConsumer consumer = getEndpoint().getConsumer();
-
- if (consumer != null) {
- return consumer.getAsyncProcessor().process(exchange, callback);
- } else {
- exchange.setException(
- new CamelExchangeException(
- "No consumers available on endpoint: " + getEndpoint(), exchange)
- );
-
+ if (consumer == null || stateCounter != component.getStateCounter()) {
+ stateCounter = component.getStateCounter();
+ consumer = component.getConsumer(key, block, timeout);
+ }
+ if (consumer == null) {
+ if (endpoint.isFailIfNoConsumers()) {
+ exchange.setException(new KameletConsumerNotAvailableException(
+ "No consumers available on endpoint: " + endpoint, exchange));
+ } else {
+ LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint);
+ }
callback.done(true);
return true;
+ } else {
+ return consumer.getAsyncProcessor().process(exchange, callback);
}
} catch (Exception e) {
exchange.setException(e);
@@ -67,4 +86,5 @@ final class KameletProducer extends DefaultAsyncProducer {
return true;
}
}
+
}