You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/20 22:47:23 UTC
[2/7] camel git commit: CAMEL-8965: WireTap supports dynamic uris like toD does
CAMEL-8965: WireTap supports dynamic uris like toD does
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cc556718
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cc556718
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cc556718
Branch: refs/heads/master
Commit: cc556718cf3467cc2475bcda44c16fb7ce19149f
Parents: 4376cb3
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 20 21:38:42 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 20 22:54:08 2015 +0200
----------------------------------------------------------------------
.../management/mbean/ManagedWireTapMBean.java | 4 +-
.../mbean/ManagedWireTapProcessor.java | 6 +-
.../apache/camel/model/ProcessorDefinition.java | 102 ++++++++-----------
.../apache/camel/model/ToDynamicDefinition.java | 24 +++--
.../apache/camel/model/WireTapDefinition.java | 81 ++++++---------
.../camel/processor/WireTapProcessor.java | 37 +++----
.../camel/management/ManagedWireTapTest.java | 6 +-
.../camel/processor/CBRWithWireTapTest.java | 4 +-
.../camel/processor/WireTapCustomPool2Test.java | 2 +-
.../camel/processor/WireTapCustomPoolTest.java | 2 +-
.../camel/processor/WireTapExpressionTest.java | 2 +-
.../camel/processor/WireTapNewExchangeTest.java | 2 +-
.../processor/WireTapOnPrepareRefTest.java | 2 +-
.../src/test/resources/log4j.properties | 6 +-
.../FileWireTapWithXMLPayloadIssueTest.xml | 4 +-
.../SpringAggregateFromWireTapTest.xml | 4 +-
.../processor/SpringComplexBlockWithEndTest.xml | 4 +-
.../processor/SpringWireTapExpressionTest.xml | 4 +-
.../processor/SpringWireTapNewExchangeTest.xml | 3 +-
.../spring/processor/SpringWireTapTest.xml | 8 +-
.../SpringWireTapUsingFireAndForgetCopyTest.xml | 7 +-
.../SpringWireTapUsingFireAndForgetTest.xml | 7 +-
.../spring/processor/WireTapOnPrepareTest.xml | 4 +-
23 files changed, 135 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
index 52663cc..ac60c49 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
@@ -20,8 +20,8 @@ import org.apache.camel.api.management.ManagedAttribute;
public interface ManagedWireTapMBean extends ManagedProcessorMBean {
- @ManagedAttribute(description = "Expression that returns the uri to use for the wire tap destination", mask = true)
- String getExpression();
+ @ManagedAttribute(description = "The uri of the endpoint to wiretap to. The uri can be dynamic computed using the expressions.", mask = true)
+ String getUri();
@ManagedAttribute(description = "Sets the maximum size used by the ProducerCache which is used to cache and reuse producers")
Integer getCacheSize();
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
index 60471fd..88b72ad 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
@@ -41,9 +41,9 @@ public class ManagedWireTapProcessor extends ManagedProcessor implements Managed
super.init(strategy);
boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false;
if (sanitize) {
- uri = URISupport.sanitizeUri(processor.getExpression().toString());
+ uri = URISupport.sanitizeUri(processor.getUri());
} else {
- uri = processor.getExpression().toString();
+ uri = processor.getUri();
}
}
@@ -51,7 +51,7 @@ public class ManagedWireTapProcessor extends ManagedProcessor implements Managed
return processor;
}
- public String getExpression() {
+ public String getUri() {
return uri;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 57bf273..fe6f12b 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -2256,44 +2256,14 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* destination gets a copy of the original message to avoid the processors
* interfering with each other using {@link ExchangePattern#InOnly}.
*
+ * @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @return the builder
*/
- public ExpressionClause<WireTapDefinition> wireTap() {
+ public WireTapDefinition<Type> wireTap(String uri) {
WireTapDefinition answer = new WireTapDefinition();
+ answer.setUri(uri);
addOutput(answer);
- return ExpressionClause.createAndSetExpression(answer);
- }
-
- /**
- * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
- * Sends messages to all its child outputs; so that each processor and
- * destination gets a copy of the original message to avoid the processors
- * interfering with each other using {@link ExchangePattern#InOnly}.
- *
- * @param expression the expression to compute the uri to use as wire tap
- * @return the builder
- */
- public Type wireTap(Expression expression) {
- WireTapDefinition answer = new WireTapDefinition();
- answer.setExpression(new ExpressionDefinition(expression));
- addOutput(answer);
- return (Type) this;
- }
-
- /**
- * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
- * Sends messages to all its child outputs; so that each processor and
- * destination gets a copy of the original message to avoid the processors
- * interfering with each other using {@link ExchangePattern#InOnly}.
- *
- * @param uri the destination
- * @return the builder
- */
- public Type wireTap(String uri) {
- WireTapDefinition answer = new WireTapDefinition();
- answer.setExpression(new SimpleExpression(uri));
- addOutput(answer);
- return (Type) this;
+ return answer;
}
/**
@@ -2302,17 +2272,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* destination gets a copy of the original message to avoid the processors
* interfering with each other using {@link ExchangePattern#InOnly}.
*
- * @param uri the destination
+ * @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param executorService a custom {@link ExecutorService} to use as thread pool
* for sending tapped exchanges
* @return the builder
+ * @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
- public Type wireTap(String uri, ExecutorService executorService) {
+ @Deprecated
+ public WireTapDefinition<Type> wireTap(String uri, ExecutorService executorService) {
WireTapDefinition answer = new WireTapDefinition();
- answer.setExpression(new SimpleExpression(uri));
+ answer.setUri(uri);
answer.setExecutorService(executorService);
addOutput(answer);
- return (Type) this;
+ return answer;
}
/**
@@ -2321,17 +2293,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* destination gets a copy of the original message to avoid the processors
* interfering with each other using {@link ExchangePattern#InOnly}.
*
- * @param uri the destination
+ * @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param executorServiceRef reference to lookup a custom {@link ExecutorService}
* to use as thread pool for sending tapped exchanges
* @return the builder
+ * @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
- public Type wireTap(String uri, String executorServiceRef) {
+ @Deprecated
+ public WireTapDefinition<Type> wireTap(String uri, String executorServiceRef) {
WireTapDefinition answer = new WireTapDefinition();
- answer.setExpression(new SimpleExpression(uri));
+ answer.setUri(uri);
answer.setExecutorServiceRef(executorServiceRef);
addOutput(answer);
- return (Type) this;
+ return answer;
}
/**
@@ -2342,11 +2316,13 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* Will use a copy of the original Exchange which is passed in as argument
* to the given expression
*
- * @param uri the destination
+ * @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param body expression that creates the body to send
* @return the builder
+ * @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
- public Type wireTap(String uri, Expression body) {
+ @Deprecated
+ public WireTapDefinition<Type> wireTap(String uri, Expression body) {
return wireTap(uri, true, body);
}
@@ -2355,16 +2331,18 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* Sends a new {@link org.apache.camel.Exchange} to the destination
* using {@link ExchangePattern#InOnly}.
*
- * @param uri the destination
+ * @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param copy whether or not use a copy of the original exchange or a new empty exchange
* @return the builder
+ * @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
- public Type wireTap(String uri, boolean copy) {
+ @Deprecated
+ public WireTapDefinition<Type> wireTap(String uri, boolean copy) {
WireTapDefinition answer = new WireTapDefinition();
- answer.setExpression(new SimpleExpression(uri));
+ answer.setUri(uri);
answer.setCopy(copy);
addOutput(answer);
- return (Type) this;
+ return answer;
}
/**
@@ -2372,18 +2350,20 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* Sends a new {@link org.apache.camel.Exchange} to the destination
* using {@link ExchangePattern#InOnly}.
*
- * @param uri the destination
+ * @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param copy whether or not use a copy of the original exchange or a new empty exchange
* @param body expression that creates the body to send
* @return the builder
+ * @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
- public Type wireTap(String uri, boolean copy, Expression body) {
+ @Deprecated
+ public WireTapDefinition<Type> wireTap(String uri, boolean copy, Expression body) {
WireTapDefinition answer = new WireTapDefinition();
- answer.setExpression(new SimpleExpression(uri));
+ answer.setUri(uri);
answer.setCopy(copy);
- answer.setNewExchangeExpression(body);
+ answer.setNewExchangeExpression(new ExpressionSubElementDefinition(body));
addOutput(answer);
- return (Type) this;
+ return answer;
}
/**
@@ -2394,11 +2374,13 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* Will use a copy of the original Exchange which is passed in as argument
* to the given processor
*
- * @param uri the destination
+ * @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param processor processor preparing the new exchange to send
* @return the builder
+ * @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
- public Type wireTap(String uri, Processor processor) {
+ @Deprecated
+ public WireTapDefinition<Type> wireTap(String uri, Processor processor) {
return wireTap(uri, true, processor);
}
@@ -2407,18 +2389,20 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* Sends a new {@link org.apache.camel.Exchange} to the destination
* using {@link ExchangePattern#InOnly}.
*
- * @param uri the destination
+ * @param uri the dynamic endpoint to wiretap to (resolved using simple language by default)
* @param copy whether or not use a copy of the original exchange or a new empty exchange
* @param processor processor preparing the new exchange to send
* @return the builder
+ * @deprecated use the fluent builder from {@link WireTapDefinition}, will be removed in Camel 3.0
*/
- public Type wireTap(String uri, boolean copy, Processor processor) {
+ @Deprecated
+ public WireTapDefinition<Type> wireTap(String uri, boolean copy, Processor processor) {
WireTapDefinition answer = new WireTapDefinition();
- answer.setExpression(new SimpleExpression(uri));
+ answer.setUri(uri);
answer.setCopy(copy);
answer.setNewExchangeProcessor(processor);
addOutput(answer);
- return (Type) this;
+ return answer;
}
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
index 43db47b..7f5483b 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
@@ -67,6 +67,20 @@ public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition>
public Processor createProcessor(RouteContext routeContext) throws Exception {
ObjectHelper.notEmpty(uri, "uri", this);
+ Expression exp = createExpression(routeContext);
+
+ SendDynamicProcessor processor = new SendDynamicProcessor(uri, exp);
+ processor.setPattern(pattern);
+ if (cacheSize != null) {
+ processor.setCacheSize(cacheSize);
+ }
+ if (ignoreInvalidEndpoint != null) {
+ processor.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);
+ }
+ return processor;
+ }
+
+ protected Expression createExpression(RouteContext routeContext) {
List<Expression> list = new ArrayList<Expression>();
String[] parts = uri.split("\\+");
for (String part : parts) {
@@ -97,15 +111,7 @@ public class ToDynamicDefinition extends NoOutputDefinition<ToDynamicDefinition>
exp = ExpressionBuilder.concatExpression(list);
}
- SendDynamicProcessor processor = new SendDynamicProcessor(uri, exp);
- processor.setPattern(pattern);
- if (cacheSize != null) {
- processor.setCacheSize(cacheSize);
- }
- if (ignoreInvalidEndpoint != null) {
- processor.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);
- }
- return processor;
+ return exp;
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
index 22c4941..4cf47a1 100644
--- a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
@@ -44,7 +44,7 @@ import org.apache.camel.util.CamelContextHelper;
@Metadata(label = "eip,endpoint,routing")
@XmlRootElement(name = "wireTap")
@XmlAccessorType(XmlAccessType.FIELD)
-public class WireTapDefinition extends NoOutputExpressionNode implements ExecutorServiceAwareDefinition<WireTapDefinition> {
+public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends ToDynamicDefinition implements ExecutorServiceAwareDefinition<WireTapDefinition<Type>> {
@XmlTransient
private Processor newExchangeProcessor;
@XmlAttribute(name = "processorRef")
@@ -60,13 +60,9 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
@XmlAttribute @Metadata(defaultValue = "true")
private Boolean copy;
@XmlAttribute
- private Integer cacheSize;
- @XmlAttribute
private String onPrepareRef;
@XmlTransient
private Processor onPrepare;
- @XmlAttribute
- private Boolean ignoreInvalidEndpoint;
public WireTapDefinition() {
}
@@ -78,14 +74,7 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "WireTap", this, true);
// create the send dynamic producer to send to the wire tapped endpoint
- SendDynamicProcessor dynamicTo = new SendDynamicProcessor(getExpression());
- dynamicTo.setCamelContext(routeContext.getCamelContext());
- if (cacheSize != null) {
- dynamicTo.setCacheSize(cacheSize);
- }
- if (ignoreInvalidEndpoint != null) {
- dynamicTo.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);
- }
+ SendDynamicProcessor dynamicTo = (SendDynamicProcessor) super.createProcessor(routeContext);
// create error handler we need to use for processing the wire tapped
Processor target = wrapInErrorHandler(routeContext, dynamicTo);
@@ -97,7 +86,7 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
// is true by default
boolean isCopy = getCopy() == null || getCopy();
- WireTapProcessor answer = new WireTapProcessor(getExpression(), internal, getPattern(), threadPool, shutdownThreadPool);
+ WireTapProcessor answer = new WireTapProcessor(dynamicTo, internal, getPattern(), threadPool, shutdownThreadPool);
answer.setCopy(isCopy);
if (newExchangeProcessorRef != null) {
newExchangeProcessor = routeContext.mandatoryLookup(newExchangeProcessorRef, Processor.class);
@@ -120,12 +109,6 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
if (onPrepare != null) {
answer.setOnPrepare(onPrepare);
}
- if (cacheSize != null) {
- answer.setCacheSize(cacheSize);
- }
- if (ignoreInvalidEndpoint != null) {
- answer.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);
- }
return answer;
}
@@ -136,12 +119,25 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
@Override
public String toString() {
- return "WireTap[" + getExpression() + "]";
+ return "WireTap[" + getUri() + "]";
}
@Override
public String getLabel() {
- return "wireTap[" + getExpression() + "]";
+ return "wireTap[" + getUri() + "]";
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Type end() {
+ // allow end() to return to previous type so you can continue in the DSL
+ return (Type) super.end();
+ }
+
+ @Override
+ public void addOutput(ProcessorDefinition<?> output) {
+ // add outputs on parent as this wiretap does not support outputs
+ getParent().addOutput(output);
}
// Fluent API
@@ -209,7 +205,7 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
* @see #newExchangeHeader(String, org.apache.camel.Expression)
*/
public WireTapDefinition newExchangeBody(Expression expression) {
- setNewExchangeExpression(expression);
+ setNewExchangeExpression(new ExpressionSubElementDefinition(expression));
return this;
}
@@ -285,6 +281,7 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
* @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
* @return the builder
*/
+ @Override
public WireTapDefinition cacheSize(int cacheSize) {
setCacheSize(cacheSize);
return this;
@@ -295,6 +292,7 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
*
* @return the builder
*/
+ @Override
public WireTapDefinition ignoreInvalidEndpoint() {
setIgnoreInvalidEndpoint(true);
return this;
@@ -303,13 +301,17 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
// Properties
//-------------------------------------------------------------------------
+ @Override
+ public String getUri() {
+ return super.getUri();
+ }
+
/**
- * Expression that returns the uri to use for the wire tap destination
+ * The uri of the endpoint to wiretap to. The uri can be dynamically computed using a {@link org.apache.camel.language.simple.SimpleLanguage} expression.
*/
@Override
- public void setExpression(ExpressionDefinition expression) {
- // override to include javadoc what the expression is used for
- super.setExpression(expression);
+ public void setUri(String uri) {
+ super.setUri(uri);
}
public Processor getNewExchangeProcessor() {
@@ -339,14 +341,10 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
}
/**
- * Expression used for creating a new body as the message to use for wire tapping
+ * Uses the expression for creating a new body as the message to use for wire tapping
*/
- public void setNewExchangeExpression(ExpressionSubElementDefinition expression) {
- this.newExchangeExpression = expression;
- }
-
- public void setNewExchangeExpression(Expression expression) {
- this.newExchangeExpression = new ExpressionSubElementDefinition(expression);
+ public void setNewExchangeExpression(ExpressionSubElementDefinition newExchangeExpression) {
+ this.newExchangeExpression = newExchangeExpression;
}
public ExecutorService getExecutorService() {
@@ -397,19 +395,4 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
this.headers = headers;
}
- public Integer getCacheSize() {
- return cacheSize;
- }
-
- public void setCacheSize(Integer cacheSize) {
- this.cacheSize = cacheSize;
- }
-
- public Boolean getIgnoreInvalidEndpoint() {
- return ignoreInvalidEndpoint;
- }
-
- public void setIgnoreInvalidEndpoint(Boolean ignoreInvalidEndpoint) {
- this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
- }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index fc09ea8..bde5fca 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -52,14 +52,12 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
private static final Logger LOG = LoggerFactory.getLogger(WireTapProcessor.class);
private String id;
private CamelContext camelContext;
- private final Expression expression;
+ private final SendDynamicProcessor dynamicProcessor;
+ private final String uri;
private final Processor processor;
private final ExchangePattern exchangePattern;
private final ExecutorService executorService;
private volatile boolean shutdownExecutorService;
- // only used for management to be able to report the setting
- private int cacheSize;
- private boolean ignoreInvalidEndpoint;
// expression or processor used for populating a new exchange to send
// as opposed to traditional wiretap that sends a copy of the original exchange
@@ -68,9 +66,10 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
private boolean copy;
private Processor onPrepare;
- public WireTapProcessor(Expression expression, Processor processor, ExchangePattern exchangePattern,
+ public WireTapProcessor(SendDynamicProcessor dynamicProcessor, Processor processor, ExchangePattern exchangePattern,
ExecutorService executorService, boolean shutdownExecutorService) {
- this.expression = expression;
+ this.dynamicProcessor = dynamicProcessor;
+ this.uri = dynamicProcessor.getUri();
this.processor = processor;
this.exchangePattern = exchangePattern;
ObjectHelper.notNull(executorService, "executorService");
@@ -80,12 +79,12 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
@Override
public String toString() {
- return "WireTap[" + expression + "]";
+ return "WireTap[" + uri + "]";
}
@Override
public String getTraceLabel() {
- return "wireTap(" + expression + ")";
+ return "wireTap(" + uri + ")";
}
public String getId() {
@@ -129,10 +128,10 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
executorService.submit(new Callable<Exchange>() {
public Exchange call() throws Exception {
try {
- LOG.debug(">>>> (wiretap) {} {}", expression, wireTapExchange);
+ LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange);
processor.process(wireTapExchange);
} catch (Throwable e) {
- LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + expression + ". This exception will be ignored.", e);
+ LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.", e);
}
return wireTapExchange;
}
@@ -207,10 +206,6 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly);
}
- public Expression getExpression() {
- return expression;
- }
-
public List<Processor> getNewExchangeProcessors() {
return newExchangeProcessors;
}
@@ -250,20 +245,16 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
this.onPrepare = onPrepare;
}
- public int getCacheSize() {
- return cacheSize;
+ public String getUri() {
+ return uri;
}
- public void setCacheSize(int cacheSize) {
- this.cacheSize = cacheSize;
+ public int getCacheSize() {
+ return dynamicProcessor.getCacheSize();
}
public boolean isIgnoreInvalidEndpoint() {
- return ignoreInvalidEndpoint;
- }
-
- public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) {
- this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
+ return dynamicProcessor.isIgnoreInvalidEndpoint();
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
index 31e0a37..4638037 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
@@ -58,8 +58,8 @@ public class ManagedWireTapTest extends ManagementTestSupport {
String state = (String) mbeanServer.getAttribute(on, "State");
assertEquals(ServiceStatus.Started.name(), state);
- String uri = (String) mbeanServer.getAttribute(on, "Expression");
- assertEquals("simple{direct:${header.whereto}}", uri);
+ String uri = (String) mbeanServer.getAttribute(on, "Uri");
+ assertEquals("direct:${header.whereto}", uri);
TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"});
assertNotNull(data);
@@ -72,6 +72,8 @@ public class ManagedWireTapTest extends ManagementTestSupport {
String json = (String) mbeanServer.invoke(on, "informationJson", null, null);
assertNotNull(json);
assertTrue(json.contains("\"description\": \"Routes a copy of a message (or creates a new message) to a secondary destination while continue routing the original message"));
+ assertTrue(json.contains(" \"uri\": { \"kind\": \"attribute\", \"required\": \"true\", \"type\": \"string\", \"javaType\": \"java.lang.String\","
+ + " \"deprecated\": \"false\", \"value\": \"direct:${header.whereto}\""));
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java b/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java
index 578a8ed..eadb08a 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/CBRWithWireTapTest.java
@@ -62,9 +62,9 @@ public class CBRWithWireTapTest extends ContextTestSupport {
from("direct:start")
.choice()
.when(body().contains("Camel"))
- .wireTap("mock:camel")
+ .wireTap("mock:camel").end()
.when(body().contains("Donkey"))
- .wireTap("mock:donkey", true)
+ .wireTap("mock:donkey").end()
.otherwise()
.to("mock:other");
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java
index 7f607e1..a17836e 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPool2Test.java
@@ -67,7 +67,7 @@ public class WireTapCustomPool2Test extends ContextTestSupport {
from("direct:start")
.to("log:foo")
// pass in the custom pool to the wireTap DSL
- .wireTap().constant("direct:tap").executorService(pool)
+ .wireTap("direct:tap").executorService(pool)
.to("mock:result");
// END SNIPPET: e1
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPoolTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPoolTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPoolTest.java
index fee3d62..d69a273 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPoolTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapCustomPoolTest.java
@@ -68,7 +68,7 @@ public class WireTapCustomPoolTest extends ContextTestSupport {
from("direct:start")
.to("log:foo")
// pass in the custom pool to the wireTap DSL
- .wireTap("direct:tap", pool)
+ .wireTap("direct:tap").executorService(pool)
.to("mock:result");
// END SNIPPET: e1
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/camel-core/src/test/java/org/apache/camel/processor/WireTapExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapExpressionTest.java
index b214474..7d9d927 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/WireTapExpressionTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapExpressionTest.java
@@ -41,7 +41,7 @@ public class WireTapExpressionTest extends ContextTestSupport {
public void configure() {
// START SNIPPET: e1
from("direct:start")
- .wireTap(simple("mock:${header.tap}"));
+ .wireTap("mock:${header.tap}");
// END SNIPPET: e1
}
};
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java
index 43232ee..2ecff83 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapNewExchangeTest.java
@@ -52,7 +52,7 @@ public class WireTapNewExchangeTest extends ContextTestSupport {
from("direct:start")
// tap a new message and send it to direct:tap
// the new message should be Bye World with 2 headers
- .wireTap().constant("direct:tap")
+ .wireTap("direct:tap")
// create the new tap message body and headers
.newExchangeBody(constant("Bye World"))
.newExchangeHeader("id", constant(123))
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java
index 2d3915f..ce4ae2d 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapOnPrepareRefTest.java
@@ -37,7 +37,7 @@ public class WireTapOnPrepareRefTest extends WireTapOnPrepareTest {
@Override
public void configure() throws Exception {
from("direct:start")
- .wireTap().constant("direct:a").onPrepareRef("deepClone")
+ .wireTap("direct:a").onPrepareRef("deepClone")
.to("direct:b");
from("direct:a").process(new ProcessorA()).to("mock:a");
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/components/camel-spring/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/log4j.properties b/components/camel-spring/src/test/resources/log4j.properties
index 7ec94a1..01f6c90 100644
--- a/components/camel-spring/src/test/resources/log4j.properties
+++ b/components/camel-spring/src/test/resources/log4j.properties
@@ -20,10 +20,10 @@
#
log4j.rootLogger=INFO, file
-log4j.logger.org.springframework=WARN
+#log4j.logger.org.springframework=WARN
#log4j.logger.org.apache.camel.impl.converter=WARN
-log4j.logger.org.apache.camel.management=WARN
-log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
+#log4j.logger.org.apache.camel.management=WARN
+#log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
#log4j.logger.org.apache.camel=DEBUG
#log4j.logger.org.apache.camel.spring=DEBUG
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml
index f538243..386ffe1 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/issues/FileWireTapWithXMLPayloadIssueTest.xml
@@ -26,9 +26,7 @@
<route>
<from uri="file://target/xmldata"/>
<convertBodyTo type="java.lang.String"/>
- <wireTap>
- <constant>mock:wiretap</constant>
- </wireTap>
+ <wireTap uri="mock:wiretap"/>
<to uri="mock:result"/>
</route>
</camelContext>
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml
index c461c57..39677af 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml
@@ -25,9 +25,7 @@
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
- <wireTap>
- <constant>direct:tap</constant>
- </wireTap>
+ <wireTap uri="direct:tap"/>
<to uri="mock:end"/>
</route>
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml
index 686e270..1bf114c 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringComplexBlockWithEndTest.xml
@@ -44,9 +44,7 @@
<to uri="log:otherwise"/>
<to uri="mock:otherwise"/>
<multicast streaming="true">
- <wireTap>
- <constant>mock:trapped</constant>
- </wireTap>
+ <wireTap uri="mock:trapped"/>
<split strategyRef="splitAggregate">
<tokenize token=","/>
<filter>
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapExpressionTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapExpressionTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapExpressionTest.xml
index 80d7e0b..28f9d24 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapExpressionTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapExpressionTest.xml
@@ -27,9 +27,7 @@
<route>
<from uri="direct:start"/>
- <wireTap>
- <simple>mock:${header.tap}</simple>
- </wireTap>
+ <wireTap uri="mock:${header.tap}"/>
</route>
</camelContext>
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml
index 4e17834..3b81716 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapNewExchangeTest.xml
@@ -28,8 +28,7 @@
<from uri="direct:start"/>
<!-- tap a new message and send it to direct:tap -->
<!-- the new message should be Bye World with 2 headers -->
- <wireTap>
- <constant>direct:tap</constant>
+ <wireTap uri="direct:tap">
<!-- create the new tap message body and headers -->
<body><constant>Bye World</constant></body>
<setHeader headerName="id"><constant>123</constant></setHeader>
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml
index 42b1efb..7bd1909 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapTest.xml
@@ -28,9 +28,7 @@
<route>
<from uri="direct:start"/>
<to uri="log:foo"/>
- <wireTap>
- <constant>direct:tap</constant>
- </wireTap>
+ <wireTap uri="direct:tap"/>
<to uri="mock:result"/>
</route>
<!-- END SNIPPET: e1 -->
@@ -45,9 +43,7 @@
<route>
<from uri="direct:test"/>
- <wireTap id="wiretap_1">
- <constant>direct:a</constant>
- </wireTap>
+ <wireTap uri="direct:a" id="wiretap_1"/>
<to uri="mock:a"/>
</route>
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml
index 3a01f27..d2d94c8 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetCopyTest.xml
@@ -27,8 +27,7 @@
<!-- START SNIPPET: e1 -->
<route>
<from uri="direct:start"/>
- <wireTap>
- <constant>direct:foo</constant>
+ <wireTap uri="direct:foo">
<body><simple>Bye ${body}</simple></body>
</wireTap>
<to uri="mock:result"/>
@@ -38,9 +37,7 @@
<!-- START SNIPPET: e2 -->
<route>
<from uri="direct:start2"/>
- <wireTap processorRef="myProcessor">
- <constant>direct:foo</constant>
- </wireTap>
+ <wireTap uri="direct:foo" processorRef="myProcessor"/>
<to uri="mock:result"/>
</route>
<!-- END SNIPPET: e2 -->
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml
index ad39400..99becd0 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringWireTapUsingFireAndForgetTest.xml
@@ -27,8 +27,7 @@
<!-- START SNIPPET: e1 -->
<route>
<from uri="direct:start"/>
- <wireTap>
- <constant>direct:foo</constant>
+ <wireTap uri="direct:foo">
<body><constant>Bye World</constant></body>
</wireTap>
<to uri="mock:result"/>
@@ -38,9 +37,7 @@
<!-- START SNIPPET: e2 -->
<route>
<from uri="direct:start2"/>
- <wireTap processorRef="myProcessor">
- <constant>direct:foo</constant>
- </wireTap>
+ <wireTap uri="direct:foo" processorRef="myProcessor"/>
<to uri="mock:result"/>
</route>
<!-- END SNIPPET: e2 -->
http://git-wip-us.apache.org/repos/asf/camel/blob/cc556718/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml
index c9ae5ad..3ed30d1 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/WireTapOnPrepareTest.xml
@@ -27,9 +27,7 @@
<route>
<from uri="direct:start"/>
<!-- use on prepare with wiretap -->
- <wireTap onPrepareRef="animalDeepClonePrepare">
- <constant>direct:a</constant>
- </wireTap>
+ <wireTap uri="direct:a" onPrepareRef="animalDeepClonePrepare"/>
<to uri="direct:b"/>
</route>