You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/12/29 17:54:28 UTC
[camel] 01/03: CAMEL-14338: Add RouteIdAware so EIP processors can
know which route they are serving
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2b973ad06420e5637a3c183afbccd42b61e311c6
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Dec 29 13:41:43 2019 +0100
CAMEL-14338: Add RouteIdAware so EIP processors can know which route they are serving
---
.../java/org/apache/camel/spi/RouteIdAware.java | 38 ++++++++++++++++++++++
.../org/apache/camel/processor/CatchProcessor.java | 14 +++++++-
.../apache/camel/processor/ChoiceProcessor.java | 14 +++++++-
.../camel/processor/ClaimCheckProcessor.java | 14 +++++++-
.../camel/processor/ConvertBodyProcessor.java | 14 +++++++-
.../java/org/apache/camel/processor/Delayer.java | 14 +++++++-
.../java/org/apache/camel/processor/Enricher.java | 14 +++++++-
.../camel/processor/ExchangePatternProcessor.java | 14 +++++++-
.../apache/camel/processor/FilterProcessor.java | 14 +++++++-
.../apache/camel/processor/FinallyProcessor.java | 14 +++++++-
.../org/apache/camel/processor/LogProcessor.java | 14 +++++++-
.../org/apache/camel/processor/LoopProcessor.java | 16 +++++++--
.../camel/processor/MethodCallProcessor.java | 14 +++++++-
.../apache/camel/processor/MulticastProcessor.java | 14 +++++++-
.../camel/processor/OnCompletionProcessor.java | 14 +++++++-
.../java/org/apache/camel/processor/Pipeline.java | 14 +++++++-
.../org/apache/camel/processor/PollEnricher.java | 14 +++++++-
.../org/apache/camel/processor/RecipientList.java | 14 +++++++-
.../camel/processor/RemoveHeaderProcessor.java | 14 +++++++-
.../camel/processor/RemoveHeadersProcessor.java | 14 +++++++-
.../camel/processor/RemovePropertiesProcessor.java | 14 +++++++-
.../camel/processor/RemovePropertyProcessor.java | 14 +++++++-
.../org/apache/camel/processor/Resequencer.java | 16 +++++++--
.../apache/camel/processor/RollbackProcessor.java | 14 +++++++-
.../org/apache/camel/processor/RoutingSlip.java | 14 +++++++-
.../apache/camel/processor/SamplingThrottler.java | 14 +++++++-
.../apache/camel/processor/ScriptProcessor.java | 14 +++++++-
.../camel/processor/SendDynamicProcessor.java | 14 +++++++-
.../org/apache/camel/processor/SendProcessor.java | 13 +++++++-
.../apache/camel/processor/SetBodyProcessor.java | 14 +++++++-
.../apache/camel/processor/SetHeaderProcessor.java | 14 +++++++-
.../camel/processor/SetPropertyProcessor.java | 14 +++++++-
.../org/apache/camel/processor/SortProcessor.java | 15 ++++++++-
.../org/apache/camel/processor/StopProcessor.java | 14 +++++++-
.../apache/camel/processor/StreamResequencer.java | 14 +++++++-
.../apache/camel/processor/ThreadsProcessor.java | 14 +++++++-
.../java/org/apache/camel/processor/Throttler.java | 14 +++++++-
.../camel/processor/ThrowExceptionProcessor.java | 14 +++++++-
.../apache/camel/processor/TransformProcessor.java | 14 +++++++-
.../org/apache/camel/processor/TryProcessor.java | 14 +++++++-
.../apache/camel/processor/WireTapProcessor.java | 14 +++++++-
.../processor/aggregate/AggregateProcessor.java | 12 ++++++-
.../processor/idempotent/IdempotentConsumer.java | 9 +++++
.../loadbalancer/LoadBalancerSupport.java | 14 +++++++-
.../org/apache/camel/reifier/ProcessorReifier.java | 8 +++++
.../camel/management/mbean/ManagedProcessor.java | 3 ++
.../management/ManagedUnregisterProducerTest.java | 4 +--
.../camel/support/processor/CamelLogProcessor.java | 14 +++++++-
.../camel/support/processor/MarshalProcessor.java | 14 +++++++-
.../camel/support/processor/ThroughputLogger.java | 14 +++++++-
.../support/processor/UnmarshalProcessor.java | 14 +++++++-
51 files changed, 657 insertions(+), 51 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/RouteIdAware.java b/core/camel-api/src/main/java/org/apache/camel/spi/RouteIdAware.java
new file mode 100644
index 0000000..6d8ce7c
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/RouteIdAware.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+/**
+ * To allow objects to be injected with the route id
+ * <p/>
+ * This allows access to the route id of the processor at runtime, to know which route its associated with.
+ */
+public interface RouteIdAware {
+
+ /**
+ * Gets the route id
+ */
+ String getRouteId();
+
+ /**
+ * Sets the route id
+ *
+ * @param routeId the route id
+ */
+ void setRouteId(String routeId);
+
+}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java
index 505b37d..ee6f814 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CatchProcessor.java
@@ -24,6 +24,7 @@ import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -32,9 +33,10 @@ import org.apache.camel.util.ObjectHelper;
/**
* A processor which catches exceptions.
*/
-public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, IdAware {
+public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final List<Class<? extends Throwable>> exceptions;
private final Predicate onWhen;
@@ -60,6 +62,16 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable,
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public String getTraceLabel() {
return "catch";
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ChoiceProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
index eab02fa..96f29ba 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
@@ -27,6 +27,7 @@ import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.service.ServiceHelper;
@@ -38,9 +39,10 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing;
* they are true their processors are used, with a default otherwise clause used
* if none match.
*/
-public class ChoiceProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware {
+public class ChoiceProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final List<FilterProcessor> filters;
private final Processor otherwise;
private transient long notFiltered;
@@ -196,6 +198,16 @@ public class ChoiceProcessor extends AsyncProcessorSupport implements Navigate<P
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
protected void doStart() throws Exception {
ServiceHelper.startService(filters, otherwise);
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
index 587fed6..016d463 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/ClaimCheckProcessor.java
@@ -25,6 +25,7 @@ import org.apache.camel.Expression;
import org.apache.camel.impl.engine.DefaultClaimCheckRepository;
import org.apache.camel.spi.ClaimCheckRepository;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.LanguageSupport;
@@ -39,10 +40,11 @@ import org.apache.camel.util.ObjectHelper;
* This guards against concurrent and thread-safe issues. For off-memory persistent storage of data, then use
* any of the many Camel components that support persistent storage, and do not use this Claim Check EIP implementation.
*/
-public class ClaimCheckProcessor extends AsyncProcessorSupport implements IdAware, CamelContextAware {
+public class ClaimCheckProcessor extends AsyncProcessorSupport implements IdAware, RouteIdAware, CamelContextAware {
private CamelContext camelContext;
private String id;
+ private String routeId;
private String operation;
private AggregationStrategy aggregationStrategy;
private String key;
@@ -69,6 +71,16 @@ public class ClaimCheckProcessor extends AsyncProcessorSupport implements IdAwar
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public String getOperation() {
return operation;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
index bcdabee..dcb87a8 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/ConvertBodyProcessor.java
@@ -23,6 +23,7 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.support.ExchangeHelper;
@@ -35,8 +36,9 @@ import org.apache.camel.util.ObjectHelper;
* <p/>
* If the conversion fails an {@link org.apache.camel.InvalidPayloadException} is thrown.
*/
-public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcessor, IdAware {
+public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcessor, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final Class<?> type;
private final String charset;
@@ -68,6 +70,16 @@ public class ConvertBodyProcessor extends ServiceSupport implements AsyncProcess
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public void process(Exchange exchange) throws Exception {
Message old = exchange.getMessage();
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Delayer.java b/core/camel-base/src/main/java/org/apache/camel/processor/Delayer.java
index 9408cbf..d5b3eb5 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Delayer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Delayer.java
@@ -24,6 +24,7 @@ import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
/**
* A <a href="http://camel.apache.org/delayer.html">Delayer</a> which
@@ -32,7 +33,8 @@ import org.apache.camel.spi.IdAware;
* <p/>
* This implementation will block while waiting.
*/
-public class Delayer extends DelayProcessorSupport implements Traceable, IdAware {
+public class Delayer extends DelayProcessorSupport implements Traceable, IdAware, RouteIdAware {
+ private String routeId;
private String id;
private Expression delay;
private long delayValue;
@@ -59,6 +61,16 @@ public class Delayer extends DelayProcessorSupport implements Traceable, IdAware
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public String getTraceLabel() {
return "delay[" + delay + "]";
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
index 3ede7e2..21dab2f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java
@@ -31,6 +31,7 @@ import org.apache.camel.impl.engine.DefaultProducerCache;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ProducerCache;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultExchange;
@@ -53,10 +54,11 @@ import static org.apache.camel.support.ExchangeHelper.copyResultsPreservePattern
*
* @see PollEnricher
*/
-public class Enricher extends AsyncProcessorSupport implements IdAware, CamelContextAware {
+public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdAware, CamelContextAware {
private CamelContext camelContext;
private String id;
+ private String routeId;
private ProducerCache producerCache;
private final Expression expression;
private AggregationStrategy aggregationStrategy;
@@ -89,6 +91,16 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, CamelCon
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public Expression getExpression() {
return expression;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java
index 9da6562..516bcc7 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java
@@ -20,13 +20,15 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
/**
* Processor to set {@link org.apache.camel.ExchangePattern} on the {@link org.apache.camel.Exchange}.
*/
-public class ExchangePatternProcessor extends AsyncProcessorSupport implements IdAware {
+public class ExchangePatternProcessor extends AsyncProcessorSupport implements IdAware, RouteIdAware {
private String id;
+ private String routeId;
private ExchangePattern exchangePattern = ExchangePattern.InOnly;
public ExchangePatternProcessor() {
@@ -50,6 +52,16 @@ public class ExchangePatternProcessor extends AsyncProcessorSupport implements I
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public ExchangePattern getExchangePattern() {
return exchangePattern;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/FilterProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/FilterProcessor.java
index 880d2d6..a07576a 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/FilterProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/FilterProcessor.java
@@ -22,6 +22,7 @@ import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
import org.apache.camel.support.service.ServiceHelper;
@@ -29,9 +30,10 @@ import org.apache.camel.support.service.ServiceHelper;
* The processor which implements the
* <a href="http://camel.apache.org/message-filter.html">Message Filter</a> EIP pattern.
*/
-public class FilterProcessor extends DelegateAsyncProcessor implements Traceable, IdAware {
+public class FilterProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final Predicate predicate;
private transient long filtered;
@@ -89,6 +91,16 @@ public class FilterProcessor extends DelegateAsyncProcessor implements Traceable
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public String getTraceLabel() {
return "filter[if: " + predicate + "]";
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java
index bb3f0bc..d43c373 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/FinallyProcessor.java
@@ -21,15 +21,17 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
/**
* Processor to handle do finally supporting asynchronous routing engine
*/
-public class FinallyProcessor extends DelegateAsyncProcessor implements Traceable, IdAware {
+public class FinallyProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
public FinallyProcessor(Processor processor) {
super(processor);
@@ -73,6 +75,16 @@ public class FinallyProcessor extends DelegateAsyncProcessor implements Traceabl
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
private final class FinallyAsyncCallback implements AsyncCallback {
private final Exchange exchange;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LogProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LogProcessor.java
index c2c5f49..5d2203e 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/LogProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/LogProcessor.java
@@ -26,14 +26,16 @@ import org.apache.camel.spi.CamelLogger;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.LogListener;
import org.apache.camel.spi.MaskingFormatter;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
/**
* A processor which evaluates an {@link Expression} and logs it.
*/
-public class LogProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
+public class LogProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final Expression expression;
private final CamelLogger logger;
private final MaskingFormatter formatter;
@@ -107,6 +109,16 @@ public class LogProcessor extends AsyncProcessorSupport implements Traceable, Id
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public Expression getExpression() {
return expression;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
index 6ef3a5b..d368555 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -24,6 +24,7 @@ import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -32,9 +33,10 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing;
/**
* The processor which sends messages in a loop.
*/
-public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, IdAware {
+public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final Expression expression;
private final Predicate predicate;
private final boolean copy;
@@ -49,7 +51,6 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
-
LoopState state = new LoopState(exchange, callback);
if (exchange.isTransacted()) {
@@ -58,7 +59,6 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
exchange.getContext().getReactiveExecutor().scheduleMain(state);
}
return false;
-
} catch (Exception e) {
exchange.setException(e);
callback.done(true);
@@ -183,6 +183,16 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public String toString() {
if (predicate != null) {
return "Loop[while: " + predicate + " do: " + getProcessor() + "]";
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MethodCallProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MethodCallProcessor.java
index 0263dc4..6a7e111 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MethodCallProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MethodCallProcessor.java
@@ -22,6 +22,7 @@ import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.support.ExchangeHelper;
@@ -30,8 +31,9 @@ import org.apache.camel.util.ObjectHelper;
/**
* A processor which are used when calling a method and setting the response as the message body
*/
-public class MethodCallProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
+public class MethodCallProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final Expression expression;
public MethodCallProcessor(Expression expression) {
@@ -104,6 +106,16 @@ public class MethodCallProcessor extends AsyncProcessorSupport implements Tracea
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public Expression getExpression() {
return expression;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 9172b76..b0d49f0 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -51,6 +51,7 @@ import org.apache.camel.StreamCache;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
@@ -71,7 +72,7 @@ import static org.apache.camel.util.ObjectHelper.notNull;
*
* @see Pipeline
*/
-public class MulticastProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware {
+public class MulticastProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware, RouteIdAware {
/**
* Class that represent each step in the multicast route to do
@@ -140,6 +141,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
protected final Processor onPrepare;
private final CamelContext camelContext;
private String id;
+ private String routeId;
private Collection<Processor> processors;
private final AggregationStrategy aggregationStrategy;
private final boolean parallelProcessing;
@@ -206,6 +208,16 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public String getTraceLabel() {
return "multicast";
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
index b5a5cc5..686e00c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java
@@ -30,6 +30,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.SynchronizationAdapter;
@@ -40,10 +41,11 @@ import static org.apache.camel.util.ObjectHelper.notNull;
/**
* Processor implementing <a href="http://camel.apache.org/oncompletion.html">onCompletion</a>.
*/
-public class OnCompletionProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
+public class OnCompletionProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private final CamelContext camelContext;
private String id;
+ private String routeId;
private final Processor processor;
private final ExecutorService executorService;
private final boolean shutdownExecutorService;
@@ -101,6 +103,16 @@ public class OnCompletionProcessor extends AsyncProcessorSupport implements Trac
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public boolean process(Exchange exchange, AsyncCallback callback) {
if (processor != null) {
// register callback
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index f4aceac..fcc9baa 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -30,6 +30,7 @@ import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
@@ -41,11 +42,12 @@ import static org.apache.camel.processor.PipelineHelper.continueProcessing;
* Creates a Pipeline pattern where the output of the previous step is sent as
* input to the next step, reusing the same message exchanges
*/
-public class Pipeline extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware {
+public class Pipeline extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware, RouteIdAware {
private final CamelContext camelContext;
private List<AsyncProcessor> processors;
private String id;
+ private String routeId;
public Pipeline(CamelContext camelContext, Collection<Processor> processors) {
this.camelContext = camelContext;
@@ -165,6 +167,16 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public List<Processor> next() {
if (!hasNext()) {
return null;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
index 18c9595..f947a0b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -31,6 +31,7 @@ import org.apache.camel.spi.ConsumerCache;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.support.DefaultConsumer;
@@ -52,11 +53,12 @@ import static org.apache.camel.support.ExchangeHelper.copyResultsPreservePattern
*
* @see Enricher
*/
-public class PollEnricher extends AsyncProcessorSupport implements IdAware, CamelContextAware {
+public class PollEnricher extends AsyncProcessorSupport implements IdAware, RouteIdAware, CamelContextAware {
private CamelContext camelContext;
private ConsumerCache consumerCache;
private String id;
+ private String routeId;
private AggregationStrategy aggregationStrategy;
private final Expression expression;
private long timeout;
@@ -95,6 +97,16 @@ public class PollEnricher extends AsyncProcessorSupport implements IdAware, Came
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public Expression getExpression() {
return expression;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
index 6523c31..037b064 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -31,6 +31,7 @@ import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ProducerCache;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ObjectHelper;
@@ -45,11 +46,12 @@ import static org.apache.camel.util.ObjectHelper.notNull;
* pattern where the list of actual endpoints to send a message exchange to are
* dependent on some dynamic expression.
*/
-public class RecipientList extends AsyncProcessorSupport implements IdAware {
+public class RecipientList extends AsyncProcessorSupport implements IdAware, RouteIdAware {
private static final String IGNORE_DELIMITER_MARKER = "false";
private final CamelContext camelContext;
private String id;
+ private String routeId;
private ProducerCache producerCache;
private Expression expression;
private final String delimiter;
@@ -110,6 +112,16 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware {
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public boolean process(Exchange exchange, AsyncCallback callback) {
if (!isStarted()) {
throw new IllegalStateException("RecipientList has not been started: " + this);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java
index 1be2c87..07e93dc 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeaderProcessor.java
@@ -20,14 +20,16 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
/**
* A processor which removes the header from the IN or OUT message
*/
-public class RemoveHeaderProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
+public class RemoveHeaderProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private final String headerName;
private String id;
+ private String routeId;
public RemoveHeaderProcessor(String headerName) {
this.headerName = headerName;
@@ -65,6 +67,16 @@ public class RemoveHeaderProcessor extends AsyncProcessorSupport implements Trac
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public String getHeaderName() {
return headerName;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java
index 95b993d..27e1249 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RemoveHeadersProcessor.java
@@ -20,13 +20,15 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
/**
* A processor which removes one ore more headers from the IN or OUT message
*/
-public class RemoveHeadersProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
+public class RemoveHeadersProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final String pattern;
private final String[] excludePattern;
@@ -67,6 +69,16 @@ public class RemoveHeadersProcessor extends AsyncProcessorSupport implements Tra
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public String getPattern() {
return pattern;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java
index 86818fb..f7b9016 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertiesProcessor.java
@@ -20,13 +20,15 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
/**
* A processor which removes one ore more properties from the exchange
*/
-public class RemovePropertiesProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
+public class RemovePropertiesProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final String pattern;
private final String[] excludePattern;
@@ -67,6 +69,16 @@ public class RemovePropertiesProcessor extends AsyncProcessorSupport implements
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public String getPattern() {
return pattern;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java
index 8cdf7a5..b7bf5f9 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RemovePropertyProcessor.java
@@ -20,13 +20,15 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
/**
* A processor which removes the property from the exchange
*/
-public class RemovePropertyProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
+public class RemovePropertyProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final String propertyName;
public RemovePropertyProcessor(String propertyName) {
@@ -65,6 +67,16 @@ public class RemovePropertyProcessor extends AsyncProcessorSupport implements Tr
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public String getPropertyName() {
return propertyName;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Resequencer.java b/core/camel-base/src/main/java/org/apache/camel/processor/Resequencer.java
index 0ce608e..f2598c5 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Resequencer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -44,6 +44,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExpressionComparator;
@@ -55,12 +56,13 @@ import org.apache.camel.util.ObjectHelper;
* An implementation of the <a href="http://camel.apache.org/resequencer.html">Resequencer</a>
* which can reorder messages within a batch.
*/
-public class Resequencer extends AsyncProcessorSupport implements Navigate<Processor>, IdAware, Traceable {
+public class Resequencer extends AsyncProcessorSupport implements Navigate<Processor>, IdAware, RouteIdAware, Traceable {
public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
public static final int DEFAULT_BATCH_SIZE = 100;
private String id;
+ private String routeId;
private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
private int batchSize = DEFAULT_BATCH_SIZE;
private int outBatchSize;
@@ -250,7 +252,17 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce
this.id = id;
}
- // Implementation methods
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+// Implementation methods
//-------------------------------------------------------------------------
protected static Set<Exchange> createSet(Expression expression, boolean allowDuplicates, boolean reverse) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RollbackProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RollbackProcessor.java
index e91c6a3..7ed8589 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RollbackProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RollbackProcessor.java
@@ -21,14 +21,16 @@ import org.apache.camel.Exchange;
import org.apache.camel.RollbackExchangeException;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
/**
* Processor for marking an {@link org.apache.camel.Exchange} to rollback.
*/
-public class RollbackProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
+public class RollbackProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private boolean markRollbackOnly;
private boolean markRollbackOnlyLast;
private String message;
@@ -92,6 +94,16 @@ public class RollbackProcessor extends AsyncProcessorSupport implements Traceabl
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public String getMessage() {
return message;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
index b79f264..975456d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java
@@ -32,6 +32,7 @@ import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.ExchangeHelper;
@@ -52,9 +53,10 @@ import static org.apache.camel.util.ObjectHelper.notNull;
* as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the
* pipeline to ensure it works the same and the async routing engine is flawless.
*/
-public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdAware {
+public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
protected String id;
+ protected String routeId;
protected ProducerCache producerCache;
protected int cacheSize;
protected boolean ignoreInvalidEndpoints;
@@ -112,6 +114,16 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public Expression getExpression() {
return expression;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SamplingThrottler.java b/core/camel-base/src/main/java/org/apache/camel/processor/SamplingThrottler.java
index 10b2492..974cf4b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SamplingThrottler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SamplingThrottler.java
@@ -23,6 +23,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
/**
@@ -36,9 +37,10 @@ import org.apache.camel.support.AsyncProcessorSupport;
* an exchange stream, rough consolidation of noisy and bursty exchange traffic
* or where queuing of throttled exchanges is undesirable.
*/
-public class SamplingThrottler extends AsyncProcessorSupport implements Traceable, IdAware {
+public class SamplingThrottler extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private long messageFrequency;
private long currentMessageCount;
private long samplePeriod;
@@ -88,6 +90,16 @@ public class SamplingThrottler extends AsyncProcessorSupport implements Traceabl
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public String getTraceLabel() {
if (messageFrequency > 0) {
return "samplingThrottler[1 exchange per: " + messageFrequency + " messages received]";
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ScriptProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ScriptProcessor.java
index 6183585..4d72dbc 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/ScriptProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/ScriptProcessor.java
@@ -21,14 +21,16 @@ import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
/**
* A processor which executes the script as an expression and does not change the message body.
*/
-public class ScriptProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
+public class ScriptProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final Expression expression;
public ScriptProcessor(Expression expression) {
@@ -68,6 +70,16 @@ public class ScriptProcessor extends AsyncProcessorSupport implements Traceable,
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public Expression getExpression() {
return expression;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
index 7cc21f8..12234bd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java
@@ -30,6 +30,7 @@ import org.apache.camel.impl.engine.DefaultProducerCache;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ProducerCache;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.SendDynamicAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.EndpointHelper;
@@ -42,7 +43,7 @@ import org.apache.camel.util.URISupport;
*
* @see org.apache.camel.processor.SendProcessor
*/
-public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAware, CamelContextAware {
+public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAware, RouteIdAware, CamelContextAware {
protected SendDynamicAware dynamicAware;
protected CamelContext camelContext;
@@ -51,6 +52,7 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
protected ExchangePattern pattern;
protected ProducerCache producerCache;
protected String id;
+ protected String routeId;
protected boolean ignoreInvalidEndpoint;
protected int cacheSize;
protected boolean allowOptimisedComponents = true;
@@ -76,6 +78,16 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public boolean process(Exchange exchange, final AsyncCallback callback) {
if (!isStarted()) {
exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this));
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java
index cb7abc6..44bc3bd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SendProcessor.java
@@ -30,6 +30,7 @@ import org.apache.camel.Traceable;
import org.apache.camel.impl.engine.DefaultProducerCache;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ProducerCache;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.EndpointHelper;
import org.apache.camel.support.EventHelper;
@@ -43,7 +44,7 @@ import org.apache.camel.util.URISupport;
*
* @see SendDynamicProcessor
*/
-public class SendProcessor extends AsyncProcessorSupport implements Traceable, EndpointAware, IdAware {
+public class SendProcessor extends AsyncProcessorSupport implements Traceable, EndpointAware, IdAware, RouteIdAware {
protected transient String traceLabelToString;
protected final CamelContext camelContext;
@@ -53,6 +54,7 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
protected Endpoint destination;
protected ExchangePattern destinationExchangePattern;
protected String id;
+ protected String routeId;
protected volatile long counter;
public SendProcessor(Endpoint destination) {
@@ -88,6 +90,15 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E
this.id = id;
}
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
@Override
public String getTraceLabel() {
if (traceLabelToString == null) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SetBodyProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SetBodyProcessor.java
index ba97651..d39f2ab 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SetBodyProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SetBodyProcessor.java
@@ -22,6 +22,7 @@ import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.support.ExchangeHelper;
@@ -29,8 +30,9 @@ import org.apache.camel.support.ExchangeHelper;
/**
* A processor which sets the body on the IN or OUT message with an {@link Expression}
*/
-public class SetBodyProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
+public class SetBodyProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final Expression expression;
public SetBodyProcessor(Expression expression) {
@@ -93,6 +95,16 @@ public class SetBodyProcessor extends AsyncProcessorSupport implements Traceable
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public Expression getExpression() {
return expression;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java
index 124d71d..1fe8188 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SetHeaderProcessor.java
@@ -22,14 +22,16 @@ import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
/**
* A processor which sets the header on the IN or OUT message with an {@link org.apache.camel.Expression}
*/
-public class SetHeaderProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
+public class SetHeaderProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final Expression headerName;
private final Expression expression;
@@ -84,6 +86,16 @@ public class SetHeaderProcessor extends AsyncProcessorSupport implements Traceab
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public String getHeaderName() {
return headerName.toString();
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java
index a1292d4..aaf29d4 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SetPropertyProcessor.java
@@ -21,14 +21,16 @@ import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
/**
* A processor which sets the property on the exchange with an {@link org.apache.camel.Expression}
*/
-public class SetPropertyProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
+public class SetPropertyProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final Expression propertyName;
private final Expression expression;
@@ -80,6 +82,16 @@ public class SetPropertyProcessor extends AsyncProcessorSupport implements Trace
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public String getPropertyName() {
return propertyName.toString();
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SortProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SortProcessor.java
index 18d27ef..4bb4373 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SortProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SortProcessor.java
@@ -22,15 +22,18 @@ import java.util.List;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
+import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
/**
* A processor that sorts the expression using a comparator
*/
-public class SortProcessor<T> extends AsyncProcessorSupport implements IdAware, org.apache.camel.Traceable {
+public class SortProcessor<T> extends AsyncProcessorSupport implements IdAware, RouteIdAware, Traceable {
private String id;
+ private String routeId;
private final Expression expression;
private final Comparator<? super T> comparator;
@@ -75,6 +78,16 @@ public class SortProcessor<T> extends AsyncProcessorSupport implements IdAware,
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public Expression getExpression() {
return expression;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/StopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/StopProcessor.java
index 5532fe5..aefd1ce 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/StopProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/StopProcessor.java
@@ -19,14 +19,16 @@ package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
/**
* Stops continue processing the route and marks it as complete.
*/
-public class StopProcessor extends AsyncProcessorSupport implements IdAware {
+public class StopProcessor extends AsyncProcessorSupport implements IdAware, RouteIdAware {
private String id;
+ private String routeId;
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
@@ -53,6 +55,16 @@ public class StopProcessor extends AsyncProcessorSupport implements IdAware {
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
protected void doStart() throws Exception {
// noop
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/StreamResequencer.java b/core/camel-base/src/main/java/org/apache/camel/processor/StreamResequencer.java
index c81edf8..93f52ab 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/StreamResequencer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/StreamResequencer.java
@@ -36,6 +36,7 @@ import org.apache.camel.processor.resequencer.SequenceElementComparator;
import org.apache.camel.processor.resequencer.SequenceSender;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.service.ServiceHelper;
@@ -62,9 +63,10 @@ import org.apache.camel.util.ObjectHelper;
*
* @see ResequencerEngine
*/
-public class StreamResequencer extends AsyncProcessorSupport implements SequenceSender<Exchange>, Navigate<Processor>, Traceable, IdAware {
+public class StreamResequencer extends AsyncProcessorSupport implements SequenceSender<Exchange>, Navigate<Processor>, Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final CamelContext camelContext;
private final ExceptionHandler exceptionHandler;
private final ResequencerEngine<Exchange> engine;
@@ -189,6 +191,16 @@ public class StreamResequencer extends AsyncProcessorSupport implements Sequence
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
protected void doStart() throws Exception {
ServiceHelper.startService(processor);
delivery = new Delivery();
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ThreadsProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
index 0e7868b..af737a6 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/ThreadsProcessor.java
@@ -25,6 +25,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.concurrent.Rejectable;
@@ -53,9 +54,10 @@ import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
* will not be free to process a new exchange, as its processing the current exchange.</li>
* </ul>
*/
-public class ThreadsProcessor extends AsyncProcessorSupport implements IdAware {
+public class ThreadsProcessor extends AsyncProcessorSupport implements IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final CamelContext camelContext;
private final ExecutorService executorService;
private final ThreadPoolRejectedPolicy rejectedPolicy;
@@ -164,6 +166,16 @@ public class ThreadsProcessor extends AsyncProcessorSupport implements IdAware {
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public ThreadPoolRejectedPolicy getRejectedPolicy() {
return rejectedPolicy;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java b/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java
index 0c68d8e..4ebb520 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Throttler.java
@@ -33,6 +33,7 @@ import org.apache.camel.Expression;
import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
@@ -56,7 +57,7 @@ import org.apache.camel.util.ObjectHelper;
* callers point of view in the last timePeriodMillis no more than
* maxRequestsPerPeriod have been allowed to be acquired.
*/
-public class Throttler extends AsyncProcessorSupport implements Traceable, IdAware {
+public class Throttler extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private static final String DEFAULT_KEY = "CamelThrottlerDefaultKey";
@@ -72,6 +73,7 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa
private volatile long timePeriodMillis;
private volatile long cleanPeriodMillis;
private String id;
+ private String routeId;
private Expression maxRequestsPerPeriodExpression;
private boolean rejectExecution;
private boolean asyncDelayed;
@@ -389,6 +391,16 @@ public class Throttler extends AsyncProcessorSupport implements Traceable, IdAwa
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
/**
* Sets the maximum number of requests per time period expression
*/
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java
index 0f83a98..68ee7e5 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/ThrowExceptionProcessor.java
@@ -26,14 +26,16 @@ import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
/**
* The processor which sets an {@link Exception} on the {@link Exchange}
*/
-public class ThrowExceptionProcessor extends AsyncProcessorSupport implements Traceable, IdAware, CamelContextAware {
+public class ThrowExceptionProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware, CamelContextAware {
private String id;
+ private String routeId;
private CamelContext camelContext;
private Expression simple;
private final Exception exception;
@@ -94,6 +96,16 @@ public class ThrowExceptionProcessor extends AsyncProcessorSupport implements Tr
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public Exception getException() {
return exception;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TransformProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TransformProcessor.java
index d218d01..19924d0 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/TransformProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/TransformProcessor.java
@@ -22,6 +22,7 @@ import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.support.ExchangeHelper;
@@ -30,8 +31,9 @@ import org.apache.camel.util.ObjectHelper;
/**
* A processor which sets the body on the OUT message with an {@link Expression}.
*/
-public class TransformProcessor extends AsyncProcessorSupport implements Traceable, IdAware {
+public class TransformProcessor extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final Expression expression;
public TransformProcessor(Expression expression) {
@@ -100,6 +102,16 @@ public class TransformProcessor extends AsyncProcessorSupport implements Traceab
this.id = id;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
public Expression getExpression() {
return expression;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
index d1af436..2d70216 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -27,6 +27,7 @@ import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
@@ -35,9 +36,10 @@ import org.apache.camel.support.service.ServiceHelper;
/**
* Implements try/catch/finally type processing
*/
-public class TryProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware {
+public class TryProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware, RouteIdAware {
protected String id;
+ protected String routeId;
protected final Processor tryProcessor;
protected final List<Processor> catchClauses;
protected final Processor finallyProcessor;
@@ -164,4 +166,14 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
public void setId(String id) {
this.id = id;
}
+
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 3c8f190..8ec1f0c 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -36,6 +36,7 @@ import org.apache.camel.StreamCache;
import org.apache.camel.Traceable;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
@@ -47,9 +48,10 @@ import org.apache.camel.util.ObjectHelper;
/**
* Processor for wire tapping exchanges to an endpoint destination.
*/
-public class WireTapProcessor extends AsyncProcessorSupport implements Traceable, ShutdownAware, IdAware, CamelContextAware {
+public class WireTapProcessor extends AsyncProcessorSupport implements Traceable, ShutdownAware, IdAware, RouteIdAware, CamelContextAware {
private String id;
+ private String routeId;
private CamelContext camelContext;
private final SendDynamicProcessor dynamicProcessor;
private final String uri;
@@ -100,6 +102,16 @@ public class WireTapProcessor extends AsyncProcessorSupport implements Traceable
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public CamelContext getCamelContext() {
return camelContext;
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index ad0f96d..71028c1 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -55,6 +55,7 @@ import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
import org.apache.camel.spi.RecoverableAggregationRepository;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.spi.ShutdownPrepared;
import org.apache.camel.spi.Synchronization;
@@ -84,7 +85,7 @@ import org.apache.camel.util.TimeUtils;
* and older prices are discarded). Another idea is to combine line item messages
* together into a single invoice message.
*/
-public class AggregateProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, ShutdownPrepared, ShutdownAware, IdAware {
+public class AggregateProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, ShutdownPrepared, ShutdownAware, IdAware, RouteIdAware {
public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";
public static final String AGGREGATE_OPTIMISTIC_LOCKING_EXECUTOR = "AggregateOptimisticLockingExecutor";
@@ -102,6 +103,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
private final CamelContext camelContext;
private final AsyncProcessor processor;
private String id;
+ private String routeId;
private AggregationStrategy aggregationStrategy;
private boolean preCompletion;
private Expression correlationExpression;
@@ -295,6 +297,14 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
this.id = id;
}
+ public String getRouteId() {
+ return routeId;
+ }
+
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java b/core/camel-base/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
index 9752419..8a24cb3 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java
@@ -47,6 +47,7 @@ public class IdempotentConsumer extends AsyncProcessorSupport implements CamelCo
private CamelContext camelContext;
private String id;
+ private String routeId;
private final Expression messageIdExpression;
private final AsyncProcessor processor;
private final IdempotentRepository idempotentRepository;
@@ -92,6 +93,14 @@ public class IdempotentConsumer extends AsyncProcessorSupport implements CamelCo
this.id = id;
}
+ public String getRouteId() {
+ return routeId;
+ }
+
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
final AsyncCallback target;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
index aae3fbf..d098969 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
@@ -25,16 +25,18 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.service.ServiceHelper;
/**
* A default base class for a {@link LoadBalancer} implementation.
*/
-public abstract class LoadBalancerSupport extends AsyncProcessorSupport implements LoadBalancer, Navigate<Processor>, IdAware {
+public abstract class LoadBalancerSupport extends AsyncProcessorSupport implements LoadBalancer, Navigate<Processor>, IdAware, RouteIdAware {
private final AtomicReference<AsyncProcessor[]> processors = new AtomicReference<>(new AsyncProcessor[0]);
private String id;
+ private String routeId;
@Override
public void addProcessor(AsyncProcessor processor) {
@@ -100,6 +102,16 @@ public abstract class LoadBalancerSupport extends AsyncProcessorSupport implemen
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
protected void doStart() throws Exception {
ServiceHelper.startService((Object[]) processors.get());
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index 59e2587..d831188 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -27,6 +27,7 @@ import org.apache.camel.Channel;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
+import org.apache.camel.RouteAware;
import org.apache.camel.model.AggregateDefinition;
import org.apache.camel.model.BeanDefinition;
import org.apache.camel.model.CatchDefinition;
@@ -106,6 +107,7 @@ import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.RouteIdAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -488,6 +490,9 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
String id = getId(output, routeContext);
((IdAware)processor).setId(id);
}
+ if (processor instanceof RouteIdAware) {
+ ((RouteIdAware)processor).setRouteId(routeContext.getRouteId());
+ }
if (output instanceof Channel && processor == null) {
continue;
@@ -576,6 +581,9 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends
String id = getId(definition, routeContext);
((IdAware)processor).setId(id);
}
+ if (processor instanceof RouteIdAware) {
+ ((RouteIdAware)processor).setRouteId(routeContext.getRouteId());
+ }
if (processor == null) {
// no processor to make
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProcessor.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProcessor.java
index 60ad5dd..8c1beb3 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProcessor.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedProcessor.java
@@ -30,6 +30,7 @@ import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.ProcessorDefinitionHelper;
import org.apache.camel.model.StepDefinition;
import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.service.ServiceHelper;
@ManagedResource(description = "Managed Processor")
@@ -133,6 +134,8 @@ public class ManagedProcessor extends ManagedPerformanceCounter implements Manag
public String getRouteId() {
if (route != null) {
return route.getId();
+ } else if (processor instanceof RouteIdAware) {
+ return ((RouteIdAware) processor).getRouteId();
}
return null;
}
diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedUnregisterProducerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedUnregisterProducerTest.java
index 4f15e20..afa4c6d 100644
--- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedUnregisterProducerTest.java
+++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedUnregisterProducerTest.java
@@ -49,8 +49,8 @@ public class ManagedUnregisterProducerTest extends ManagementTestSupport {
assertEquals("mock://result", uri);
// TODO: populating route id on producers is not implemented yet
- // String routeId = (String) mbeanServer.getAttribute(on, "RouteId");
- // assertEquals("route1", routeId);
+ //String routeId = (String) mbeanServer.getAttribute(on, "RouteId");
+ //assertEquals("route1", routeId);
Boolean singleton = (Boolean) mbeanServer.getAttribute(on, "Singleton");
assertEquals(Boolean.TRUE, singleton);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/CamelLogProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/CamelLogProcessor.java
index b28e728..f614fdb 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/processor/CamelLogProcessor.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/CamelLogProcessor.java
@@ -27,6 +27,7 @@ import org.apache.camel.spi.ExchangeFormatter;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.LogListener;
import org.apache.camel.spi.MaskingFormatter;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
/**
@@ -36,9 +37,10 @@ import org.apache.camel.support.AsyncProcessorSupport;
* The name <tt>CamelLogger</tt> has been chosen to avoid any name clash with log kits
* which has a <tt>Logger</tt> class.
*/
-public class CamelLogProcessor extends AsyncProcessorSupport implements IdAware {
+public class CamelLogProcessor extends AsyncProcessorSupport implements IdAware, RouteIdAware {
private String id;
+ private String routeId;
private CamelLogger logger;
private ExchangeFormatter formatter;
private MaskingFormatter maskingFormatter;
@@ -76,6 +78,16 @@ public class CamelLogProcessor extends AsyncProcessorSupport implements IdAware
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public boolean process(Exchange exchange, AsyncCallback callback) {
if (logger.shouldLog()) {
String output = formatter.format(exchange);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java
index 4f4bfbd..ecbac22 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java
@@ -24,6 +24,7 @@ import org.apache.camel.Message;
import org.apache.camel.Traceable;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.builder.OutputStreamBuilder;
import org.apache.camel.support.service.ServiceHelper;
@@ -33,8 +34,9 @@ import org.apache.camel.util.ObjectHelper;
* Marshals the body of the incoming message using the given
* <a href="http://camel.apache.org/data-format.html">data format</a>
*/
-public class MarshalProcessor extends AsyncProcessorSupport implements Traceable, CamelContextAware, IdAware {
+public class MarshalProcessor extends AsyncProcessorSupport implements Traceable, CamelContextAware, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private CamelContext camelContext;
private final DataFormat dataFormat;
@@ -92,6 +94,16 @@ public class MarshalProcessor extends AsyncProcessorSupport implements Traceable
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public CamelContext getCamelContext() {
return camelContext;
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java
index 4e5f4be..fd5ae95 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/ThroughputLogger.java
@@ -27,15 +27,17 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.spi.CamelLogger;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
/**
* A logger for logging message throughput.
*/
-public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProcessor, IdAware {
+public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProcessor, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private final AtomicInteger receivedCounter = new AtomicInteger();
private NumberFormat numberFormat = NumberFormat.getNumberInstance();
private long groupReceivedCount;
@@ -83,6 +85,16 @@ public class ThroughputLogger extends AsyncProcessorSupport implements AsyncProc
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public void process(Exchange exchange) throws Exception {
if (startTime == 0) {
startTime = System.currentTimeMillis();
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java
index 21346da..2691450 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java
@@ -28,6 +28,7 @@ import org.apache.camel.RuntimeCamelException;
import org.apache.camel.Traceable;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.IOHelper;
@@ -37,8 +38,9 @@ import org.apache.camel.util.ObjectHelper;
* Unmarshals the body of the incoming message using the given
* <a href="http://camel.apache.org/data-format.html">data format</a>
*/
-public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceable, CamelContextAware, IdAware {
+public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceable, CamelContextAware, IdAware, RouteIdAware {
private String id;
+ private String routeId;
private CamelContext camelContext;
private final DataFormat dataFormat;
@@ -106,6 +108,16 @@ public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceab
}
@Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
public CamelContext getCamelContext() {
return camelContext;
}