You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/01/08 10:51:59 UTC

svn commit: r1056678 - in /camel/trunk/camel-core/src/main/java/org/apache/camel/processor: MulticastProcessor.java RecipientListProcessor.java Splitter.java interceptor/DefaultTraceFormatter.java

Author: davsclaus
Date: Sat Jan  8 09:51:59 2011
New Revision: 1056678

URL: http://svn.apache.org/viewvc?rev=1056678&view=rev
Log:
CAMEL-3497: Reduced memory consumption for splitter.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1056678&r1=1056677&r2=1056678&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sat Jan  8 09:51:59 2011
@@ -285,6 +285,8 @@ public class MulticastProcessor extends 
             total.incrementAndGet();
         }
 
+        // TODO: in streaming mode we need to aggregate on-the-fly
+
         // its to hard to do parallel async routing so we let the caller thread be synchronously
         // and have it pickup the replies and do the aggregation
         boolean timedOut = false;
@@ -664,7 +666,11 @@ public class MulticastProcessor extends 
 
         int index = 0;
         for (Processor processor : processors) {
-            result.add(createProcessorExchangePair(index++, processor, exchange));
+            // copy exchange, and do not share the unit of work
+            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
+            // and add the pair
+            RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
+            result.add(createProcessorExchangePair(index++, processor, copy, routeContext));
         }
 
         return result;
@@ -676,34 +682,33 @@ public class MulticastProcessor extends 
      * You <b>must</b> use this method to create the instances of {@link ProcessorExchangePair} as they
      * need to be specially prepared before use.
      *
-     * @param processor the processor
-     * @param exchange  the exchange
+     * @param index        the index
+     * @param processor    the processor
+     * @param exchange     the exchange
+     * @param routeContext the route context
      * @return prepared for use
      */
-    protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange) {
+    protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor,
+                                                                Exchange exchange, RouteContext routeContext) {
         Processor prepared = processor;
 
-        // copy exchange, and do not share the unit of work
-        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
-
         // set property which endpoint we send to
-        setToEndpoint(copy, prepared);
+        setToEndpoint(exchange, prepared);
 
         // rework error handling to support fine grained error handling
-        prepared = createErrorHandler(exchange, prepared);
+        prepared = createErrorHandler(routeContext, prepared);
 
-        return new DefaultProcessorExchangePair(index, processor, prepared, copy);
+        return new DefaultProcessorExchangePair(index, processor, prepared, exchange);
     }
 
