You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/30 10:33:50 UTC
svn commit: r959229 - in /camel/trunk/camel-core/src/main/java/org/apache/camel/processor: MulticastProcessor.java Splitter.java
Author: davsclaus
Date: Wed Jun 30 08:33:49 2010
New Revision: 959229
URL: http://svn.apache.org/viewvc?rev=959229&view=rev
Log:
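Fixed tests: updateNewExchange and doProcess now receive the Iterable of pairs in addition to the Iterator, so that Splitter's "instanceof Collection" check for setting SPLIT_SIZE is applied to the pairs themselves, while the Iterator is still used to decide the COMPLETE flag.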
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=959229&r1=959228&r2=959229&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Jun 30 08:33:49 2010
@@ -198,7 +198,7 @@ public class MulticastProcessor extends
return true;
}
- protected void doProcessParallel(final Exchange original, final AtomicExchange result, Iterable<ProcessorExchangePair> pairs,
+ protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs,
boolean streaming, final AsyncCallback callback) throws InterruptedException, ExecutionException {
final CompletionService<Exchange> completion;
final AtomicBoolean running = new AtomicBoolean(true);
@@ -217,7 +217,7 @@ public class MulticastProcessor extends
while (it.hasNext()) {
final ProcessorExchangePair pair = it.next();
final Exchange subExchange = pair.getExchange();
- updateNewExchange(subExchange, total.intValue(), it);
+ updateNewExchange(subExchange, total.intValue(), pairs, it);
completion.submit(new Callable<Exchange>() {
public Exchange call() throws Exception {
@@ -226,7 +226,7 @@ public class MulticastProcessor extends
return subExchange;
}
- doProcess(original, result, it, pair, callback, total);
+ doProcess(original, result, pairs, it, pair, callback, total);
// should we stop in case of an exception occurred during processing?
if (stopOnException && subExchange.getException() != null) {
@@ -263,9 +263,9 @@ public class MulticastProcessor extends
while (it.hasNext()) {
ProcessorExchangePair pair = it.next();
Exchange subExchange = pair.getExchange();
- updateNewExchange(subExchange, total.get(), it);
+ updateNewExchange(subExchange, total.get(), pairs, it);
- boolean sync = doProcess(original, result, it, pair, callback, total);
+ boolean sync = doProcess(original, result, pairs, it, pair, callback, total);
if (!sync) {
if (LOG.isTraceEnabled()) {
LOG.trace("Processing exchangeId: " + pair.getExchange().getExchangeId() + " is continued being processed asynchronously");
@@ -299,7 +299,8 @@ public class MulticastProcessor extends
return true;
}
- private boolean doProcess(final Exchange original, final AtomicExchange result, final Iterator<ProcessorExchangePair> it,
+ private boolean doProcess(final Exchange original, final AtomicExchange result,
+ final Iterable<ProcessorExchangePair> pairs, final Iterator<ProcessorExchangePair> it,
final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) {
boolean sync = true;
@@ -364,8 +365,8 @@ public class MulticastProcessor extends
// prepare and run the next
ProcessorExchangePair pair = it.next();
subExchange = pair.getExchange();
- updateNewExchange(subExchange, total.get(), it);
- boolean sync = doProcess(original, result, it, pair, callback, total);
+ updateNewExchange(subExchange, total.get(), pairs, it);
+ boolean sync = doProcess(original, result, pairs, it, pair, callback, total);
if (!sync) {
if (LOG.isTraceEnabled()) {
@@ -460,9 +461,10 @@ public class MulticastProcessor extends
}
}
- protected void updateNewExchange(Exchange exchange, int index, Iterator<ProcessorExchangePair> allPairs) {
+ protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
+ Iterator<ProcessorExchangePair> it) {
exchange.setProperty(Exchange.MULTICAST_INDEX, index);
- if (allPairs.hasNext()) {
+ if (it.hasNext()) {
exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE);
} else {
exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE);
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=959229&r1=959228&r2=959229&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Wed Jun 30 08:33:49 2010
@@ -158,12 +158,13 @@ public class Splitter extends MulticastP
}
@Override
- protected void updateNewExchange(Exchange exchange, int index, Iterator<ProcessorExchangePair> allPairs) {
+ protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
+ Iterator<ProcessorExchangePair> it) {
exchange.setProperty(Exchange.SPLIT_INDEX, index);
if (allPairs instanceof Collection) {
exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>)allPairs).size());
}
- if (allPairs.hasNext()) {
+ if (it.hasNext()) {
exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
} else {
exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);