You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/30 10:33:50 UTC

svn commit: r959229 - in /camel/trunk/camel-core/src/main/java/org/apache/camel/processor: MulticastProcessor.java Splitter.java

Author: davsclaus
Date: Wed Jun 30 08:33:49 2010
New Revision: 959229

URL: http://svn.apache.org/viewvc?rev=959229&view=rev
Log:
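Fixed tests: doProcess and updateNewExchange now receive the Iterable of pairs in addition to the Iterator, so Splitter can check whether the pairs are a Collection when setting SPLIT_SIZE.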

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=959229&r1=959228&r2=959229&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Jun 30 08:33:49 2010
@@ -198,7 +198,7 @@ public class MulticastProcessor extends 
         return true;
     }
 
-    protected void doProcessParallel(final Exchange original, final AtomicExchange result, Iterable<ProcessorExchangePair> pairs,
+    protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs,
                                      boolean streaming, final AsyncCallback callback) throws InterruptedException, ExecutionException {
         final CompletionService<Exchange> completion;
         final AtomicBoolean running = new AtomicBoolean(true);
@@ -217,7 +217,7 @@ public class MulticastProcessor extends 
         while (it.hasNext()) {
             final ProcessorExchangePair pair = it.next();
             final Exchange subExchange = pair.getExchange();
-            updateNewExchange(subExchange, total.intValue(), it);
+            updateNewExchange(subExchange, total.intValue(), pairs, it);
 
             completion.submit(new Callable<Exchange>() {
                 public Exchange call() throws Exception {
@@ -226,7 +226,7 @@ public class MulticastProcessor extends 
                         return subExchange;
                     }
 
-                    doProcess(original, result, it, pair, callback, total);
+                    doProcess(original, result, pairs, it, pair, callback, total);
 
                     // should we stop in case of an exception occurred during processing?
                     if (stopOnException && subExchange.getException() != null) {
@@ -263,9 +263,9 @@ public class MulticastProcessor extends 
         while (it.hasNext()) {
             ProcessorExchangePair pair = it.next();
             Exchange subExchange = pair.getExchange();
-            updateNewExchange(subExchange, total.get(), it);
+            updateNewExchange(subExchange, total.get(), pairs, it);
 
-            boolean sync = doProcess(original, result, it, pair, callback, total);
+            boolean sync = doProcess(original, result, pairs, it, pair, callback, total);
             if (!sync) {
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Processing exchangeId: " + pair.getExchange().getExchangeId() + " is continued being processed asynchronously");
@@ -299,7 +299,8 @@ public class MulticastProcessor extends 
         return true;
     }
 
-    private boolean doProcess(final Exchange original, final AtomicExchange result, final Iterator<ProcessorExchangePair> it,
+    private boolean doProcess(final Exchange original, final AtomicExchange result,
+                              final Iterable<ProcessorExchangePair> pairs, final Iterator<ProcessorExchangePair> it,
                               final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) {
         boolean sync = true;
 
@@ -364,8 +365,8 @@ public class MulticastProcessor extends 
                         // prepare and run the next
                         ProcessorExchangePair pair = it.next();
                         subExchange = pair.getExchange();
-                        updateNewExchange(subExchange, total.get(), it);
-                        boolean sync = doProcess(original, result, it, pair, callback, total);
+                        updateNewExchange(subExchange, total.get(), pairs, it);
+                        boolean sync = doProcess(original, result, pairs, it, pair, callback, total);
 
                         if (!sync) {
                             if (LOG.isTraceEnabled()) {
@@ -460,9 +461,10 @@ public class MulticastProcessor extends 
         }
     }
 
-    protected void updateNewExchange(Exchange exchange, int index, Iterator<ProcessorExchangePair> allPairs) {
+    protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
+                                     Iterator<ProcessorExchangePair> it) {
         exchange.setProperty(Exchange.MULTICAST_INDEX, index);
-        if (allPairs.hasNext()) {
+        if (it.hasNext()) {
             exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE);
         } else {
             exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=959229&r1=959228&r2=959229&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Wed Jun 30 08:33:49 2010
@@ -158,12 +158,13 @@ public class Splitter extends MulticastP
     }
 
     @Override
-    protected void updateNewExchange(Exchange exchange, int index, Iterator<ProcessorExchangePair> allPairs) {
+    protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
+                                     Iterator<ProcessorExchangePair> it) {
         exchange.setProperty(Exchange.SPLIT_INDEX, index);
         if (allPairs instanceof Collection) {
             exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>)allPairs).size());
         }
-        if (allPairs.hasNext()) {
+        if (it.hasNext()) {
             exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
         } else {
             exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);