You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/05 08:57:04 UTC
[camel] 01/04: CAMEL-16450: camel-core - Optimize WireTap to only use the
dynamic send processor when the URI is dynamic.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8db83a9b2250a817df69fb7c43adf19f4c2cea77
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 5 09:38:46 2021 +0200
CAMEL-16450: camel-core - Optimize WireTap to only use the dynamic send processor when the URI is dynamic.
---
.../org/apache/camel/processor/SendProcessor.java | 2 +-
.../apache/camel/processor/WireTapProcessor.java | 27 ++++++++++++-----
.../java/org/apache/camel/reifier/SendReifier.java | 2 +-
.../org/apache/camel/reifier/WireTapReifier.java | 35 ++++++++++++++++++----
4 files changed, 51 insertions(+), 15 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
index e76b002..f156ddd 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -70,10 +70,10 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
ObjectHelper.notNull(destination, "destination");
this.destination = destination;
this.camelContext = (ExtendedCamelContext) destination.getCamelContext();
+ ObjectHelper.notNull(this.camelContext, "camelContext");
this.pattern = pattern;
this.destinationExchangePattern = null;
this.destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(destination.getEndpointUri());
- ObjectHelper.notNull(this.camelContext, "camelContext");
}
@Override
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 00111ba..e72526d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -61,7 +61,7 @@ public class WireTapProcessor extends AsyncProcessorSupport
private String id;
private String routeId;
private CamelContext camelContext;
- private final SendDynamicProcessor dynamicProcessor;
+ private final SendDynamicProcessor dynamicSendProcessor; // is only used for reporting statistics
private final String uri;
private final boolean dynamicUri;
private final Processor processor;
@@ -78,10 +78,11 @@ public class WireTapProcessor extends AsyncProcessorSupport
private boolean copy;
private Processor onPrepare;
- public WireTapProcessor(SendDynamicProcessor dynamicProcessor, Processor processor, ExchangePattern exchangePattern,
+ public WireTapProcessor(SendDynamicProcessor dynamicSendProcessor, Processor processor, String uri,
+ ExchangePattern exchangePattern,
ExecutorService executorService, boolean shutdownExecutorService, boolean dynamicUri) {
- this.dynamicProcessor = dynamicProcessor;
- this.uri = dynamicProcessor.getUri();
+ this.dynamicSendProcessor = dynamicSendProcessor;
+ this.uri = uri;
this.processor = processor;
this.asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
this.exchangePattern = exchangePattern;
@@ -148,7 +149,11 @@ public class WireTapProcessor extends AsyncProcessorSupport
}
public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
- return dynamicProcessor.getEndpointUtilizationStatistics();
+ if (dynamicSendProcessor != null) {
+ return dynamicSendProcessor.getEndpointUtilizationStatistics();
+ } else {
+ return null;
+ }
}
@Override
@@ -305,11 +310,19 @@ public class WireTapProcessor extends AsyncProcessorSupport
}
public int getCacheSize() {
- return dynamicProcessor.getCacheSize();
+ if (dynamicSendProcessor != null) {
+ return dynamicSendProcessor.getCacheSize();
+ } else {
+ return 0;
+ }
}
public boolean isIgnoreInvalidEndpoint() {
- return dynamicProcessor.isIgnoreInvalidEndpoint();
+ if (dynamicSendProcessor != null) {
+ return dynamicSendProcessor.isIgnoreInvalidEndpoint();
+ } else {
+ return false;
+ }
}
public boolean isDynamicUri() {
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SendReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SendReifier.java
index c8cd1e0..f5a3051 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SendReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SendReifier.java
@@ -40,7 +40,7 @@ public class SendReifier extends ProcessorReifier<SendDefinition<?>> {
public Endpoint resolveEndpoint() {
if (definition.getEndpoint() == null) {
if (definition.getEndpointProducerBuilder() == null) {
- return CamelContextHelper.resolveEndpoint(camelContext, definition.getEndpointUri(), (String) null);
+ return CamelContextHelper.resolveEndpoint(camelContext, definition.getEndpointUri(), null);
} else {
return definition.getEndpointProducerBuilder().resolve(camelContext);
}
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index 79c1d55..3d0c214 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -19,6 +19,7 @@ package org.apache.camel.reifier;
import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Endpoint;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
import org.apache.camel.ExtendedCamelContext;
@@ -28,7 +29,11 @@ import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.SetHeaderDefinition;
import org.apache.camel.model.WireTapDefinition;
import org.apache.camel.processor.SendDynamicProcessor;
+import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.WireTapProcessor;
+import org.apache.camel.support.CamelContextHelper;
+import org.apache.camel.support.LanguageSupport;
+import org.apache.camel.util.StringHelper;
public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
@@ -45,11 +50,30 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
// must use InOnly for WireTap
definition.setPattern(ExchangePattern.InOnly.name());
- // create the send dynamic producer to send to the wire tapped endpoint
- SendDynamicProcessor dynamicTo = (SendDynamicProcessor) super.createProcessor();
+ // optimize to only use dynamic processor if really needed
+ String uri;
+ if (definition.getEndpointProducerBuilder() != null) {
+ uri = definition.getEndpointProducerBuilder().getUri();
+ } else {
+ uri = StringHelper.notEmpty(definition.getUri(), "uri", this);
+ }
+
+ SendDynamicProcessor dynamicSendProcessor = null;
+ SendProcessor sendProcessor = null;
+ boolean simple = LanguageSupport.hasSimpleFunction(definition.getUri());
+ boolean dynamic = parseBoolean(definition.getDynamicUri(), true);
+ if (dynamic && simple) {
+ // dynamic so we need the dynamic send processor
+ dynamicSendProcessor = (SendDynamicProcessor) super.createProcessor();
+ } else {
+ // static so we can use a plain send processor
+ Endpoint endpoint = CamelContextHelper.resolveEndpoint(camelContext, uri, null);
+ sendProcessor = new SendProcessor(endpoint);
+ }
// create error handler we need to use for processing the wire tapped
- Processor childProcessor = wrapInErrorHandler(dynamicTo);
+ Processor producer = dynamicSendProcessor != null ? dynamicSendProcessor : sendProcessor;
+ Processor childProcessor = wrapInErrorHandler(producer);
// and wrap in unit of work
AsyncProcessor target = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
@@ -59,10 +83,9 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
boolean isCopy = parseBoolean(definition.getCopy(), true);
WireTapProcessor answer = new WireTapProcessor(
- dynamicTo, target,
+ dynamicSendProcessor, target, uri,
parse(ExchangePattern.class, definition.getPattern()),
- threadPool, shutdownThreadPool,
- parseBoolean(definition.getDynamicUri(), true));
+ threadPool, shutdownThreadPool, dynamic);
answer.setCopy(isCopy);
Processor newExchangeProcessor = definition.getNewExchangeProcessor();
String ref = parseString(definition.getNewExchangeProcessorRef());