You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/19 14:30:18 UTC
[camel] branch master updated: CAMEL-15690: camel-core - Optimize
direct produce (#4458)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 55a17ec CAMEL-15690: camel-core - Optimize direct produce (#4458)
55a17ec is described below
commit 55a17ecf760f2a78996b84c4db97db139798a50f
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Oct 19 16:29:50 2020 +0200
CAMEL-15690: camel-core - Optimize direct produce (#4458)
CAMEL-15690: camel-direct - Optimize the direct producer to wire to its consumer during start if possible, or otherwise only once during processing. This avoids excessive synchronized locking to look up the consumer for every message processed - there can only be one consumer anyway.
---
.../camel/component/direct/DirectComponent.java | 65 ++++++++++++++++++++--
.../camel/component/direct/DirectConsumer.java | 17 +++---
.../camel/component/direct/DirectEndpoint.java | 53 +++---------------
.../camel/component/direct/DirectProducer.java | 23 +++++++-
.../direct/DirectProducerBlockingTest.java | 2 +
.../camel/itest/jmh/DirectConcurrentTest.java | 15 +++--
6 files changed, 105 insertions(+), 70 deletions(-)
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
index 33bc0ee..a083596 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectComponent.java
@@ -24,6 +24,7 @@ import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.DefaultComponent;
import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.StopWatch;
/**
* The <a href="http://camel.apache.org/direct.html">Direct Component</a> manages {@link DirectEndpoint} and holds the
@@ -32,10 +33,14 @@ import org.apache.camel.support.service.ServiceHelper;
@Component("direct")
public class DirectComponent extends DefaultComponent {
- // must keep a map of consumers on the component to ensure endpoints can lookup old consumers
- // later in case the DirectEndpoint was re-created due the old was evicted from the endpoints LRUCache
- // on DefaultCamelContext
+ // active consumers
private final Map<String, DirectConsumer> consumers = new HashMap<>();
+ // counter that producers use to keep track of whether any consumer was added/removed since they last checked
+ // this is used as an optimization so that each producer does not have to get the consumer for each message processed
+ // (the cost being locking via synchronized, and then a lookup in the map)
+ // consumers and producers are only added/removed during startup/shutdown or if routes are manually controlled
+ private volatile int stateCounter;
+
@Metadata(label = "producer", defaultValue = "true")
private boolean block = true;
@Metadata(label = "producer", defaultValue = "30000")
@@ -46,7 +51,7 @@ public class DirectComponent extends DefaultComponent {
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
- DirectEndpoint endpoint = new DirectEndpoint(uri, this, consumers);
+ DirectEndpoint endpoint = new DirectEndpoint(uri, this);
endpoint.setBlock(block);
endpoint.setTimeout(timeout);
setProperties(endpoint, parameters);
@@ -54,8 +59,8 @@ public class DirectComponent extends DefaultComponent {
}
@Override
- protected void doStop() throws Exception {
- ServiceHelper.stopService(consumers);
+ protected void doShutdown() throws Exception {
+ ServiceHelper.stopAndShutdownService(consumers);
consumers.clear();
super.doStop();
}
@@ -82,4 +87,52 @@ public class DirectComponent extends DefaultComponent {
public void setTimeout(long timeout) {
this.timeout = timeout;
}
+
+ int getStateCounter() {
+ return stateCounter;
+ }
+
+ public void addConsumer(String key, DirectConsumer consumer) {
+ synchronized (consumers) {
+ if (consumers.putIfAbsent(key, consumer) != null) {
+ throw new IllegalArgumentException(
+ "Cannot add a 2nd consumer to the same endpoint: " + key
+ + ". DirectEndpoint only allows one consumer.");
+ }
+ // state changed so inc counter
+ stateCounter++;
+ consumers.notifyAll();
+ }
+ }
+
+ public void removeConsumer(String key, DirectConsumer consumer) {
+ synchronized (consumers) {
+ consumers.remove(key, consumer);
+ // state changed so inc counter
+ stateCounter++;
+ consumers.notifyAll();
+ }
+ }
+
+ protected DirectConsumer getConsumer(String key, boolean block, long timeout) throws InterruptedException {
+ synchronized (consumers) {
+ DirectConsumer answer = consumers.get(key);
+ if (answer == null && block) {
+ StopWatch watch = new StopWatch();
+ for (;;) {
+ answer = consumers.get(key);
+ if (answer != null) {
+ break;
+ }
+ long rem = timeout - watch.taken();
+ if (rem <= 0) {
+ break;
+ }
+ consumers.wait(rem);
+ }
+ }
+ return answer;
+ }
+ }
+
}
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectConsumer.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
index 45deae4..46d7de9 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectConsumer.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.component.direct;
-import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.Suspendable;
@@ -28,11 +27,13 @@ import org.apache.camel.support.DefaultConsumer;
*/
public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Suspendable {
- private DirectEndpoint endpoint;
+ private final DirectComponent component;
+ private final String key;
- public DirectConsumer(Endpoint endpoint, Processor processor) {
+ public DirectConsumer(DirectEndpoint endpoint, Processor processor, String key) {
super(endpoint, processor);
- this.endpoint = (DirectEndpoint) endpoint;
+ this.component = (DirectComponent) endpoint.getComponent();
+ this.key = key;
}
@Override
@@ -43,24 +44,24 @@ public class DirectConsumer extends DefaultConsumer implements ShutdownAware, Su
@Override
protected void doStart() throws Exception {
super.doStart();
- endpoint.addConsumer(this);
+ component.addConsumer(key, this);
}
@Override
protected void doStop() throws Exception {
- endpoint.removeConsumer(this);
+ component.removeConsumer(key, this);
super.doStop();
}
@Override
protected void doSuspend() throws Exception {
- endpoint.removeConsumer(this);
+ component.removeConsumer(key, this);
}
@Override
protected void doResume() throws Exception {
// resume by using the start logic
- endpoint.addConsumer(this);
+ component.addConsumer(key, this);
}
@Override
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
index 7d5ac76..4281ec0 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
@@ -16,10 +16,7 @@
*/
package org.apache.camel.component.direct;
-import java.util.Map;
-
import org.apache.camel.Category;
-import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -28,7 +25,6 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultEndpoint;
-import org.apache.camel.util.StopWatch;
import org.apache.camel.util.StringHelper;
/**
@@ -40,7 +36,7 @@ import org.apache.camel.util.StringHelper;
category = { Category.CORE, Category.ENDPOINT })
public class DirectEndpoint extends DefaultEndpoint {
- private final Map<String, DirectConsumer> consumers;
+ private final DirectComponent component;
private final String key;
@UriPath(description = "Name of direct endpoint")
@@ -54,9 +50,9 @@ public class DirectEndpoint extends DefaultEndpoint {
@UriParam(label = "producer", defaultValue = "true")
private boolean failIfNoConsumers = true;
- public DirectEndpoint(String uri, Component component, Map<String, DirectConsumer> consumers) {
+ public DirectEndpoint(String uri, DirectComponent component) {
super(uri, component);
- this.consumers = consumers;
+ this.component = component;
if (uri.indexOf('?') != -1) {
this.key = StringHelper.before(uri, "?");
} else {
@@ -66,52 +62,19 @@ public class DirectEndpoint extends DefaultEndpoint {
@Override
public Producer createProducer() throws Exception {
- return new DirectProducer(this);
+ return new DirectProducer(this, key);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- Consumer answer = new DirectConsumer(this, processor);
+ Consumer answer = new DirectConsumer(this, processor, key);
configureConsumer(answer);
return answer;
}
- public void addConsumer(DirectConsumer consumer) {
- synchronized (consumers) {
- if (consumers.putIfAbsent(key, consumer) != null) {
- throw new IllegalArgumentException(
- "Cannot add a 2nd consumer to the same endpoint. Endpoint " + this + " only allows one consumer.");
- }
- consumers.notifyAll();
- }
- }
-
- public void removeConsumer(DirectConsumer consumer) {
- synchronized (consumers) {
- consumers.remove(key, consumer);
- consumers.notifyAll();
- }
- }
-
- protected DirectConsumer getConsumer() throws InterruptedException {
- synchronized (consumers) {
- DirectConsumer answer = consumers.get(key);
- if (answer == null && block) {
- StopWatch watch = new StopWatch();
- for (;;) {
- answer = consumers.get(key);
- if (answer != null) {
- break;
- }
- long rem = timeout - watch.taken();
- if (rem <= 0) {
- break;
- }
- consumers.wait(rem);
- }
- }
- return answer;
- }
+ @Deprecated
+ public DirectConsumer getConsumer() throws InterruptedException {
+ return component.getConsumer(key, block, timeout);
}
public boolean isBlock() {
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
index 5c3e7f7..604563f 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectProducer.java
@@ -29,16 +29,30 @@ public class DirectProducer extends DefaultAsyncProducer {
private static final Logger LOG = LoggerFactory.getLogger(DirectProducer.class);
+ private volatile DirectConsumer consumer;
+ private int stateCounter;
+
private final DirectEndpoint endpoint;
+ private final DirectComponent component;
+ private final String key;
+ private final boolean block;
+ private final long timeout;
- public DirectProducer(DirectEndpoint endpoint) {
+ public DirectProducer(DirectEndpoint endpoint, String key) {
super(endpoint);
this.endpoint = endpoint;
+ this.component = (DirectComponent) endpoint.getComponent();
+ this.key = key;
+ this.block = endpoint.isBlock();
+ this.timeout = endpoint.getTimeout();
}
@Override
public void process(Exchange exchange) throws Exception {
- DirectConsumer consumer = endpoint.getConsumer();
+ if (consumer == null || stateCounter != component.getStateCounter()) {
+ stateCounter = component.getStateCounter();
+ consumer = component.getConsumer(key, block, timeout);
+ }
if (consumer == null) {
if (endpoint.isFailIfNoConsumers()) {
throw new DirectConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
@@ -53,7 +67,10 @@ public class DirectProducer extends DefaultAsyncProducer {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
- DirectConsumer consumer = endpoint.getConsumer();
+ if (consumer == null || stateCounter != component.getStateCounter()) {
+ stateCounter = component.getStateCounter();
+ consumer = component.getConsumer(key, block, timeout);
+ }
if (consumer == null) {
if (endpoint.isFailIfNoConsumers()) {
exchange.setException(new DirectConsumerNotAvailableException(
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java b/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
index f79847b..cd54a2d 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/direct/DirectProducerBlockingTest.java
@@ -45,6 +45,7 @@ public class DirectProducerBlockingTest extends ContextTestSupport {
= assertIsInstanceOf(DirectConsumerNotAvailableException.class, e.getCause());
assertIsInstanceOf(CamelExchangeException.class, cause);
assertTrue(watch.taken() > 490);
+ assertTrue(watch.taken() < 5000);
}
}
@@ -63,6 +64,7 @@ public class DirectProducerBlockingTest extends ContextTestSupport {
assertIsInstanceOf(CamelExchangeException.class, cause);
assertTrue(watch.taken() > 490);
+ assertTrue(watch.taken() < 5000);
}
}
diff --git a/tests/camel-jmh/src/test/java/org/apache/camel/itest/jmh/DirectConcurrentTest.java b/tests/camel-jmh/src/test/java/org/apache/camel/itest/jmh/DirectConcurrentTest.java
index ad2e156..97c1b06 100644
--- a/tests/camel-jmh/src/test/java/org/apache/camel/itest/jmh/DirectConcurrentTest.java
+++ b/tests/camel-jmh/src/test/java/org/apache/camel/itest/jmh/DirectConcurrentTest.java
@@ -34,7 +34,6 @@ import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
-import org.openjdk.jmh.runner.options.TimeValue;
/**
* Tests a simple Camel route
@@ -48,12 +47,10 @@ public class DirectConcurrentTest {
// You can be more specific if you'd like to run only one benchmark per test.
.include(this.getClass().getName() + ".*")
// Set the following options as needed
- .mode(Mode.All)
- .timeUnit(TimeUnit.MICROSECONDS)
- .warmupTime(TimeValue.seconds(5))
- .warmupIterations(0)
- .measurementTime(TimeValue.seconds(30))
- .measurementIterations(1)
+ .mode(Mode.AverageTime)
+ .timeUnit(TimeUnit.MILLISECONDS)
+ .warmupIterations(1)
+ .measurementIterations(5)
.threads(4)
.forks(1)
.shouldFailOnError(true)
@@ -115,7 +112,9 @@ public class DirectConcurrentTest {
@Benchmark
public void directConcurrentTest(BenchmarkState state, Blackhole bh) {
ProducerTemplate template = state.producer;
- template.sendBody("direct:start", "Hello World");
+ for (int i = 0; i < 50000; i++) {
+ template.sendBody("direct:start", "Hello " + i);
+ }
}
}