You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/02/19 09:34:32 UTC
svn commit: r1072268 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/MulticastProcessor.java
test/java/org/apache/camel/processor/SplitterNullBodyParallelTest.java
test/java/org/apache/camel/processor/SplitterNullBodyTest.java
Author: davsclaus
Date: Sat Feb 19 08:34:32 2011
New Revision: 1072268
URL: http://svn.apache.org/viewvc?rev=1072268&view=rev
Log:
CAMEL-3685: Fixed splitter in parallel mode blocking if nothing to iterate during splitting.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNullBodyParallelTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNullBodyTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1072268&r1=1072267&r2=1072268&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sat Feb 19 08:34:32 2011
@@ -245,16 +245,16 @@ public class MulticastProcessor extends
completion = new SubmitOrderedCompletionService<Exchange>(executorService);
}
- // when parallel then aggregate on the fly
- final AtomicBoolean running = new AtomicBoolean(true);
final AtomicInteger total = new AtomicInteger(0);
- final AtomicBoolean allTasksSubmitted = new AtomicBoolean();
- final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1);
- final AtomicException executionException = new AtomicException();
-
final Iterator<ProcessorExchangePair> it = pairs.iterator();
if (it.hasNext()) {
+ // when parallel then aggregate on the fly
+ final AtomicBoolean running = new AtomicBoolean(true);
+ final AtomicBoolean allTasksSubmitted = new AtomicBoolean();
+ final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1);
+ final AtomicException executionException = new AtomicException();
+
// issue task to execute in separate thread so it can aggregate on-the-fly
// while we submit new tasks, and those tasks complete concurrently
// this allows us to optimize work and reduce memory consumption
@@ -263,71 +263,71 @@ public class MulticastProcessor extends
// and start the aggregation task so we can aggregate on-the-fly
aggregateExecutorService.submit(task);
- }
- LOG.trace("Starting to submit parallel tasks");
+ LOG.trace("Starting to submit parallel tasks");
- while (it.hasNext()) {
- final ProcessorExchangePair pair = it.next();
- final Exchange subExchange = pair.getExchange();
- updateNewExchange(subExchange, total.intValue(), pairs, it);
-
- completion.submit(new Callable<Exchange>() {
- public Exchange call() throws Exception {
- if (!running.get()) {
- // do not start processing the task if we are not running
- return subExchange;
- }
+ while (it.hasNext()) {
+ final ProcessorExchangePair pair = it.next();
+ final Exchange subExchange = pair.getExchange();
+ updateNewExchange(subExchange, total.intValue(), pairs, it);
+
+ completion.submit(new Callable<Exchange>() {
+ public Exchange call() throws Exception {
+ if (!running.get()) {
+ // do not start processing the task if we are not running
+ return subExchange;
+ }
- try {
- doProcessParallel(pair);
- } catch (Throwable e) {
- subExchange.setException(e);
- }
+ try {
+ doProcessParallel(pair);
+ } catch (Throwable e) {
+ subExchange.setException(e);
+ }
- // Decide whether to continue with the multicast or not; similar logic to the Pipeline
- Integer number = getExchangeIndex(subExchange);
- boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG);
- if (stopOnException && !continueProcessing) {
- // signal to stop running
- running.set(false);
- // throw caused exception
- if (subExchange.getException() != null) {
- // wrap in exception to explain where it failed
- throw new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException());
+ // Decide whether to continue with the multicast or not; similar logic to the Pipeline
+ Integer number = getExchangeIndex(subExchange);
+ boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG);
+ if (stopOnException && !continueProcessing) {
+ // signal to stop running
+ running.set(false);
+ // throw caused exception
+ if (subExchange.getException() != null) {
+ // wrap in exception to explain where it failed
+ throw new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException());
+ }
}
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Parallel processing complete for exchange: " + subExchange);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Parallel processing complete for exchange: " + subExchange);
+ }
+ return subExchange;
}
- return subExchange;
- }
- });
+ });
- total.incrementAndGet();
- }
+ total.incrementAndGet();
+ }
- // signal all tasks has been submitted
- if (LOG.isTraceEnabled()) {
- LOG.trace("Signaling that all " + total.get() + " tasks has been submitted.");
- }
- allTasksSubmitted.set(true);
-
- // its to hard to do parallel async routing so we let the caller thread be synchronously
- // and have it pickup the replies and do the aggregation (eg we use a latch to wait)
- // wait for aggregation to be done
- if (LOG.isDebugEnabled()) {
- LOG.debug("Waiting for on-the-fly aggregation to complete aggregating " + total.get() + " responses.");
- }
- aggregationOnTheFlyDone.await();
+ // signal all tasks has been submitted
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Signaling that all " + total.get() + " tasks has been submitted.");
+ }
+ allTasksSubmitted.set(true);
- // did we fail for whatever reason, if so throw that caused exception
- if (executionException.get() != null) {
+ // its to hard to do parallel async routing so we let the caller thread be synchronously
+ // and have it pickup the replies and do the aggregation (eg we use a latch to wait)
+ // wait for aggregation to be done
if (LOG.isDebugEnabled()) {
- LOG.debug("Parallel processing failed due " + executionException.get().getMessage());
+ LOG.debug("Waiting for on-the-fly aggregation to complete aggregating " + total.get() + " responses.");
+ }
+ aggregationOnTheFlyDone.await();
+
+ // did we fail for whatever reason, if so throw that caused exception
+ if (executionException.get() != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Parallel processing failed due " + executionException.get().getMessage());
+ }
+ throw executionException.get();
}
- throw executionException.get();
}
// no everything is okay so we are done
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNullBodyParallelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNullBodyParallelTest.java?rev=1072268&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNullBodyParallelTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNullBodyParallelTest.java Sat Feb 19 08:34:32 2011
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * Testing the splitter can work with null or empty bodies
+ */
+public class SplitterNullBodyParallelTest extends SplitterNullBodyTest {
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .split(body()).parallelProcessing()
+ .to("mock:split")
+ .end()
+ .to("mock:result");
+
+ from("direct:streaming")
+ .split(body()).streaming().parallelProcessing()
+ .to("mock:split")
+ .end()
+ .to("mock:result");
+ }
+ };
+ }
+}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNullBodyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNullBodyTest.java?rev=1072268&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNullBodyTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterNullBodyTest.java Sat Feb 19 08:34:32 2011
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * Testing the splitter can work with null or empty bodies
+ */
+public class SplitterNullBodyTest extends ContextTestSupport {
+
+ public void testSplitABC() throws Exception {
+ getMockEndpoint("mock:split").expectedMessageCount(3);
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ template.sendBody("direct:start", "A,B,C");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testSplitABCStreaming() throws Exception {
+ getMockEndpoint("mock:split").expectedMessageCount(3);
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ template.sendBody("direct:streaming", "A,B,C");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testSplitEmptyList() throws Exception {
+ getMockEndpoint("mock:split").expectedMessageCount(0);
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ List list = new ArrayList();
+ template.sendBody("direct:start", list);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testSplitEmptyListStreaming() throws Exception {
+ getMockEndpoint("mock:split").expectedMessageCount(0);
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ List list = new ArrayList();
+ template.sendBody("direct:streaming", list);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testSplitNullBody() throws Exception {
+ getMockEndpoint("mock:split").expectedMessageCount(0);
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ template.sendBody("direct:start", null);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testSplitNullBodyStreaming() throws Exception {
+ getMockEndpoint("mock:split").expectedMessageCount(0);
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ template.sendBody("direct:streaming", null);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .split(body())
+ .to("mock:split")
+ .end()
+ .to("mock:result");
+
+ from("direct:streaming")
+ .split(body()).streaming()
+ .to("mock:split")
+ .end()
+ .to("mock:result");
+ }
+ };
+ }
+}