You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/05 08:57:04 UTC

[camel] 01/04: CAMEL-16450: camel-core - Optimize WireTap to only use the dynamic send processor when the URI is dynamic.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8db83a9b2250a817df69fb7c43adf19f4c2cea77
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 5 09:38:46 2021 +0200

    CAMEL-16450: camel-core - Optimize WireTap to only use the dynamic send processor when the URI is dynamic.
---
 .../org/apache/camel/processor/SendProcessor.java  |  2 +-
 .../apache/camel/processor/WireTapProcessor.java   | 27 ++++++++++++-----
 .../java/org/apache/camel/reifier/SendReifier.java |  2 +-
 .../org/apache/camel/reifier/WireTapReifier.java   | 35 ++++++++++++++++++----
 4 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
index e76b002..f156ddd 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -70,10 +70,10 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
         ObjectHelper.notNull(destination, "destination");
         this.destination = destination;
         this.camelContext = (ExtendedCamelContext) destination.getCamelContext();
+        ObjectHelper.notNull(this.camelContext, "camelContext");
         this.pattern = pattern;
         this.destinationExchangePattern = null;
         this.destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(destination.getEndpointUri());
-        ObjectHelper.notNull(this.camelContext, "camelContext");
     }
 
     @Override
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 00111ba..e72526d 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -61,7 +61,7 @@ public class WireTapProcessor extends AsyncProcessorSupport
     private String id;
     private String routeId;
     private CamelContext camelContext;
-    private final SendDynamicProcessor dynamicProcessor;
+    private final SendDynamicProcessor dynamicSendProcessor; // is only used for reporting statistics
     private final String uri;
     private final boolean dynamicUri;
     private final Processor processor;
@@ -78,10 +78,11 @@ public class WireTapProcessor extends AsyncProcessorSupport
     private boolean copy;
     private Processor onPrepare;
 
-    public WireTapProcessor(SendDynamicProcessor dynamicProcessor, Processor processor, ExchangePattern exchangePattern,
+    public WireTapProcessor(SendDynamicProcessor dynamicSendProcessor, Processor processor, String uri,
+                            ExchangePattern exchangePattern,
                             ExecutorService executorService, boolean shutdownExecutorService, boolean dynamicUri) {
-        this.dynamicProcessor = dynamicProcessor;
-        this.uri = dynamicProcessor.getUri();
+        this.dynamicSendProcessor = dynamicSendProcessor;
+        this.uri = uri;
         this.processor = processor;
         this.asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
         this.exchangePattern = exchangePattern;
@@ -148,7 +149,11 @@ public class WireTapProcessor extends AsyncProcessorSupport
     }
 
     public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
-        return dynamicProcessor.getEndpointUtilizationStatistics();
+        if (dynamicSendProcessor != null) {
+            return dynamicSendProcessor.getEndpointUtilizationStatistics();
+        } else {
+            return null;
+        }
     }
 
     @Override
@@ -305,11 +310,19 @@ public class WireTapProcessor extends AsyncProcessorSupport
     }
 
     public int getCacheSize() {
-        return dynamicProcessor.getCacheSize();
+        if (dynamicSendProcessor != null) {
+            return dynamicSendProcessor.getCacheSize();
+        } else {
+            return 0;
+        }
     }
 
     public boolean isIgnoreInvalidEndpoint() {
-        return dynamicProcessor.isIgnoreInvalidEndpoint();
+        if (dynamicSendProcessor != null) {
+            return dynamicSendProcessor.isIgnoreInvalidEndpoint();
+        } else {
+            return false;
+        }
     }
 
     public boolean isDynamicUri() {
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SendReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SendReifier.java
index c8cd1e0..f5a3051 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SendReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/SendReifier.java
@@ -40,7 +40,7 @@ public class SendReifier extends ProcessorReifier<SendDefinition<?>> {
     public Endpoint resolveEndpoint() {
         if (definition.getEndpoint() == null) {
             if (definition.getEndpointProducerBuilder() == null) {
-                return CamelContextHelper.resolveEndpoint(camelContext, definition.getEndpointUri(), (String) null);
+                return CamelContextHelper.resolveEndpoint(camelContext, definition.getEndpointUri(), null);
             } else {
                 return definition.getEndpointProducerBuilder().resolve(camelContext);
             }
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
index 79c1d55..3d0c214 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java
@@ -19,6 +19,7 @@ package org.apache.camel.reifier;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Endpoint;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
 import org.apache.camel.ExtendedCamelContext;
@@ -28,7 +29,11 @@ import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.SetHeaderDefinition;
 import org.apache.camel.model.WireTapDefinition;
 import org.apache.camel.processor.SendDynamicProcessor;
+import org.apache.camel.processor.SendProcessor;
 import org.apache.camel.processor.WireTapProcessor;
+import org.apache.camel.support.CamelContextHelper;
+import org.apache.camel.support.LanguageSupport;
+import org.apache.camel.util.StringHelper;
 
 public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
 
@@ -45,11 +50,30 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
         // must use InOnly for WireTap
         definition.setPattern(ExchangePattern.InOnly.name());
 
-        // create the send dynamic producer to send to the wire tapped endpoint
-        SendDynamicProcessor dynamicTo = (SendDynamicProcessor) super.createProcessor();
+        // optimize to only use dynamic processor if really needed
+        String uri;
+        if (definition.getEndpointProducerBuilder() != null) {
+            uri = definition.getEndpointProducerBuilder().getUri();
+        } else {
+            uri = StringHelper.notEmpty(definition.getUri(), "uri", this);
+        }
+
+        SendDynamicProcessor dynamicSendProcessor = null;
+        SendProcessor sendProcessor = null;
+        boolean simple = LanguageSupport.hasSimpleFunction(definition.getUri());
+        boolean dynamic = parseBoolean(definition.getDynamicUri(), true);
+        if (dynamic && simple) {
+            // dynamic so we need the dynamic send processor
+            dynamicSendProcessor = (SendDynamicProcessor) super.createProcessor();
+        } else {
+            // static so we can use a plain send processor
+            Endpoint endpoint = CamelContextHelper.resolveEndpoint(camelContext, uri, null);
+            sendProcessor = new SendProcessor(endpoint);
+        }
 
         // create error handler we need to use for processing the wire tapped
-        Processor childProcessor = wrapInErrorHandler(dynamicTo);
+        Processor producer = dynamicSendProcessor != null ? dynamicSendProcessor : sendProcessor;
+        Processor childProcessor = wrapInErrorHandler(producer);
 
         // and wrap in unit of work
         AsyncProcessor target = camelContext.adapt(ExtendedCamelContext.class).getInternalProcessorFactory()
@@ -59,10 +83,9 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> {
         boolean isCopy = parseBoolean(definition.getCopy(), true);
 
         WireTapProcessor answer = new WireTapProcessor(
-                dynamicTo, target,
+                dynamicSendProcessor, target, uri,
                 parse(ExchangePattern.class, definition.getPattern()),
-                threadPool, shutdownThreadPool,
-                parseBoolean(definition.getDynamicUri(), true));
+                threadPool, shutdownThreadPool, dynamic);
         answer.setCopy(isCopy);
         Processor newExchangeProcessor = definition.getNewExchangeProcessor();
         String ref = parseString(definition.getNewExchangeProcessorRef());