You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/01/09 11:22:45 UTC
svn commit: r1056903 - in
/camel/trunk/camel-core/src/main/java/org/apache/camel:
processor/MulticastProcessor.java processor/RecipientList.java
util/ServiceHelper.java
Author: davsclaus
Date: Sun Jan 9 10:22:45 2011
New Revision: 1056903
URL: http://svn.apache.org/viewvc?rev=1056903&view=rev
Log:
CAMEL-3497: Fixed subtle issue with aggregation task blocking.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1056903&r1=1056902&r2=1056903&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sun Jan 9 10:22:45 2011
@@ -145,7 +145,6 @@ public class MulticastProcessor extends
private final boolean streaming;
private final boolean stopOnException;
private final ExecutorService executorService;
- private ExecutorService aggregationExecutorService;
private final long timeout;
private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>();
@@ -234,6 +233,8 @@ public class MulticastProcessor extends
protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs,
final boolean streaming, final AsyncCallback callback) throws Exception {
+ ObjectHelper.notNull(executorService, "ExecutorService", this);
+
final CompletionService<Exchange> completion;
if (streaming) {
// execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence)
@@ -259,8 +260,8 @@ public class MulticastProcessor extends
AggregateOnTheFlyTask task = new AggregateOnTheFlyTask(result, original, total, completion, running,
aggregationOnTheFlyDone, allTasksSubmitted, executionException);
- // and start the task using the aggregation execution service
- aggregationExecutorService.submit(task);
+ // and start the aggregation task so we can aggregate on-the-fly
+ executorService.submit(task);
}
LOG.trace("Starting to submit parallel tasks");
@@ -421,11 +422,15 @@ public class MulticastProcessor extends
}
future = completion.poll(left, TimeUnit.MILLISECONDS);
} else {
- // take will wait until the task is complete
if (LOG.isTraceEnabled()) {
LOG.trace("Polling completion task #" + aggregated);
}
- future = completion.take();
+ // we must not block so poll every second
+ future = completion.poll(1, TimeUnit.SECONDS);
+ if (future == null) {
+ // and continue loop which will recheck if we are done
+ continue;
+ }
}
if (future == null && timedOut) {
@@ -866,9 +871,6 @@ public class MulticastProcessor extends
if (timeout > 0 && !isParallelProcessing()) {
throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled");
}
- if (isParallelProcessing()) {
- aggregationExecutorService = getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this, "AggregationTask");
- }
ServiceHelper.startServices(processors);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=1056903&r1=1056902&r2=1056903&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Sun Jan 9 10:22:45 2011
@@ -111,6 +111,15 @@ public class RecipientList extends Servi
isParallelProcessing(), getExecutorService(), isStreaming(), isStopOnException(), getTimeout());
rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
+ // start the service
+ try {
+ ServiceHelper.startService(rlp);
+ } catch (Exception e) {
+ exchange.setException(e);
+ callback.done(true);
+ return true;
+ }
+
// now let the multicast process the exchange
return AsyncProcessorHelper.process(rlp, exchange, callback);
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?rev=1056903&r1=1056902&r2=1056903&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Sun Jan 9 10:22:45 2011
@@ -59,6 +59,9 @@ public final class ServiceHelper {
* Starts all of the given services
*/
public static void startServices(Object... services) throws Exception {
+ if (services == null) {
+ return;
+ }
for (Object value : services) {
startService(value);
}
@@ -68,6 +71,9 @@ public final class ServiceHelper {
* Starts all of the given services
*/
public static void startServices(Collection<?> services) throws Exception {
+ if (services == null) {
+ return;
+ }
for (Object value : services) {
if (value instanceof Service) {
Service service = (Service)value;
@@ -83,6 +89,9 @@ public final class ServiceHelper {
* Stops all of the given services, throwing the first exception caught
*/
public static void stopServices(Object... services) throws Exception {
+ if (services == null) {
+ return;
+ }
List<Object> list = Arrays.asList(services);
stopServices(list);
}
@@ -106,6 +115,9 @@ public final class ServiceHelper {
* Stops all of the given services, throwing the first exception caught
*/
public static void stopServices(Collection<?> services) throws Exception {
+ if (services == null) {
+ return;
+ }
Exception firstException = null;
for (Object value : services) {
if (value instanceof Service) {
@@ -134,6 +146,9 @@ public final class ServiceHelper {
* Stops and shutdowns all of the given services, throwing the first exception caught
*/
public static void stopAndShutdownServices(Object... services) throws Exception {
+ if (services == null) {
+ return;
+ }
List<Object> list = Arrays.asList(services);
stopAndShutdownServices(list);
}
@@ -163,6 +178,9 @@ public final class ServiceHelper {
* Stops and shutdowns all of the given services, throwing the first exception caught
*/
public static void stopAndShutdownServices(Collection<?> services) throws Exception {
+ if (services == null) {
+ return;
+ }
Exception firstException = null;
for (Object value : services) {
@@ -194,6 +212,9 @@ public final class ServiceHelper {
}
public static void resumeServices(Collection<?> services) throws Exception {
+ if (services == null) {
+ return;
+ }
Exception firstException = null;
for (Object value : services) {
if (value instanceof Service) {
@@ -254,6 +275,9 @@ public final class ServiceHelper {
}
public static void suspendServices(Collection<?> services) throws Exception {
+ if (services == null) {
+ return;
+ }
Exception firstException = null;
for (Object value : services) {
if (value instanceof Service) {