You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/15 10:45:56 UTC
[camel] 01/04: CAMEL-15176: Optimize component to do as much in init phase vs start phase.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit e14e44580a823445d6f67b18a957ac57874a8543
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Oct 15 09:59:21 2020 +0200
CAMEL-15176: Optimize component to do as much in init phase vs start phase.
---
.../camel/component/direct/DirectEndpoint.java | 14 ++---
.../camel/component/directvm/DirectVmProducer.java | 73 +++++++++++++---------
components/components-init-work-in-progress.md | 12 ++--
3 files changed, 56 insertions(+), 43 deletions(-)
diff --git a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
index 941ce31..4b4e1fd 100644
--- a/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
+++ b/components/camel-direct/src/main/java/org/apache/camel/component/direct/DirectEndpoint.java
@@ -42,6 +42,7 @@ import org.apache.camel.util.StringHelper;
public class DirectEndpoint extends DefaultEndpoint {
private final Map<String, DirectConsumer> consumers;
+ private String key;
@UriPath(description = "Name of direct endpoint")
@Metadata(required = true)
@@ -68,6 +69,11 @@ public class DirectEndpoint extends DefaultEndpoint {
}
@Override
+ protected void doInit() throws Exception {
+ key = initKey();
+ }
+
+ @Override
public Producer createProducer() throws Exception {
return new DirectProducer(this);
}
@@ -80,7 +86,6 @@ public class DirectEndpoint extends DefaultEndpoint {
}
public void addConsumer(DirectConsumer consumer) {
- String key = getKey();
synchronized (consumers) {
if (consumers.putIfAbsent(key, consumer) != null) {
throw new IllegalArgumentException(
@@ -91,7 +96,6 @@ public class DirectEndpoint extends DefaultEndpoint {
}
public void removeConsumer(DirectConsumer consumer) {
- String key = getKey();
synchronized (consumers) {
consumers.remove(key, consumer);
consumers.notifyAll();
@@ -99,7 +103,6 @@ public class DirectEndpoint extends DefaultEndpoint {
}
protected DirectConsumer getConsumer() throws InterruptedException {
- String key = getKey();
synchronized (consumers) {
DirectConsumer answer = consumers.get(key);
if (answer == null && block) {
@@ -116,9 +119,6 @@ public class DirectEndpoint extends DefaultEndpoint {
consumers.wait(rem);
}
}
- // if (answer != null && answer.getEndpoint() != this) {
- // throw new IllegalStateException();
- // }
return answer;
}
}
@@ -160,7 +160,7 @@ public class DirectEndpoint extends DefaultEndpoint {
this.failIfNoConsumers = failIfNoConsumers;
}
- protected String getKey() {
+ protected String initKey() {
String uri = getEndpointUri();
if (uri.indexOf('?') != -1) {
return StringHelper.before(uri, "?");
diff --git a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
index 1eed7e1..568eef7 100644
--- a/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
+++ b/components/camel-directvm/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
@@ -54,43 +54,56 @@ public class DirectVmProducer extends DefaultAsyncProducer {
return true;
}
- final HeaderFilterStrategy headerFilterStrategy = endpoint.getHeaderFilterStrategy();
+ try {
+ final HeaderFilterStrategy headerFilterStrategy = endpoint.getHeaderFilterStrategy();
- // Only clone the Exchange if we actually need to filter out properties or headers.
- final Exchange submitted
- = (!endpoint.isPropagateProperties() || headerFilterStrategy != null) ? exchange.copy() : exchange;
+ // Only clone the Exchange if we actually need to filter out properties or headers.
+ final Exchange submitted
+ = (!endpoint.isPropagateProperties() || headerFilterStrategy != null) ? exchange.copy() : exchange;
- // Clear properties in the copy if we are not propagating them.
- if (!endpoint.isPropagateProperties()) {
- submitted.getProperties().clear();
- }
-
- // Filter headers by Header Filter Strategy if there is one set.
- if (headerFilterStrategy != null) {
- submitted.getIn().getHeaders().entrySet()
- .removeIf(e -> headerFilterStrategy.applyFilterToCamelHeaders(e.getKey(), e.getValue(), submitted));
- }
-
- return consumer.getAsyncProcessor().process(submitted, done -> {
- Message msg = submitted.getMessage();
-
- if (headerFilterStrategy != null) {
- msg.getHeaders().entrySet()
- .removeIf(e -> headerFilterStrategy.applyFilterToExternalHeaders(e.getKey(), e.getValue(), submitted));
+ // Clear properties in the copy if we are not propagating them.
+ if (!endpoint.isPropagateProperties()) {
+ submitted.getProperties().clear();
}
- if (exchange != submitted) {
- // only need to copy back if they are different
- exchange.setException(submitted.getException());
- exchange.getOut().copyFrom(msg);
+ // Filter headers by Header Filter Strategy if there is one set.
+ if (headerFilterStrategy != null) {
+ submitted.getIn().getHeaders().entrySet()
+ .removeIf(e -> headerFilterStrategy.applyFilterToCamelHeaders(e.getKey(), e.getValue(), submitted));
}
- if (endpoint.isPropagateProperties()) {
- exchange.getProperties().putAll(submitted.getProperties());
- }
+ return consumer.getAsyncProcessor().process(submitted, done -> {
+ try {
+ Message msg = submitted.getMessage();
+
+ if (headerFilterStrategy != null) {
+ msg.getHeaders().entrySet()
+ .removeIf(e -> headerFilterStrategy.applyFilterToExternalHeaders(e.getKey(), e.getValue(),
+ submitted));
+ }
+
+ if (exchange != submitted) {
+ // only need to copy back if they are different
+ exchange.setException(submitted.getException());
+ exchange.getOut().copyFrom(msg);
+ }
+
+ if (endpoint.isPropagateProperties()) {
+ exchange.getProperties().putAll(submitted.getProperties());
+ }
+ } catch (Throwable e) {
+ exchange.setException(e);
+ } finally {
+ callback.done(done);
+ }
+ });
+
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
- callback.done(done);
- });
+ callback.done(true);
+ return true;
}
}
diff --git a/components/components-init-work-in-progress.md b/components/components-init-work-in-progress.md
index 487cdab..740dac6 100644
--- a/components/components-init-work-in-progress.md
+++ b/components/components-init-work-in-progress.md
@@ -90,13 +90,13 @@
|camel-controlbus|DONE | |
|camel-corda|REJECT|open network connection|
|camel-couchbase|DONE | |
-|camel-couchdb| | |
+|camel-couchdb|DONE | |
|camel-cron|DONE | |
-|camel-crypto| | |
-|camel-crypto-cms| | |
+|camel-crypto|DONE | |
+|camel-crypto-cms|REJECT | |
|camel-csv|DONE| |
|camel-cxf|REJECT|start a server|
-|camel-cxf-transport| | |
+|camel-cxf-transport|REJECT | |
|camel-dataformat|DONE| |
|camel-dataset|DONE| |
|camel-debezium| | |
@@ -106,8 +106,8 @@
|camel-debezium-postgres| | |
|camel-debezium-sqlserver| | |
|camel-digitalocean|REJECT|create a http client|
-|camel-direct| | |
-|camel-directvm| | |
+|camel-direct|DONE | |
+|camel-directvm|DONE | |
|camel-disruptor|REJECT|start a thread|
|camel-djl| | |
|camel-dns| | |