You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/19 14:30:18 UTC

[camel] branch master updated: CAMEL-15690: camel-core - Optimize direct produce (#4458)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 55a17ec  CAMEL-15690: camel-core - Optimize direct produce (#4458)
55a17ec is described below

commit 55a17ecf760f2a78996b84c4db97db139798a50f
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Oct 19 16:29:50 2020 +0200

    CAMEL-15690: camel-core - Optimize direct produce (#4458)
    
    CAMEL-15690: camel-direct - Optimize direct producer to wire to consumer if possible during start or only once during processing. This avoids excessive synchronized locking for each processing to get the consumer - there can only be 1 consumer anyway.
---
 .../camel/component/direct/DirectComponent.java    | 65 ++++++++++++++++++++--
 .../camel/component/direct/DirectConsumer.java     | 17 +++---
 .../camel/component/direct/DirectEndpoint.java     | 53 +++---------------
 .../camel/component/direct/DirectProducer.java     | 23 +++++++-
 .../direct/DirectProducerBlockingTest.java         |  2 +
 .../camel/itest/jmh/DirectConcurrentTest.java      | 15 +++--
 6 files changed, 105 insertions(+), 70 deletions(-)

diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
index 33bc0ee..a083596 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
@@ -24,6 +24,7 @@ import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
 import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.StopWatch;
 
 /**
  * The <a href="http://camel.apache.org/direct.html">Direct Component</a> manages {@link DirectEndpoint} and holds the
@@ -32,10 +33,14 @@ import org.apache.camel.support.service.ServiceHelper;
 @Component("direct")
 public class DirectComponent extends DefaultComponent {
 
-    // must keep a map of consumers on the component to ensure endpoints can lookup old consumers
-    // later in case the DirectEndpoint was re-created due the old was evicted from the endpoints LRUCache
-    // on DefaultCamelContext
+    // active consumers
     private final Map<String, DirectConsumer> consumers = new HashMap<>();
+    // counter that is used for producers to keep track if any consumer was added/removed since they last checked
+    // this is used for optimization to avoid each producer to get consumer for each message processed
+    // (locking via synchronized, and then lookup in the map as the cost)
+    // consumers and producers are only added/removed during startup/shutdown or if routes is manually controlled
+    private volatile int stateCounter;
+
     @Metadata(label = "producer", defaultValue = "true")
     private boolean block = true;
     @Metadata(label = "producer", defaultValue = "30000")
@@ -46,7 +51,7 @@ public class DirectComponent extends DefaultComponent {
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        DirectEndpoint endpoint = new DirectEndpoint(uri, this, consumers);
+        DirectEndpoint endpoint = new DirectEndpoint(uri, this);
         endpoint.setBlock(block);
         endpoint.setTimeout(timeout);
         setProperties(endpoint, parameters);
@@ -54,8 +59,8 @@ public class DirectComponent extends DefaultComponent {
     }
 
     @Override
-    protected void doStop() throws Exception {
-        ServiceHelper.stopService(consumers);
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownService(consumers);
         consumers.clear();
         super.doStop();
     }
@@ -82,4 +87,52 @@ public class DirectComponent extends DefaultComponent {
     public void setTimeout(long timeout) {
         this.timeout = timeout;
     }
+
+    int getStateCounter() {
+        return stateCounter;
+    }
+
+    public void addConsumer(String key, DirectConsumer consumer) {
+        synchronized (consumers) {
+            if (consumers.putIfAbsent(key, consumer) != null) {
+                throw new IllegalArgumentException(
+                        "Cannot add a 2nd consumer to the same endpoint: " + key
+                                                   + ". DirectEndpoint only allows one consumer.");
+            }
+            // state changed so inc counter
+            stateCounter++;
+            consumers.notifyAll();
+        }
+    }
+
+    public void removeConsumer(String key, DirectConsumer consumer) {
+        synchronized (consumers) {
+            consumers.remove(key, consumer);
+            // state changed so inc counter
+            stateCounter++;
+            consumers.notifyAll();
+        }
+    }
+
+    protected DirectConsumer getConsumer(String key, boolean block, long timeout) throws InterruptedException {
+        synchronized (consumers) {
+            DirectConsumer answer = consumers.get(key);
+            if (answer == null && block) {
+                StopWatch watch = new StopWatch();
+                for (;;) {
+                    answer = consumers.get(key);
+                    if (answer != null) {
+                        break;
+                    }
+                    long rem = timeout - watch.taken();
+                    if (rem <= 0) {
+                        break;
+                    }
+                    consumers.wait(rem);
+                }
+            }
+            return answer;
+        }
+    }
+
 }
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectConsumer.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
index 45deae4..46d7de9 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.direct;
 
-import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.Suspendable;
@@ -28,11 +27,13 @@ import org.apache.camel.support.DefaultConsumer;
  */
 public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Suspendable {
 
-    private DirectEndpoint endpoint;
+    private final DirectComponent component;
+    private final String key;
 
-    public DirectConsumer(Endpoint endpoint, Processor processor) {
+    public DirectConsumer(DirectEndpoint endpoint, Processor processor, String key) {
         super(endpoint, processor);
-        this.endpoint = (DirectEndpoint) endpoint;
+        this.component = (DirectComponent) endpoint.getComponent();
+        this.key = key;
     }
 
     @Override
@@ -43,24 +44,24 @@ public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Su
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        endpoint.addConsumer(this);
+        component.addConsumer(key, this);
     }
 
     @Override
     protected void doStop() throws Exception {
-        endpoint.removeConsumer(this);
+        component.removeConsumer(key, this);
         super.doStop();
     }
 
     @Override
     protected void doSuspend() throws Exception {
-        endpoint.removeConsumer(this);
+        component.removeConsumer(key, this);
     }
 
     @Override
     protected void doResume() throws Exception {
         // resume by using the start logic
-        endpoint.addConsumer(this);
+        component.addConsumer(key, this);
     }
 
     @Override
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
index 7d5ac76..4281ec0 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
@@ -16,10 +16,7 @@
  */
 package org.apache.camel.component.direct;
 
-import java.util.Map;
-
 import org.apache.camel.Category;
-import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -28,7 +25,6 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.DefaultEndpoint;
-import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.StringHelper;
 
 /**
@@ -40,7 +36,7 @@ import org.apache.camel.util.StringHelper;
              category = { Category.CORE, Category.ENDPOINT })
 public class DirectEndpoint extends DefaultEndpoint {
 
-    private final Map<String, DirectConsumer> consumers;
+    private final DirectComponent component;
     private final String key;
 
     @UriPath(description = "Name of direct endpoint")
@@ -54,9 +50,9 @@ public class DirectEndpoint extends DefaultEndpoint {
     @UriParam(label = "producer", defaultValue = "true")
     private boolean failIfNoConsumers = true;
 
-    public DirectEndpoint(String uri, Component component, Map<String, DirectConsumer> consumers) {
+    public DirectEndpoint(String uri, DirectComponent component) {
         super(uri, component);
-        this.consumers = consumers;
+        this.component = component;
         if (uri.indexOf('?') != -1) {
             this.key = StringHelper.before(uri, "?");
         } else {
@@ -66,52 +62,19 @@ public class DirectEndpoint extends DefaultEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return new DirectProducer(this);
+        return new DirectProducer(this, key);
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        Consumer answer = new DirectConsumer(this, processor);
+        Consumer answer = new DirectConsumer(this, processor, key);
         configureConsumer(answer);
         return answer;
     }
 
-    public void addConsumer(DirectConsumer consumer) {
-        synchronized (consumers) {
-            if (consumers.putIfAbsent(key, consumer) != null) {
-                throw new IllegalArgumentException(
-                        "Cannot add a 2nd consumer to the same endpoint. Endpoint " + this + " only allows one consumer.");
-            }
-            consumers.notifyAll();
-        }
-    }
-
-    public void removeConsumer(DirectConsumer consumer) {
-        synchronized (consumers) {
-            consumers.remove(key, consumer);
-            consumers.notifyAll();
-        }
-    }
-
-    protected DirectConsumer getConsumer() throws InterruptedException {
-        synchronized (consumers) {
-            DirectConsumer answer = consumers.get(key);
-            if (answer == null && block) {
-                StopWatch watch = new StopWatch();
-                for (;;) {
-                    answer = consumers.get(key);
-                    if (answer != null) {
-                        break;
-                    }
-                    long rem = timeout - watch.taken();
-                    if (rem <= 0) {
-                        break;
-                    }
-                    consumers.wait(rem);
-                }
-            }
-            return answer;
-        }
+    @Deprecated
+    public DirectConsumer getConsumer() throws InterruptedException {
+        return component.getConsumer(key, block, timeout);
     }
 
     public boolean isBlock() {
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
index 5c3e7f7..604563f 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
@@ -29,16 +29,30 @@ public class DirectProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(DirectProducer.class);
 
+    private volatile DirectConsumer consumer;
+    private int stateCounter;
+
     private final DirectEndpoint endpoint;
+    private final DirectComponent component;
+    private final String key;
+    private final boolean block;
+    private final long timeout;
 
-    public DirectProducer(DirectEndpoint endpoint) {
+    public DirectProducer(DirectEndpoint endpoint, String key) {
         super(endpoint);
         this.endpoint = endpoint;
+        this.component = (DirectComponent) endpoint.getComponent();
+        this.key = key;
+        this.block = endpoint.isBlock();
+        this.timeout = endpoint.getTimeout();
     }
 
     @Override
     public void process(Exchange exchange) throws Exception {
-        DirectConsumer consumer = endpoint.getConsumer();
+        if (consumer == null || stateCounter != component.getStateCounter()) {
+            stateCounter = component.getStateCounter();
+            consumer = component.getConsumer(key, block, timeout);
+        }
         if (consumer == null) {
             if (endpoint.isFailIfNoConsumers()) {
                 throw new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
@@ -53,7 +67,10 @@ public class DirectProducer extends DefaultAsyncProducer {
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            DirectConsumer consumer = endpoint.getConsumer();
+            if (consumer == null || stateCounter != component.getStateCounter()) {
+                stateCounter = component.getStateCounter();
+                consumer = component.getConsumer(key, block, timeout);
+            }
             if (consumer == null) {
                 if (endpoint.isFailIfNoConsumers()) {
                     exchange.setException(new DirectConsumerNotAvailableException(
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java b/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
index f79847b..cd54a2d 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
@@ -45,6 +45,7 @@ public class DirectProducerBlockingTest extends ContextTestSupport {
                     = assertIsInstanceOf(DirectConsumerNotAvailableException.class, e.getCause());
             assertIsInstanceOf(CamelExchangeException.class, cause);
             assertTrue(watch.taken() > 490);
+            assertTrue(watch.taken() < 5000);
         }
     }
 
@@ -63,6 +64,7 @@ public class DirectProducerBlockingTest extends ContextTestSupport {
             assertIsInstanceOf(CamelExchangeException.class, cause);
 
             assertTrue(watch.taken() > 490);
+            assertTrue(watch.taken() < 5000);
         }
     }
 
diff --git a/tests/camel-jmh/src/test/java/org/apache/camel/itest/jmh/DirectConcurrentTest.java b/tests/camel-jmh/src/test/java/org/apache/camel/itest/jmh/DirectConcurrentTest.java
index ad2e156..97c1b06 100644
--- a/tests/camel-jmh/src/test/java/org/apache/camel/itest/jmh/DirectConcurrentTest.java
+++ b/tests/camel-jmh/src/test/java/org/apache/camel/itest/jmh/DirectConcurrentTest.java
@@ -34,7 +34,6 @@ import org.openjdk.jmh.infra.Blackhole;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
-import org.openjdk.jmh.runner.options.TimeValue;
 
 /**
  * Tests a simple Camel route
@@ -48,12 +47,10 @@ public class DirectConcurrentTest {
                 // You can be more specific if you'd like to run only one benchmark per test.
                 .include(this.getClass().getName() + ".*")
                 // Set the following options as needed
-                .mode(Mode.All)
-                .timeUnit(TimeUnit.MICROSECONDS)
-                .warmupTime(TimeValue.seconds(5))
-                .warmupIterations(0)
-                .measurementTime(TimeValue.seconds(30))
-                .measurementIterations(1)
+                .mode(Mode.AverageTime)
+                .timeUnit(TimeUnit.MILLISECONDS)
+                .warmupIterations(1)
+                .measurementIterations(5)
                 .threads(4)
                 .forks(1)
                 .shouldFailOnError(true)
@@ -115,7 +112,9 @@ public class DirectConcurrentTest {
     @Benchmark
     public void directConcurrentTest(BenchmarkState state, Blackhole bh) {
         ProducerTemplate template = state.producer;
-        template.sendBody("direct:start", "Hello World");
+        for (int i = 0; i < 50000; i++) {
+            template.sendBody("direct:start", "Hello " + i);
+        }
     }
 
 }