You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/01/08 10:51:59 UTC
svn commit: r1056678 - in /camel/trunk/camel-core/src/main/java/org/apache/camel/processor: MulticastProcessor.java RecipientListProcessor.java Splitter.java interceptor/DefaultTraceFormatter.java
Author: davsclaus
Date: Sat Jan 8 09:51:59 2011
New Revision: 1056678
URL: http://svn.apache.org/viewvc?rev=1056678&view=rev
Log:
CAMEL-3497: Reduced memory consumption for splitter.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1056678&r1=1056677&r2=1056678&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sat Jan 8 09:51:59 2011
@@ -285,6 +285,8 @@ public class MulticastProcessor extends
total.incrementAndGet();
}
+ // TODO: in streaming mode we need to aggregate on-the-fly
+
// its to hard to do parallel async routing so we let the caller thread be synchronously
// and have it pickup the replies and do the aggregation
boolean timedOut = false;
@@ -664,7 +666,11 @@ public class MulticastProcessor extends
int index = 0;
for (Processor processor : processors) {
- result.add(createProcessorExchangePair(index++, processor, exchange));
+ // copy exchange, and do not share the unit of work
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+ // and add the pair
+ RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
+ result.add(createProcessorExchangePair(index++, processor, copy, routeContext));
}
return result;
@@ -676,34 +682,33 @@ public class MulticastProcessor extends
* You <b>must</b> use this method to create the instances of {@link ProcessorExchangePair} as they
* need to be specially prepared before use.
*
- * @param processor the processor
- * @param exchange the exchange
+ * @param index the index
+ * @param processor the processor
+ * @param exchange the exchange
+ * @param routeContext the route context
* @return prepared for use
*/
- protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange) {
+ protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor,
+ Exchange exchange, RouteContext routeContext) {
Processor prepared = processor;
- // copy exchange, and do not share the unit of work
- Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
-
// set property which endpoint we send to
- setToEndpoint(copy, prepared);
+ setToEndpoint(exchange, prepared);
// rework error handling to support fine grained error handling
- prepared = createErrorHandler(exchange, prepared);
+ prepared = createErrorHandler(routeContext, prepared);
- return new DefaultProcessorExchangePair(index, processor, prepared, copy);
+ return new DefaultProcessorExchangePair(index, processor, prepared, exchange);
}
- protected Processor createErrorHandler(Exchange exchange, Processor processor) {
- Processor answer = processor;
+ protected Processor createErrorHandler(RouteContext routeContext, Processor processor) {
+ Processor answer;
- if (exchange.getUnitOfWork() != null && exchange.getUnitOfWork().getRouteContext() != null) {
+ if (routeContext != null) {
// wrap the producer in error handler so we have fine grained error handling on
// the output side instead of the input side
// this is needed to support redelivery on that output alone and not doing redelivery
// for the entire multicast block again which will start from scratch again
- RouteContext routeContext = exchange.getUnitOfWork().getRouteContext();
// create key for cache
final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor);
@@ -725,14 +730,18 @@ public class MulticastProcessor extends
// instead of using ProcessorDefinition.wrapInErrorHandler)
try {
processor = builder.createErrorHandler(routeContext, processor);
- // and wrap in unit of work processor so the copy exchange also can run under UoW
- answer = new UnitOfWorkProcessor(processor);
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
+ // and wrap in unit of work processor so the copy exchange also can run under UoW
+ answer = new UnitOfWorkProcessor(processor);
+
// add to cache
errorHandlers.putIfAbsent(key, answer);
+ } else {
+ // and wrap in unit of work processor so the copy exchange also can run under UoW
+ answer = new UnitOfWorkProcessor(processor);
}
return answer;
@@ -781,8 +790,8 @@ public class MulticastProcessor extends
/**
* Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}.
*
- * @param exchange the exchange
- * @param aggregationStrategy the strategy
+ * @param exchange the exchange
+ * @param aggregationStrategy the strategy
*/
protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) {
Map property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java?rev=1056678&r1=1056677&r2=1056678&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java Sat Jan 8 09:51:59 2011
@@ -28,6 +28,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
@@ -189,8 +190,10 @@ public class RecipientListProcessor exte
setToEndpoint(copy, prepared);
// rework error handling to support fine grained error handling
- prepared = createErrorHandler(exchange, prepared);
+ RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
+ prepared = createErrorHandler(routeContext, prepared);
+ // and create the pair
return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=1056678&r1=1056677&r2=1056678&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Sat Jan 8 09:51:59 2011
@@ -34,7 +34,7 @@ import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
-import org.apache.camel.util.CollectionHelper;
+import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
@@ -112,6 +112,7 @@ public class Splitter extends MulticastP
// create a copy which we use as master to copy during splitting
// this avoids any side effect reflected upon the incoming exchange
private final Exchange copy = ExchangeHelper.createCopy(exchange, true);
+ private final RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
public Iterator iterator() {
return new Iterator() {
@@ -140,15 +141,16 @@ public class Splitter extends MulticastP
public Object next() {
Object part = iterator.next();
- // create a copy as the new exchange to be routed in the splitter from the copy
- Exchange newExchange = ExchangeHelper.createCopy(copy, true);
+ // create a correlated copy as the new exchange to be routed in the splitter from the copy
+ // and do not share the unit of work
+ Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
if (part instanceof Message) {
- newExchange.setIn((Message)part);
+ newExchange.setIn((Message) part);
} else {
Message in = newExchange.getIn();
in.setBody(part);
}
- return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange);
+ return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
}
public void remove() {
@@ -180,7 +182,7 @@ public class Splitter extends MulticastP
exchange.setProperty(Exchange.SPLIT_INDEX, index);
if (allPairs instanceof Collection) {
- exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>)allPairs).size());
+ exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>) allPairs).size());
}
if (it.hasNext()) {
exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java?rev=1056678&r1=1056677&r2=1056678&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java Sat Jan 8 09:51:59 2011
@@ -294,25 +294,27 @@ public class DefaultTraceFormatter imple
String to = "";
String route = "";
if (showNode || showRouteId) {
- TracedRouteNodes traced = exchange.getUnitOfWork().getTracedRouteNodes();
+ if (exchange.getUnitOfWork() != null) {
+ TracedRouteNodes traced = exchange.getUnitOfWork().getTracedRouteNodes();
- RouteNode traceFrom = traced.getSecondLastNode();
- if (traceFrom != null) {
- from = getNodeMessage(traceFrom, exchange);
- } else if (exchange.getFromEndpoint() != null) {
- from = "from(" + exchange.getFromEndpoint().getEndpointUri() + ")";
- }
-
- RouteNode traceTo = traced.getLastNode();
- if (traceTo != null) {
- to = getNodeMessage(traceTo, exchange);
- // if its an abstract dummy holder then we have to get the 2nd last so we can get the real node that has
- // information which route it belongs to
- if (traceTo.isAbstract() && traceTo.getProcessorDefinition() == null) {
- traceTo = traced.getSecondLastNode();
+ RouteNode traceFrom = traced.getSecondLastNode();
+ if (traceFrom != null) {
+ from = getNodeMessage(traceFrom, exchange);
+ } else if (exchange.getFromEndpoint() != null) {
+ from = "from(" + exchange.getFromEndpoint().getEndpointUri() + ")";
}
+
+ RouteNode traceTo = traced.getLastNode();
if (traceTo != null) {
- route = extractRoute(traceTo.getProcessorDefinition());
+ to = getNodeMessage(traceTo, exchange);
+ // if its an abstract dummy holder then we have to get the 2nd last so we can get the real node that has
+ // information which route it belongs to
+ if (traceTo.isAbstract() && traceTo.getProcessorDefinition() == null) {
+ traceTo = traced.getSecondLastNode();
+ }
+ if (traceTo != null) {
+ route = extractRoute(traceTo.getProcessorDefinition());
+ }
}
}
}