You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/11/12 18:07:04 UTC

[camel-k-runtime] 01/01: Fixes #544: Kamelet component - optimize as we did for direct component

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch direct-optimization
in repository https://gitbox.apache.org/repos/asf/camel-k-runtime.git

commit 294c51a1015a09ad568189b1bdff30620e100b1e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Nov 12 19:06:41 2020 +0100

    Fixes #544: Kamelet component - optimize as we did for direct component
---
 .../kamelet/KameletEndpointConfigurer.java         |  5 ++
 .../apache/camel/component/kamelet/kamelet.json    |  3 +-
 .../camel/component/kamelet/KameletComponent.java  | 73 +++++++++++++++---
 .../camel/component/kamelet/KameletConsumer.java   | 19 +++--
 .../KameletConsumerNotAvailableException.java      | 27 +++++++
 .../camel/component/kamelet/KameletEndpoint.java   | 89 +++++++---------------
 .../camel/component/kamelet/KameletProducer.java   | 70 +++++++++++------
 7 files changed, 184 insertions(+), 102 deletions(-)

diff --git a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java
index dc0964d..cdbc56e 100644
--- a/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java
+++ b/components/camel-kamelet/src/generated/java/org/apache/camel/component/kamelet/KameletEndpointConfigurer.java
@@ -24,6 +24,7 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme
         map.put("exceptionHandler", org.apache.camel.spi.ExceptionHandler.class);
         map.put("exchangePattern", org.apache.camel.ExchangePattern.class);
         map.put("block", boolean.class);
+        map.put("failIfNoConsumers", boolean.class);
         map.put("kameletProperties", java.util.Map.class);
         map.put("lazyStartProducer", boolean.class);
         map.put("timeout", long.class);
@@ -45,6 +46,8 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme
         case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true;
         case "exchangepattern":
         case "exchangePattern": target.setExchangePattern(property(camelContext, org.apache.camel.ExchangePattern.class, value)); return true;
+        case "failifnoconsumers":
+        case "failIfNoConsumers": target.setFailIfNoConsumers(property(camelContext, boolean.class, value)); return true;
         case "kameletproperties":
         case "kameletProperties": target.setKameletProperties(property(camelContext, java.util.Map.class, value)); return true;
         case "lazystartproducer":
@@ -73,6 +76,8 @@ public class KameletEndpointConfigurer extends PropertyConfigurerSupport impleme
         case "exceptionHandler": return target.getExceptionHandler();
         case "exchangepattern":
         case "exchangePattern": return target.getExchangePattern();
+        case "failifnoconsumers":
+        case "failIfNoConsumers": return target.isFailIfNoConsumers();
         case "kameletproperties":
         case "kameletProperties": return target.getKameletProperties();
         case "lazystartproducer":
