You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/15 16:00:41 UTC

[camel] branch direct created (now fda9f49)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch direct
in repository https://gitbox.apache.org/repos/asf/camel.git.


      at fda9f49  CAMEL-15690: camel-direct - Optimize direct producer to wire to consumer if possible during start or only once during processing. This avoids excessive synchronized locking for each processing to get the consumer - there can only be 1 consumer anyway.

This branch includes the following new commits:

     new 05a879a  CAMEL-15690: camel-direct - Optimize to move state from endpoint to component which makes it easier to optimize wiring between producer and consumers.
     new b21a8e5  CAMEL-15690: camel-direct - Optimize direct producer to wire to consumer if possible during start or only once during processing. This avoids excessive synchronized locking for each processing to get the consumer - there can only be 1 consumer anyway.
     new fda9f49  CAMEL-15690: camel-direct - Optimize direct producer to wire to consumer if possible during start or only once during processing. This avoids excessive synchronized locking for each processing to get the consumer - there can only be 1 consumer anyway.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 01/03: CAMEL-15690: camel-direct - Optimize to move state from endpoint to component which makes it easier to optimize wiring between producer and consumers.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch direct
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 05a879a475b1ba33482ca35b96dae10ddcd7b0c4
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Oct 15 14:22:28 2020 +0200

    CAMEL-15690: camel-direct - Optimize to move state from endpoint to component which makes it easier to optimize wiring between producer and consumers.
---
 .../camel/component/direct/DirectComponent.java    | 48 ++++++++++++++++++--
 .../camel/component/direct/DirectConsumer.java     | 17 +++----
 .../camel/component/direct/DirectEndpoint.java     | 53 ++++------------------
 .../camel/component/direct/DirectProducer.java     | 12 +++--
 4 files changed, 70 insertions(+), 60 deletions(-)

diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
index 33bc0ee..6494ed6 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
@@ -24,6 +24,7 @@ import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
 import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.StopWatch;
 
 /**
  * The <a href="http://camel.apache.org/direct.html">Direct Component</a> manages {@link DirectEndpoint} and holds the
@@ -32,10 +33,9 @@ import org.apache.camel.support.service.ServiceHelper;
 @Component("direct")
 public class DirectComponent extends DefaultComponent {
 
-    // must keep a map of consumers on the component to ensure endpoints can lookup old consumers
-    // later in case the DirectEndpoint was re-created due the old was evicted from the endpoints LRUCache
-    // on DefaultCamelContext
+    // active consumers
     private final Map<String, DirectConsumer> consumers = new HashMap<>();
+
     @Metadata(label = "producer", defaultValue = "true")
     private boolean block = true;
     @Metadata(label = "producer", defaultValue = "30000")
@@ -46,7 +46,7 @@ public class DirectComponent extends DefaultComponent {
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        DirectEndpoint endpoint = new DirectEndpoint(uri, this, consumers);
+        DirectEndpoint endpoint = new DirectEndpoint(uri, this);
         endpoint.setBlock(block);
         endpoint.setTimeout(timeout);
         setProperties(endpoint, parameters);
@@ -82,4 +82,44 @@ public class DirectComponent extends DefaultComponent {
     public void setTimeout(long timeout) {
         this.timeout = timeout;
     }
+
+    public void addConsumer(String key, DirectConsumer consumer) {
+        synchronized (consumers) {
+            if (consumers.putIfAbsent(key, consumer) != null) {
+                throw new IllegalArgumentException(
+                        "Cannot add a 2nd consumer to the same endpoint: " + key
+                                                   + ". DirectEndpoint only allows one consumer.");
+            }
+            consumers.notifyAll();
+        }
+    }
+
+    public void removeConsumer(String key, DirectConsumer consumer) {
+        synchronized (consumers) {
+            consumers.remove(key, consumer);
+            consumers.notifyAll();
+        }
+    }
+
+    protected DirectConsumer getConsumer(String key, boolean block) throws InterruptedException {
+        synchronized (consumers) {
+            DirectConsumer answer = consumers.get(key);
+            if (answer == null && block) {
+                StopWatch watch = new StopWatch();
+                for (;;) {
+                    answer = consumers.get(key);
+                    if (answer != null) {
+                        break;
+                    }
+                    long rem = timeout - watch.taken();
+                    if (rem <= 0) {
+                        break;
+                    }
+                    consumers.wait(rem);
+                }
+            }
+            return answer;
+        }
+    }
+
 }
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectConsumer.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
index 45deae4..46d7de9 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.direct;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.Suspendable;
@@ -28,11 +27,13 @@ import org.apache.camel.support.DefaultConsumer;
  */
 public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Suspendable {
 
-    private DirectEndpoint endpoint;
+    private final DirectComponent component;
+    private final String key;
 
-    public DirectConsumer(Endpoint endpoint, Processor processor) {
+    public DirectConsumer(DirectEndpoint endpoint, Processor processor, String key) {
         super(endpoint, processor);
-        this.endpoint = (DirectEndpoint) endpoint;
+        this.component = (DirectComponent) endpoint.getComponent();
+        this.key = key;
     }
 
     @Override
@@ -43,24 +44,24 @@ public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Su
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        endpoint.addConsumer(this);
+        component.addConsumer(key, this);
     }
 
     @Override
     protected void doStop() throws Exception {
-        endpoint.removeConsumer(this);
+        component.removeConsumer(key, this);
         super.doStop();
     }
 
     @Override
     protected void doSuspend() throws Exception {
-        endpoint.removeConsumer(this);
+        component.removeConsumer(key, this);
     }
 
     @Override
     protected void doResume() throws Exception {
         // resume by using the start logic
-        endpoint.addConsumer(this);
+        component.addConsumer(key, this);
     }
 
     @Override
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
index 7d5ac76..9c5bd87 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
@@ -16,10 +16,7 @@
  */
 package org.apache.camel.component.direct;
 
-import java.util.Map;
-
 import org.apache.camel.Category;
-import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -28,7 +25,6 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultEndpoint;
-import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.StringHelper;
 
 /**
@@ -40,7 +36,7 @@ import org.apache.camel.util.StringHelper;
              category = { Category.CORE, Category.ENDPOINT })
 public class DirectEndpoint extends DefaultEndpoint {
 
-    private final Map<String, DirectConsumer> consumers;
+    private final DirectComponent component;
     private final String key;
 
     @UriPath(description = "Name of direct endpoint")
@@ -54,9 +50,9 @@ public class DirectEndpoint extends DefaultEndpoint {
     @UriParam(label = "producer", defaultValue = "true")
     private boolean failIfNoConsumers = true;
 
-    public DirectEndpoint(String uri, Component component, Map<String, DirectConsumer> consumers) {
+    public DirectEndpoint(String uri, DirectComponent component) {
         super(uri, component);
-        this.consumers = consumers;
+        this.component = component;
         if (uri.indexOf('?') != -1) {
             this.key = StringHelper.before(uri, "?");
         } else {
@@ -66,52 +62,19 @@ public class DirectEndpoint extends DefaultEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return new DirectProducer(this);
+        return new DirectProducer(this, key);
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        Consumer answer = new DirectConsumer(this, processor);
+        Consumer answer = new DirectConsumer(this, processor, key);
         configureConsumer(answer);
         return answer;
     }
 
-    public void addConsumer(DirectConsumer consumer) {
-        synchronized (consumers) {
-            if (consumers.putIfAbsent(key, consumer) != null) {
-                throw new IllegalArgumentException(
-                        "Cannot add a 2nd consumer to the same endpoint. Endpoint " + this + " only allows one consumer.");
-            }
-            consumers.notifyAll();
-        }
-    }
-
-    public void removeConsumer(DirectConsumer consumer) {
-        synchronized (consumers) {
-            consumers.remove(key, consumer);
-            consumers.notifyAll();
-        }
-    }
-
-    protected DirectConsumer getConsumer() throws InterruptedException {
-        synchronized (consumers) {
-            DirectConsumer answer = consumers.get(key);
-            if (answer == null && block) {
-                StopWatch watch = new StopWatch();
-                for (;;) {
-                    answer = consumers.get(key);
-                    if (answer != null) {
-                        break;
-                    }
-                    long rem = timeout - watch.taken();
-                    if (rem <= 0) {
-                        break;
-                    }
-                    consumers.wait(rem);
-                }
-            }
-            return answer;
-        }
+    @Deprecated
+    public DirectConsumer getConsumer() throws InterruptedException {
+        return component.getConsumer(key, block);
     }
 
     public boolean isBlock() {
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
index 5c3e7f7..25e561c 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
@@ -30,15 +30,21 @@ public class DirectProducer extends DefaultAsyncProducer {
     private static final Logger LOG = LoggerFactory.getLogger(DirectProducer.class);
 
     private final DirectEndpoint endpoint;
+    private final DirectComponent component;
+    private final String key;
+    private final boolean block;
 
-    public DirectProducer(DirectEndpoint endpoint) {
+    public DirectProducer(DirectEndpoint endpoint, String key) {
         super(endpoint);
         this.endpoint = endpoint;
+        this.component = (DirectComponent) endpoint.getComponent();
+        this.key = key;
+        this.block = endpoint.isBlock();
     }
 
     @Override
     public void process(Exchange exchange) throws Exception {
-        DirectConsumer consumer = endpoint.getConsumer();
+        DirectConsumer consumer = component.getConsumer(key, block);
         if (consumer == null) {
             if (endpoint.isFailIfNoConsumers()) {
                 throw new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
@@ -53,7 +59,7 @@ public class DirectProducer extends DefaultAsyncProducer {
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            DirectConsumer consumer = endpoint.getConsumer();
+            DirectConsumer consumer = component.getConsumer(key, block);
             if (consumer == null) {
                 if (endpoint.isFailIfNoConsumers()) {
                     exchange.setException(new DirectConsumerNotAvailableException(


[camel] 03/03: CAMEL-15690: camel-direct - Optimize direct producer to wire to consumer if possible during start or only once during processing. This avoids excessive synchronized locking for each processing to get the consumer - there can only be 1 consumer anyway.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch direct
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fda9f495d0cd08c273630874ef94f7e1ab312292
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Oct 15 17:17:44 2020 +0200

    CAMEL-15690: camel-direct - Optimize direct producer to wire to consumer if possible during start or only once during processing. This avoids excessive synchronized locking for each processing to get the consumer - there can only be 1 consumer anyway.
---
 .../java/org/apache/camel/component/direct/DirectComponent.java     | 2 +-
 .../main/java/org/apache/camel/component/direct/DirectEndpoint.java | 2 +-
 .../main/java/org/apache/camel/component/direct/DirectProducer.java | 6 ++++--
 .../apache/camel/component/direct/DirectProducerBlockingTest.java   | 2 ++
 4 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
index 1a412a4..a083596 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
@@ -114,7 +114,7 @@ public class DirectComponent extends DefaultComponent {
         }
     }
 
-    protected DirectConsumer getConsumer(String key, boolean block) throws InterruptedException {
+    protected DirectConsumer getConsumer(String key, boolean block, long timeout) throws InterruptedException {
         synchronized (consumers) {
             DirectConsumer answer = consumers.get(key);
             if (answer == null && block) {
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
index 9c5bd87..4281ec0 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
@@ -74,7 +74,7 @@ public class DirectEndpoint extends DefaultEndpoint {
 
     @Deprecated
     public DirectConsumer getConsumer() throws InterruptedException {
-        return component.getConsumer(key, block);
+        return component.getConsumer(key, block, timeout);
     }
 
     public boolean isBlock() {
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
index d71a23d..604563f 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
@@ -36,6 +36,7 @@ public class DirectProducer extends DefaultAsyncProducer {
     private final DirectComponent component;
     private final String key;
     private final boolean block;
+    private final long timeout;
 
     public DirectProducer(DirectEndpoint endpoint, String key) {
         super(endpoint);
@@ -43,13 +44,14 @@ public class DirectProducer extends DefaultAsyncProducer {
         this.component = (DirectComponent) endpoint.getComponent();
         this.key = key;
         this.block = endpoint.isBlock();
+        this.timeout = endpoint.getTimeout();
     }
 
     @Override
     public void process(Exchange exchange) throws Exception {
         if (consumer == null || stateCounter != component.getStateCounter()) {
             stateCounter = component.getStateCounter();
-            consumer = component.getConsumer(key, block);
+            consumer = component.getConsumer(key, block, timeout);
         }
         if (consumer == null) {
             if (endpoint.isFailIfNoConsumers()) {
@@ -67,7 +69,7 @@ public class DirectProducer extends DefaultAsyncProducer {
         try {
             if (consumer == null || stateCounter != component.getStateCounter()) {
                 stateCounter = component.getStateCounter();
-                consumer = component.getConsumer(key, block);
+                consumer = component.getConsumer(key, block, timeout);
             }
             if (consumer == null) {
                 if (endpoint.isFailIfNoConsumers()) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java b/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
index f79847b..cd54a2d 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
@@ -45,6 +45,7 @@ public class DirectProducerBlockingTest extends ContextTestSupport {
                     = assertIsInstanceOf(DirectConsumerNotAvailableException.class, e.getCause());
             assertIsInstanceOf(CamelExchangeException.class, cause);
             assertTrue(watch.taken() > 490);
+            assertTrue(watch.taken() < 5000);
         }
     }
 
@@ -63,6 +64,7 @@ public class DirectProducerBlockingTest extends ContextTestSupport {
             assertIsInstanceOf(CamelExchangeException.class, cause);
 
             assertTrue(watch.taken() > 490);
+            assertTrue(watch.taken() < 5000);
         }
     }
 


[camel] 02/03: CAMEL-15690: camel-direct - Optimize direct producer to wire to consumer if possible during start or only once during processing. This avoids excessive synchronized locking for each processing to get the consumer - there can only be 1 consumer anyway.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch direct
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b21a8e5c600221e1e028f1146f0a2df17a3633cc
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Oct 15 15:42:22 2020 +0200

    CAMEL-15690: camel-direct - Optimize direct producer to wire to consumer if possible during start or only once during processing. This avoids excessive synchronized locking for each processing to get the consumer - there can only be 1 consumer anyway.
---
 .../apache/camel/component/direct/DirectComponent.java  | 17 +++++++++++++++--
 .../apache/camel/component/direct/DirectProducer.java   | 13 +++++++++++--
 2 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
index 6494ed6..1a412a4 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
@@ -35,6 +35,11 @@ public class DirectComponent extends DefaultComponent {
 
     // active consumers
     private final Map<String, DirectConsumer> consumers = new HashMap<>();
+    // counter used by producers to track whether any consumer was added/removed since they last checked
+    // this is an optimization so each producer avoids getting the consumer for every message processed
+    // (the cost being locking via synchronized, and then a lookup in the map)
+    // consumers and producers are only added/removed during startup/shutdown or if routes are manually controlled
+    private volatile int stateCounter;
 
     @Metadata(label = "producer", defaultValue = "true")
     private boolean block = true;
@@ -54,8 +59,8 @@ public class DirectComponent extends DefaultComponent {
     }
 
     @Override
-    protected void doStop() throws Exception {
-        ServiceHelper.stopService(consumers);
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownService(consumers);
         consumers.clear();
         super.doStop();
     }
@@ -83,6 +88,10 @@ public class DirectComponent extends DefaultComponent {
         this.timeout = timeout;
     }
 
+    int getStateCounter() {
+        return stateCounter;
+    }
+
     public void addConsumer(String key, DirectConsumer consumer) {
         synchronized (consumers) {
             if (consumers.putIfAbsent(key, consumer) != null) {
@@ -90,6 +99,8 @@ public class DirectComponent extends DefaultComponent {
                         "Cannot add a 2nd consumer to the same endpoint: " + key
                                                    + ". DirectEndpoint only allows one consumer.");
             }
+            // state changed so inc counter
+            stateCounter++;
             consumers.notifyAll();
         }
     }
@@ -97,6 +108,8 @@ public class DirectComponent extends DefaultComponent {
     public void removeConsumer(String key, DirectConsumer consumer) {
         synchronized (consumers) {
             consumers.remove(key, consumer);
+            // state changed so inc counter
+            stateCounter++;
             consumers.notifyAll();
         }
     }
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
index 25e561c..d71a23d 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
@@ -29,6 +29,9 @@ public class DirectProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(DirectProducer.class);
 
+    private volatile DirectConsumer consumer;
+    private int stateCounter;
+
     private final DirectEndpoint endpoint;
     private final DirectComponent component;
     private final String key;
@@ -44,7 +47,10 @@ public class DirectProducer extends DefaultAsyncProducer {
 
     @Override
     public void process(Exchange exchange) throws Exception {
-        DirectConsumer consumer = component.getConsumer(key, block);
+        if (consumer == null || stateCounter != component.getStateCounter()) {
+            stateCounter = component.getStateCounter();
+            consumer = component.getConsumer(key, block);
+        }
         if (consumer == null) {
             if (endpoint.isFailIfNoConsumers()) {
                 throw new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
@@ -59,7 +65,10 @@ public class DirectProducer extends DefaultAsyncProducer {
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            DirectConsumer consumer = component.getConsumer(key, block);
+            if (consumer == null || stateCounter != component.getStateCounter()) {
+                stateCounter = component.getStateCounter();
+                consumer = component.getConsumer(key, block);
+            }
             if (consumer == null) {
                 if (endpoint.isFailIfNoConsumers()) {
                     exchange.setException(new DirectConsumerNotAvailableException(