-    protected Processor createErrorHandler(Exchange exchange, Processor processor) {
-        Processor answer = processor;
+    protected Processor createErrorHandler(RouteContext routeContext, Processor processor) {
+        Processor answer;
 
-        if (exchange.getUnitOfWork() != null && exchange.getUnitOfWork().getRouteContext() != null) {
+        if (routeContext != null) {
             // wrap the producer in error handler so we have fine grained error handling on
             // the output side instead of the input side
             // this is needed to support redelivery on that output alone and not doing redelivery
             // for the entire multicast block again which will start from scratch again
-            RouteContext routeContext = exchange.getUnitOfWork().getRouteContext();
 
             // create key for cache
             final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor);
@@ -725,14 +730,18 @@ public class MulticastProcessor extends 
             // instead of using ProcessorDefinition.wrapInErrorHandler)
             try {
                 processor = builder.createErrorHandler(routeContext, processor);
-                // and wrap in unit of work processor so the copy exchange also can run under UoW
-                answer = new UnitOfWorkProcessor(processor);
             } catch (Exception e) {
                 throw ObjectHelper.wrapRuntimeCamelException(e);
             }
 
+            // and wrap in unit of work processor so the copy exchange also can run under UoW
+            answer = new UnitOfWorkProcessor(processor);
+
             // add to cache
             errorHandlers.putIfAbsent(key, answer);
+        } else {
+            // and wrap in unit of work processor so the copy exchange also can run under UoW
+            answer = new UnitOfWorkProcessor(processor);
         }
 
         return answer;
@@ -781,8 +790,8 @@ public class MulticastProcessor extends 
     /**
      * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}.
      *
-     * @param exchange  the exchange
-     * @param aggregationStrategy  the strategy
+     * @param exchange            the exchange
+     * @param aggregationStrategy the strategy
      */
     protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) {
         Map property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java?rev=1056678&r1=1056677&r2=1056678&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java Sat Jan  8 09:51:59 2011
@@ -28,6 +28,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
@@ -189,8 +190,10 @@ public class RecipientListProcessor exte
         setToEndpoint(copy, prepared);
 
         // rework error handling to support fine grained error handling
-        prepared = createErrorHandler(exchange, prepared);
+        RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
+        prepared = createErrorHandler(routeContext, prepared);
 
+        // and create the pair
         return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy);
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=1056678&r1=1056677&r2=1056678&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Sat Jan  8 09:51:59 2011
@@ -34,7 +34,7 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
-import org.apache.camel.util.CollectionHelper;
+import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -112,6 +112,7 @@ public class Splitter extends MulticastP
             // create a copy which we use as master to copy during splitting
             // this avoids any side effect reflected upon the incoming exchange
             private final Exchange copy = ExchangeHelper.createCopy(exchange, true);
+            private final RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
 
             public Iterator iterator() {
                 return new Iterator() {
@@ -140,15 +141,16 @@ public class Splitter extends MulticastP
 
                     public Object next() {
                         Object part = iterator.next();
-                        // create a copy as the new exchange to be routed in the splitter from the copy
-                        Exchange newExchange = ExchangeHelper.createCopy(copy, true);
+                        // create a correlated copy as the new exchange to be routed in the splitter from the copy
+                        // and do not share the unit of work
+                        Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
                         if (part instanceof Message) {
-                            newExchange.setIn((Message)part);
+                            newExchange.setIn((Message) part);
                         } else {
                             Message in = newExchange.getIn();
                             in.setBody(part);
                         }
-                        return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange);
+                        return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
                     }
 
                     public void remove() {
@@ -180,7 +182,7 @@ public class Splitter extends MulticastP
 
         exchange.setProperty(Exchange.SPLIT_INDEX, index);
         if (allPairs instanceof Collection) {
-            exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>)allPairs).size());
+            exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>) allPairs).size());
         }
         if (it.hasNext()) {
             exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java?rev=1056678&r1=1056677&r2=1056678&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultTraceFormatter.java Sat Jan  8 09:51:59 2011
@@ -294,25 +294,27 @@ public class DefaultTraceFormatter imple
         String to = "";
         String route = "";
         if (showNode || showRouteId) {
-            TracedRouteNodes traced = exchange.getUnitOfWork().getTracedRouteNodes();
+            if (exchange.getUnitOfWork() != null) {
+                TracedRouteNodes traced = exchange.getUnitOfWork().getTracedRouteNodes();
 
-            RouteNode traceFrom = traced.getSecondLastNode();
-            if (traceFrom != null) {
-                from = getNodeMessage(traceFrom, exchange);
-            } else if (exchange.getFromEndpoint() != null) {
-                from = "from(" + exchange.getFromEndpoint().getEndpointUri() + ")";
-            }
-
-            RouteNode traceTo = traced.getLastNode();
-            if (traceTo != null) {
-                to = getNodeMessage(traceTo, exchange);
-                // if its an abstract dummy holder then we have to get the 2nd last so we can get the real node that has
-                // information which route it belongs to
-                if (traceTo.isAbstract() && traceTo.getProcessorDefinition() == null) {
-                    traceTo = traced.getSecondLastNode();
+                RouteNode traceFrom = traced.getSecondLastNode();
+                if (traceFrom != null) {
+                    from = getNodeMessage(traceFrom, exchange);
+                } else if (exchange.getFromEndpoint() != null) {
+                    from = "from(" + exchange.getFromEndpoint().getEndpointUri() + ")";
                 }
+
+                RouteNode traceTo = traced.getLastNode();
                 if (traceTo != null) {
-                    route = extractRoute(traceTo.getProcessorDefinition());
+                    to = getNodeMessage(traceTo, exchange);
+                    // if its an abstract dummy holder then we have to get the 2nd last so we can get the real node that has
+                    // information which route it belongs to
+                    if (traceTo.isAbstract() && traceTo.getProcessorDefinition() == null) {
+                        traceTo = traced.getSecondLastNode();
+                    }
+                    if (traceTo != null) {
+                        route = extractRoute(traceTo.getProcessorDefinition());
+                    }
                 }
             }
         }