diff --git a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
index eff6911..91854be 100644
--- a/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
+++ b/components/camel-kamelet/src/generated/resources/org/apache/camel/component/kamelet/kamelet.json
@@ -35,7 +35,8 @@
     "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with [...]
     "exchangePattern": { "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut", "InOptionalOut" ], "deprecated": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
     "block": { "kind": "parameter", "displayName": "Block", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": true, "description": "If sending a message to a direct endpoint which has no active consumer, then we can tell the producer to block and wait for the consumer to become active." },
-    "kameletProperties": { "kind": "parameter", "displayName": "Kamelet Properties", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "deprecated": false, "secret": false, "defaultValue": "true", "description": "Custom properties for kamelet" },
+    "failIfNoConsumers": { "kind": "parameter", "displayName": "Fail If No Consumers", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": true, "description": "Whether the producer should fail by throwing an exception, when sending to a kamelet endpoint with no active consumers." },
+    "kameletProperties": { "kind": "parameter", "displayName": "Kamelet Properties", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "deprecated": false, "secret": false, "description": "Custom properties for kamelet" },
     "lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the  [...]
     "timeout": { "kind": "parameter", "displayName": "Timeout", "group": "producer", "label": "producer", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "secret": false, "defaultValue": 30000, "description": "The timeout value to use if block is enabled." },
     "basicPropertyBinding": { "kind": "parameter", "displayName": "Basic Property Binding", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "secret": false, "defaultValue": false, "description": "Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities" },
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 7f243af..271b261 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.kamelet;
 
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -38,6 +39,7 @@ import org.apache.camel.support.DefaultComponent;
 import org.apache.camel.support.DefaultEndpoint;
 import org.apache.camel.support.LifecycleStrategySupport;
 import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.URISupport;
 import org.apache.camel.util.UnsafeUriCharactersEncoder;
 import org.slf4j.Logger;
@@ -54,8 +56,15 @@ import static org.apache.camel.component.kamelet.Kamelet.addRouteFromTemplate;
 public class KameletComponent extends DefaultComponent {
     private static final Logger LOGGER = LoggerFactory.getLogger(KameletComponent.class);
 
-    private final Map<String, KameletConsumer> consumers;
-    private final LifecycleHandler lifecycleHandler;
+    // active consumers
+    private final Map<String, KameletConsumer> consumers = new HashMap<>();
+    // counter that is used for producers to keep track if any consumer was added/removed since they last checked
+    // this is used for optimization to avoid each producer to get consumer for each message processed
+    // (locking via synchronized, and then lookup in the map as the cost)
+    // consumers and producers are only added/removed during startup/shutdown or if routes is manually controlled
+    private volatile int stateCounter;
+
+    private final LifecycleHandler lifecycleHandler = new LifecycleHandler();
 
     @Metadata(label = "producer", defaultValue = "true")
     private boolean block = true;
@@ -63,8 +72,6 @@ public class KameletComponent extends DefaultComponent {
     private long timeout = 30000L;
 
     public KameletComponent() {
-        this.lifecycleHandler = new LifecycleHandler();
-        this.consumers = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -194,7 +201,7 @@ public class KameletComponent extends DefaultComponent {
             // Note that at the moment, there's no enforcement around `source`
             // and `sink' to be defined on the right side (producer or consumer)
             //
-            endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers);
+            endpoint = new KameletEndpoint(uri, this, templateId, routeId);
 
             // forward component properties
             endpoint.setBlock(block);
@@ -203,7 +210,7 @@ public class KameletComponent extends DefaultComponent {
             // set endpoint specific properties
             setProperties(endpoint, parameters);
         } else {
-            endpoint = new KameletEndpoint(uri, this, templateId, routeId, consumers) {
+            endpoint = new KameletEndpoint(uri, this, templateId, routeId) {
                 @Override
                 protected void doInit() throws Exception {
                     super.doInit();
@@ -266,6 +273,53 @@ public class KameletComponent extends DefaultComponent {
         this.timeout = timeout;
     }
 
+    int getStateCounter() {
+        return stateCounter;
+    }
+
+    public void addConsumer(String key, KameletConsumer consumer) {
+        synchronized (consumers) {
+            if (consumers.putIfAbsent(key, consumer) != null) {
+                throw new IllegalArgumentException(
+                        "Cannot add a 2nd consumer to the same endpoint: " + key
+                                + ". KameletEndpoint only allows one consumer.");
+            }
+            // state changed so inc counter
+            stateCounter++;
+            consumers.notifyAll();
+        }
+    }
+
+    public void removeConsumer(String key, KameletConsumer consumer) {
+        synchronized (consumers) {
+            consumers.remove(key, consumer);
+            // state changed so inc counter
+            stateCounter++;
+            consumers.notifyAll();
+        }
+    }
+
+    protected KameletConsumer getConsumer(String key, boolean block, long timeout) throws InterruptedException {
+        synchronized (consumers) {
+            KameletConsumer answer = consumers.get(key);
+            if (answer == null && block) {
+                StopWatch watch = new StopWatch();
+                for (;;) {
+                    answer = consumers.get(key);
+                    if (answer != null) {
+                        break;
+                    }
+                    long rem = timeout - watch.taken();
+                    if (rem <= 0) {
+                        break;
+                    }
+                    consumers.wait(rem);
+                }
+            }
+            return answer;
+        }
+    }
+
     @Override
     protected void doInit() throws Exception {
         getCamelContext().addLifecycleStrategy(lifecycleHandler);
@@ -278,13 +332,12 @@ public class KameletComponent extends DefaultComponent {
     }
 
     @Override
-    protected void doStop() throws Exception {
+    protected void doShutdown() throws Exception {
         getCamelContext().getLifecycleStrategies().remove(lifecycleHandler);
 
-        ServiceHelper.stopService(consumers.values());
+        ServiceHelper.stopAndShutdownService(consumers);
         consumers.clear();
-
-        super.doStop();
+        super.doShutdown();
     }
 
     /*
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
index c99d56c..36123de 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumer.java
@@ -23,8 +23,14 @@ import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.support.DefaultConsumer;
 
 final class KameletConsumer extends DefaultConsumer implements ShutdownAware, Suspendable {
-    public KameletConsumer(KameletEndpoint endpoint, Processor processor) {
+
+    private final KameletComponent component;
+    private final String key;
+
+    public KameletConsumer(KameletEndpoint endpoint, Processor processor, String key) {
         super(endpoint, processor);
+        this.component = endpoint.getComponent();
+        this.key = key;
     }
 
     @Override
@@ -34,22 +40,25 @@ final class KameletConsumer extends DefaultConsumer implements ShutdownAware, Su
 
     @Override
     protected void doStart() throws Exception {
-        getEndpoint().addConsumer(this);
+        super.doStart();
+        component.addConsumer(key, this);
     }
 
     @Override
     protected void doStop() throws Exception {
-        getEndpoint().removeConsumer(this);
+        component.removeConsumer(key, this);
+        super.doStop();
     }
 
     @Override
     protected void doSuspend() throws Exception {
-        getEndpoint().removeConsumer(this);
+        component.removeConsumer(key, this);
     }
 
     @Override
     protected void doResume() throws Exception {
-        getEndpoint().addConsumer(this);
+        // resume by using the start logic
+        component.addConsumer(key, this);
     }
 
     @Override
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumerNotAvailableException.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumerNotAvailableException.java
new file mode 100644
index 0000000..44f1f4e
--- /dev/null
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletConsumerNotAvailableException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Exchange;
+
+public class KameletConsumerNotAvailableException extends CamelExchangeException {
+
+    public KameletConsumerNotAvailableException(String message, Exchange exchange) {
+        super(message, exchange);
+    }
+}
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
index c3760f3..415fba7 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletEndpoint.java
@@ -30,20 +30,22 @@ import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultEndpoint;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.StopWatch;
 
 @UriEndpoint(
-    firstVersion = "3.5.0",
-    scheme = "kamelet",
-    syntax = "kamelet:templateId/routeId",
-    title = "Kamelet",
-    lenientProperties = true,
-    category = Category.CORE)
+        firstVersion = "3.5.0",
+        scheme = "kamelet",
+        syntax = "kamelet:templateId/routeId",
+        title = "Kamelet",
+        lenientProperties = true,
+        category = Category.CORE)
 public class KameletEndpoint extends DefaultEndpoint {
+
+    private final String key;
+
     @Metadata(required = true)
     @UriPath(description = "The Route Template ID")
     private final String templateId;
-    @Metadata(required = false)
+    @Metadata
     @UriPath(description = "The Route ID", defaultValueNote = "The ID will be auto-generated if not provided")
     private final String routeId;
 
@@ -51,18 +53,16 @@ public class KameletEndpoint extends DefaultEndpoint {
     private boolean block = true;
     @UriParam(label = "producer", defaultValue = "30000")
     private long timeout = 30000L;
-    @UriParam(label = "producer", defaultValue = "true")
-
+    @UriParam(label = "producer")
     private final Map<String, Object> kameletProperties;
-    private final Map<String, KameletConsumer> consumers;
-    private final String key;
+    @UriParam(label = "producer", defaultValue = "true")
+    private boolean failIfNoConsumers = true;
 
     public KameletEndpoint(
             String uri,
             KameletComponent component,
             String templateId,
-            String routeId,
-            Map<String, KameletConsumer> consumers) {
+            String routeId) {
 
         super(uri, component);
 
@@ -73,7 +73,6 @@ public class KameletEndpoint extends DefaultEndpoint {
         this.routeId = routeId;
         this.key = templateId + "/" + routeId;
         this.kameletProperties = new HashMap<>();
-        this.consumers = consumers;
     }
 
     public boolean isBlock() {
@@ -101,6 +100,18 @@ public class KameletEndpoint extends DefaultEndpoint {
         this.timeout = timeout;
     }
 
+    public boolean isFailIfNoConsumers() {
+        return failIfNoConsumers;
+    }
+
+    /**
+     * Whether the producer should fail by throwing an exception, when sending to a kamelet endpoint with no active
+     * consumers.
+     */
+    public void setFailIfNoConsumers(boolean failIfNoConsumers) {
+        this.failIfNoConsumers = failIfNoConsumers;
+    }
+
     @Override
     public KameletComponent getComponent() {
         return (KameletComponent) super.getComponent();
@@ -140,58 +151,14 @@ public class KameletEndpoint extends DefaultEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return new KameletProducer(this);
+        return new KameletProducer(this, key);
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        Consumer answer = new KameletConsumer(this, processor);
+        Consumer answer = new KameletConsumer(this, processor, key);
         configureConsumer(answer);
         return answer;
     }
 
-    // *********************************
-    //
-    // Helpers
-    //
-    // *********************************
-
-    void addConsumer(KameletConsumer consumer) {
-        synchronized (consumers) {
-            if (consumers.putIfAbsent(key, consumer) != null) {
-                throw new IllegalArgumentException(
-                    "Cannot add a 2nd consumer to the same endpoint. Endpoint " + this + " only allows one consumer.");
-            }
-            consumers.notifyAll();
-        }
-    }
-
-    void removeConsumer(KameletConsumer consumer) {
-        synchronized (consumers) {
-            consumers.remove(key, consumer);
-            consumers.notifyAll();
-        }
-    }
-
-    KameletConsumer getConsumer() throws InterruptedException {
-        synchronized (consumers) {
-            KameletConsumer answer = consumers.get(key);
-            if (answer == null && block) {
-                StopWatch watch = new StopWatch();
-                for (; ; ) {
-                    answer =consumers.get(key);
-                    if (answer != null) {
-                        break;
-                    }
-                    long rem = timeout - watch.taken();
-                    if (rem <= 0) {
-                        break;
-                    }
-                    consumers.wait(rem);
-                }
-            }
-
-            return answer;
-        }
-    }
 }
\ No newline at end of file
diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
index 10bd42c..726c22d 100644
--- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
+++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProducer.java
@@ -17,49 +17,68 @@
 package org.apache.camel.component.kamelet;
 
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.support.DefaultAsyncProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 final class KameletProducer extends DefaultAsyncProducer {
-    public KameletProducer(KameletEndpoint endpoint) {
-        super(endpoint);
-    }
 
-    @Override
-    public KameletEndpoint getEndpoint() {
-        return (KameletEndpoint)super.getEndpoint();
+    private static final Logger LOG = LoggerFactory.getLogger(KameletProducer.class);
+
+    private volatile KameletConsumer consumer;
+    private int stateCounter;
+
+    private final KameletEndpoint endpoint;
+    private final KameletComponent component;
+    private final String key;
+    private final boolean block;
+    private final long timeout;
+
+    public KameletProducer(KameletEndpoint endpoint, String key) {
+        super(endpoint);
+        this.endpoint = endpoint;
+        this.component = endpoint.getComponent();
+        this.key = key;
+        this.block = endpoint.isBlock();
+        this.timeout = endpoint.getTimeout();
     }
 
     @Override
     public void process(Exchange exchange) throws Exception {
-        final KameletConsumer consumer = getEndpoint().getConsumer();
-
-        if (consumer != null) {
-            consumer.getProcessor().process(exchange);
+        if (consumer == null || stateCounter != component.getStateCounter()) {
+            stateCounter = component.getStateCounter();
+            consumer = component.getConsumer(key, block, timeout);
+        }
+        if (consumer == null) {
+            if (endpoint.isFailIfNoConsumers()) {
+                throw new KameletConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
+            } else {
+                LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint);
+            }
         } else {
-            exchange.setException(
-                new CamelExchangeException(
-                    "No consumers available on endpoint: " + getEndpoint(), exchange)
-            );
+            consumer.getProcessor().process(exchange);
         }
     }
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            final KameletConsumer consumer = getEndpoint().getConsumer();
-
-            if (consumer != null) {
-                return consumer.getAsyncProcessor().process(exchange, callback);
-            } else {
-                exchange.setException(
-                    new CamelExchangeException(
-                        "No consumers available on endpoint: " + getEndpoint(), exchange)
-                );
-
+            if (consumer == null || stateCounter != component.getStateCounter()) {
+                stateCounter = component.getStateCounter();
+                consumer = component.getConsumer(key, block, timeout);
+            }
+            if (consumer == null) {
+                if (endpoint.isFailIfNoConsumers()) {
+                    exchange.setException(new KameletConsumerNotAvailableException(
+                            "No consumers available on endpoint: " + endpoint, exchange));
+                } else {
+                    LOG.debug("message ignored, no consumers available on endpoint: {}", endpoint);
+                }
                 callback.done(true);
                 return true;
+            } else {
+                return consumer.getAsyncProcessor().process(exchange, callback);
             }
         } catch (Exception e) {
             exchange.setException(e);
@@ -67,4 +86,5 @@ final class KameletProducer extends DefaultAsyncProducer {
             return true;
         }
     }
+
 }