You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/15 16:00:44 UTC

[camel] 03/03: CAMEL-15690: camel-direct - Optimize direct producer to wire to consumer if possible during start or only once during processing. This avoids excessive synchronized locking for each processing to get the consumer - there can only be 1 consumer anyway.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch direct
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fda9f495d0cd08c273630874ef94f7e1ab312292
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Oct 15 17:17:44 2020 +0200

    CAMEL-15690: camel-direct - Optimize direct producer to wire to consumer if possible during start or only once during processing. This avoids excessive synchronized locking for each processing to get the consumer - there can only be 1 consumer anyway.
---
 .../java/org/apache/camel/component/direct/DirectComponent.java     | 2 +-
 .../main/java/org/apache/camel/component/direct/DirectEndpoint.java | 2 +-
 .../main/java/org/apache/camel/component/direct/DirectProducer.java | 6 ++++--
 .../apache/camel/component/direct/DirectProducerBlockingTest.java   | 2 ++
 4 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
index 1a412a4..a083596 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
@@ -114,7 +114,7 @@ public class DirectComponent extends DefaultComponent {
         }
     }
 
-    protected DirectConsumer getConsumer(String key, boolean block) throws InterruptedException {
+    protected DirectConsumer getConsumer(String key, boolean block, long timeout) throws InterruptedException {
         synchronized (consumers) {
             DirectConsumer answer = consumers.get(key);
             if (answer == null && block) {
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
index 9c5bd87..4281ec0 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
@@ -74,7 +74,7 @@ public class DirectEndpoint extends DefaultEndpoint {
 
     @Deprecated
     public DirectConsumer getConsumer() throws InterruptedException {
-        return component.getConsumer(key, block);
+        return component.getConsumer(key, block, timeout);
     }
 
     public boolean isBlock() {
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
index d71a23d..604563f 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
@@ -36,6 +36,7 @@ public class DirectProducer extends DefaultAsyncProducer {
     private final DirectComponent component;
     private final String key;
     private final boolean block;
+    private final long timeout;
 
     public DirectProducer(DirectEndpoint endpoint, String key) {
         super(endpoint);
@@ -43,13 +44,14 @@ public class DirectProducer extends DefaultAsyncProducer {
         this.component = (DirectComponent) endpoint.getComponent();
         this.key = key;
         this.block = endpoint.isBlock();
+        this.timeout = endpoint.getTimeout();
     }
 
     @Override
     public void process(Exchange exchange) throws Exception {
         if (consumer == null || stateCounter != component.getStateCounter()) {
             stateCounter = component.getStateCounter();
-            consumer = component.getConsumer(key, block);
+            consumer = component.getConsumer(key, block, timeout);
         }
         if (consumer == null) {
             if (endpoint.isFailIfNoConsumers()) {
@@ -67,7 +69,7 @@ public class DirectProducer extends DefaultAsyncProducer {
         try {
             if (consumer == null || stateCounter != component.getStateCounter()) {
                 stateCounter = component.getStateCounter();
-                consumer = component.getConsumer(key, block);
+                consumer = component.getConsumer(key, block, timeout);
             }
             if (consumer == null) {
                 if (endpoint.isFailIfNoConsumers()) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java b/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
index f79847b..cd54a2d 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
@@ -45,6 +45,7 @@ public class DirectProducerBlockingTest extends ContextTestSupport {
                     = assertIsInstanceOf(DirectConsumerNotAvailableException.class, e.getCause());
             assertIsInstanceOf(CamelExchangeException.class, cause);
             assertTrue(watch.taken() > 490);
+            assertTrue(watch.taken() < 5000);
         }
     }
 
@@ -63,6 +64,7 @@ public class DirectProducerBlockingTest extends ContextTestSupport {
             assertIsInstanceOf(CamelExchangeException.class, cause);
 
             assertTrue(watch.taken() > 490);
+            assertTrue(watch.taken() < 5000);
         }
     }