You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/15 10:45:56 UTC

[camel] 01/04: CAMEL-15176: Optimize component to do as much in init phase vs start phase.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e14e44580a823445d6f67b18a957ac57874a8543
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Oct 15 09:59:21 2020 +0200

    CAMEL-15176: Optimize component to do as much in init phase vs start phase.
---
 .../camel/component/direct/DirectEndpoint.java     | 14 ++---
 .../camel/component/directvm/DirectVmProducer.java | 73 +++++++++++++---------
 components/components-init-work-in-progress.md     | 12 ++--
 3 files changed, 56 insertions(+), 43 deletions(-)

diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
index 941ce31..4b4e1fd 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
@@ -42,6 +42,7 @@ import org.apache.camel.util.StringHelper;
 public class DirectEndpoint extends DefaultEndpoint {
 
     private final Map<String, DirectConsumer> consumers;
+    private String key;
 
     @UriPath(description = "Name of direct endpoint")
     @Metadata(required = true)
@@ -68,6 +69,11 @@ public class DirectEndpoint extends DefaultEndpoint {
     }
 
     @Override
+    protected void doInit() throws Exception {
+        key = initKey();
+    }
+
+    @Override
     public Producer createProducer() throws Exception {
         return new DirectProducer(this);
     }
@@ -80,7 +86,6 @@ public class DirectEndpoint extends DefaultEndpoint {
     }
 
     public void addConsumer(DirectConsumer consumer) {
-        String key = getKey();
         synchronized (consumers) {
             if (consumers.putIfAbsent(key, consumer) != null) {
                 throw new IllegalArgumentException(
@@ -91,7 +96,6 @@ public class DirectEndpoint extends DefaultEndpoint {
     }
 
     public void removeConsumer(DirectConsumer consumer) {
-        String key = getKey();
         synchronized (consumers) {
             consumers.remove(key, consumer);
             consumers.notifyAll();
@@ -99,7 +103,6 @@ public class DirectEndpoint extends DefaultEndpoint {
     }
 
     protected DirectConsumer getConsumer() throws InterruptedException {
-        String key = getKey();
         synchronized (consumers) {
             DirectConsumer answer = consumers.get(key);
             if (answer == null && block) {
@@ -116,9 +119,6 @@ public class DirectEndpoint extends DefaultEndpoint {
                     consumers.wait(rem);
                 }
             }
-            //            if (answer != null && answer.getEndpoint() != this) {
-            //                throw new IllegalStateException();
-            //            }
             return answer;
         }
     }
@@ -160,7 +160,7 @@ public class DirectEndpoint extends DefaultEndpoint {
         this.failIfNoConsumers = failIfNoConsumers;
     }
 
-    protected String getKey() {
+    protected String initKey() {
         String uri = getEndpointUri();
         if (uri.indexOf('?') != -1) {
             return StringHelper.before(uri, "?");
diff --git a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
index 1eed7e1..568eef7 100644
--- a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
+++ b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
@@ -54,43 +54,56 @@ public class DirectVmProducer extends DefaultAsyncProducer {
             return true;
         }
 
-        final HeaderFilterStrategy headerFilterStrategy = endpoint.getHeaderFilterStrategy();
+        try {
+            final HeaderFilterStrategy headerFilterStrategy = endpoint.getHeaderFilterStrategy();
 
-        // Only clone the Exchange if we actually need to filter out properties or headers.
-        final Exchange submitted
-                = (!endpoint.isPropagateProperties() || headerFilterStrategy != null) ? exchange.copy() : exchange;
+            // Only clone the Exchange if we actually need to filter out properties or headers.
+            final Exchange submitted
+                    = (!endpoint.isPropagateProperties() || headerFilterStrategy != null) ? exchange.copy() : exchange;
 
-        // Clear properties in the copy if we are not propagating them.
-        if (!endpoint.isPropagateProperties()) {
-            submitted.getProperties().clear();
-        }
-
-        // Filter headers by Header Filter Strategy if there is one set.
-        if (headerFilterStrategy != null) {
-            submitted.getIn().getHeaders().entrySet()
-                    .removeIf(e -> headerFilterStrategy.applyFilterToCamelHeaders(e.getKey(), e.getValue(), submitted));
-        }
-
-        return consumer.getAsyncProcessor().process(submitted, done -> {
-            Message msg = submitted.getMessage();
-
-            if (headerFilterStrategy != null) {
-                msg.getHeaders().entrySet()
-                        .removeIf(e -> headerFilterStrategy.applyFilterToExternalHeaders(e.getKey(), e.getValue(), submitted));
+            // Clear properties in the copy if we are not propagating them.
+            if (!endpoint.isPropagateProperties()) {
+                submitted.getProperties().clear();
             }
 
-            if (exchange != submitted) {
-                // only need to copy back if they are different
-                exchange.setException(submitted.getException());
-                exchange.getOut().copyFrom(msg);
+            // Filter headers by Header Filter Strategy if there is one set.
+            if (headerFilterStrategy != null) {
+                submitted.getIn().getHeaders().entrySet()
+                        .removeIf(e -> headerFilterStrategy.applyFilterToCamelHeaders(e.getKey(), e.getValue(), submitted));
             }
 
-            if (endpoint.isPropagateProperties()) {
-                exchange.getProperties().putAll(submitted.getProperties());
-            }
+            return consumer.getAsyncProcessor().process(submitted, done -> {
+                try {
+                    Message msg = submitted.getMessage();
+
+                    if (headerFilterStrategy != null) {
+                        msg.getHeaders().entrySet()
+                                .removeIf(e -> headerFilterStrategy.applyFilterToExternalHeaders(e.getKey(), e.getValue(),
+                                        submitted));
+                    }
+
+                    if (exchange != submitted) {
+                        // only need to copy back if they are different
+                        exchange.setException(submitted.getException());
+                        exchange.getOut().copyFrom(msg);
+                    }
+
+                    if (endpoint.isPropagateProperties()) {
+                        exchange.getProperties().putAll(submitted.getProperties());
+                    }
+                } catch (Throwable e) {
+                    exchange.setException(e);
+                } finally {
+                    callback.done(done);
+                }
+            });
+
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
 
-            callback.done(done);
-        });
+        callback.done(true);
+        return true;
     }
 
 }
diff --git a/components/components-init-work-in-progress.md b/components/components-init-work-in-progress.md
index 487cdab..740dac6 100644
--- a/components/components-init-work-in-progress.md
+++ b/components/components-init-work-in-progress.md
@@ -90,13 +90,13 @@
 |camel-controlbus|DONE | |
 |camel-corda|REJECT|open network connection|
 |camel-couchbase|DONE | |
-|camel-couchdb| | |
+|camel-couchdb|DONE | |
 |camel-cron|DONE | |
-|camel-crypto| | |
-|camel-crypto-cms| | |
+|camel-crypto|DONE | |
+|camel-crypto-cms|REJECT | |
 |camel-csv|DONE| |
 |camel-cxf|REJECT|start a server|
-|camel-cxf-transport| | |
+|camel-cxf-transport|REJECT | |
 |camel-dataformat|DONE| |
 |camel-dataset|DONE| |
 |camel-debezium| | |
@@ -106,8 +106,8 @@
 |camel-debezium-postgres| | |
 |camel-debezium-sqlserver| | |
 |camel-digitalocean|REJECT|create a http client|
-|camel-direct| | |
-|camel-directvm| | |
+|camel-direct|DONE | |
+|camel-directvm|DONE | |
 |camel-disruptor|REJECT|start a thread|
 |camel-djl| | |
 |camel-dns| | |