You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ja...@apache.org on 2008/11/05 17:16:57 UTC
svn commit: r711599 - in /activemq/camel/trunk:
camel-core/src/main/java/org/apache/camel/model/
camel-core/src/test/java/org/apache/camel/processor/
components/camel-spring/src/test/java/org/apache/camel/spring/processor/
components/camel-spring/src/t...
Author: janstey
Date: Wed Nov 5 08:16:56 2008
New Revision: 711599
URL: http://svn.apache.org/viewvc?rev=711599&view=rev
Log:
CAMEL-1040 - added configurable thread pool parameter to splitter
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java (with props)
activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java (with props)
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml (with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=711599&r1=711598&r2=711599&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java Wed Nov 5 08:16:56 2008
@@ -448,11 +448,11 @@
* This splitter responds with the latest message returned from destination
* endpoint.
*
- * @param receipients the expression on which to split
+ * @param recipients the expression on which to split
* @return the builder
*/
- public SplitterType splitter(Expression receipients) {
- SplitterType answer = new SplitterType(receipients);
+ public SplitterType splitter(Expression recipients) {
+ SplitterType answer = new SplitterType(recipients);
addOutput(answer);
return answer;
}
@@ -516,12 +516,12 @@
* This splitter responds with the latest message returned from destination
* endpoint.
*
- * @param receipients the expression on which to split
+ * @param recipients the expression on which to split
* @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
* @return the builder
*/
- public SplitterType splitter(Expression receipients, boolean parallelProcessing) {
- SplitterType answer = new SplitterType(receipients);
+ public SplitterType splitter(Expression recipients, boolean parallelProcessing) {
+ SplitterType answer = new SplitterType(recipients);
addOutput(answer);
answer.setParallelProcessing(parallelProcessing);
return answer;
@@ -535,6 +535,27 @@
* This splitter responds with the latest message returned from destination
* endpoint.
*
+ * @param recipients the expression on which to split
+ * @param parallelProcessing if <tt>true</tt>, Camel will fork threads to call the endpoint producer
+ * @param threadPoolExecutor override the default {@link ThreadPoolExecutor}
+ * @return the builder
+ */
+ public SplitterType splitter(Expression recipients, boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
+     SplitterType splitterType = new SplitterType(recipients);
+     addOutput(splitterType);
+     splitterType.setParallelProcessing(parallelProcessing);
+     splitterType.setThreadPoolExecutor(threadPoolExecutor);
+     return splitterType;
+ }
+
+ /**
+ * Creates the <a
+ * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
+ * pattern where an expression is evaluated to iterate through each of the
+ * parts of a message and then each part is then sent to some endpoint.
+ * This splitter responds with the latest message returned from destination
+ * endpoint.
+ *
* @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
* @return the expression clause for the expression on which to split
*/
@@ -550,6 +571,26 @@
* href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
* pattern where an expression is evaluated to iterate through each of the
* parts of a message and then each part is then send to some endpoint.
+ * This splitter responds with the latest message returned from destination
+ * endpoint.
+ *
+ * @param parallelProcessing if <tt>true</tt>, Camel will fork threads to call the endpoint producer
+ * @param threadPoolExecutor override the default {@link ThreadPoolExecutor}
+ * @return the expression clause for the expression on which to split
+ */
+ public ExpressionClause<SplitterType> splitter(boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
+     SplitterType splitterType = new SplitterType();
+     addOutput(splitterType);
+     splitterType.setParallelProcessing(parallelProcessing);
+     splitterType.setThreadPoolExecutor(threadPoolExecutor);
+     return ExpressionClause.createAndSetExpression(splitterType);
+ }
+
+ /**
+ * Creates the <a
+ * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
+ * pattern where an expression is evaluated to iterate through each of the
+ * parts of a message and then each part is then sent to some endpoint.
* Answer from the splitter is produced using given {@link AggregationStrategy}
* @param partsExpression the expression on which to split
* @param aggregationStrategy the strategy used to aggregate responses for
@@ -572,6 +613,29 @@
* pattern where an expression is evaluated to iterate through each of the
* parts of a message and then each part is then send to some endpoint.
* Answer from the splitter is produced using given {@link AggregationStrategy}
+ * @param partsExpression the expression on which to split
+ * @param aggregationStrategy the strategy used to aggregate responses for
+ * every part
+ * @param parallelProcessing if <tt>true</tt>, Camel will fork threads to call the endpoint producer
+ * @param threadPoolExecutor override the default {@link ThreadPoolExecutor}
+ * @return the builder
+ */
+ public SplitterType splitter(Expression partsExpression,
+         AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
+     SplitterType splitterType = new SplitterType(partsExpression);
+     addOutput(splitterType);
+     splitterType.setAggregationStrategy(aggregationStrategy);
+     splitterType.setParallelProcessing(parallelProcessing);
+     splitterType.setThreadPoolExecutor(threadPoolExecutor);
+     return splitterType;
+ }
+
+ /**
+ * Creates the <a
+ * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
+ * pattern where an expression is evaluated to iterate through each of the
+ * parts of a message and then each part is then sent to some endpoint.
+ * Answer from the splitter is produced using given {@link AggregationStrategy}
* @param aggregationStrategy the strategy used to aggregate responses for
* every part
* @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
@@ -585,7 +649,27 @@
return ExpressionClause.createAndSetExpression(answer);
}
-
+ /**
+  * Creates the <a
+  * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
+  * pattern: an expression is evaluated to iterate over the parts of a
+  * message, and each part is then sent to some endpoint.
+  * The splitter's answer is produced by the given {@link AggregationStrategy}.
+  * @param aggregationStrategy the strategy used to aggregate the responses
+  *            for every part
+  * @param parallelProcessing if <tt>true</tt>, Camel forks threads to call the endpoint producer
+  * @param threadPoolExecutor overrides the default {@link ThreadPoolExecutor}
+  * @return the expression clause used to specify the expression on which to split
+  */
+ public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor threadPoolExecutor) {
+     SplitterType splitterType = new SplitterType();
+     addOutput(splitterType);
+     splitterType.setAggregationStrategy(aggregationStrategy);
+     splitterType.setParallelProcessing(parallelProcessing);
+     splitterType.setThreadPoolExecutor(threadPoolExecutor);
+     return ExpressionClause.createAndSetExpression(splitterType);
+ }
+
/**
* Creates the <a
* href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java?rev=711599&r1=711598&r2=711599&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java Wed Nov 5 08:16:56 2008
@@ -49,6 +49,8 @@
@XmlTransient
private ThreadPoolExecutor threadPoolExecutor;
@XmlAttribute(required = false)
+ private String threadPoolExecutorRef;
+ @XmlAttribute(required = false)
private Boolean streaming = false;
public SplitterType() {
@@ -78,9 +80,7 @@
if (aggregationStrategy == null) {
aggregationStrategy = new UseLatestAggregationStrategy();
}
- if (threadPoolExecutor == null) {
- threadPoolExecutor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
- }
+ threadPoolExecutor = createThreadPoolExecutor(routeContext);
return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy,
isParallelProcessing(), threadPoolExecutor, streaming);
}
@@ -125,6 +125,18 @@
return this;
}
+ /** Picks the executor: the one set on this type, else the threadPoolExecutorRef lookup result, else a default pool (4 core, 16 max). */
+ private ThreadPoolExecutor createThreadPoolExecutor(RouteContext routeContext) {
+     ThreadPoolExecutor executor = getThreadPoolExecutor();
+     if (executor == null && threadPoolExecutorRef != null) {
+         executor = routeContext.lookup(threadPoolExecutorRef, ThreadPoolExecutor.class);
+     }
+     if (executor == null) {
+         executor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+     }
+     return executor;
+ }
+
public ThreadPoolExecutor getThreadPoolExecutor() {
return threadPoolExecutor;
}
Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java?rev=711599&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java Wed Nov 5 08:16:56 2008
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ProcessorType;
+import org.apache.camel.model.RouteType;
+import org.apache.camel.model.SplitterType;
+
+/** Checks that a splitter configured with a custom ThreadPoolExecutor exposes an executor with that executor's pool sizes. */
+public class SplitterWithCustomThreadPoolExecutorTest extends ContextTestSupport {
+    // core pool size 8 is the one setting that differs from the splitter's default pool (4 core, 16 max)
+    protected ThreadPoolExecutor customThreadPoolExecutor = new ThreadPoolExecutor(8, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+
+    public void testSplitterWithCustomThreadPoolExecutor() throws Exception {
+        SplitterType splitter = getSplitter();
+        assertNotNull("route should contain a splitter", splitter);
+        ThreadPoolExecutor threadPoolExecutor = splitter.getThreadPoolExecutor();
+        assertEquals(customThreadPoolExecutor.getCorePoolSize(), threadPoolExecutor.getCorePoolSize());
+        assertEquals(customThreadPoolExecutor.getMaximumPoolSize(), threadPoolExecutor.getMaximumPoolSize());
+    }
+
+    /** Returns the first splitter found in the context's route definitions, or null if there is none. */
+    protected SplitterType getSplitter() {
+        for (RouteType routeType : context.getRouteDefinitions()) {
+            SplitterType found = firstSplitterType(routeType.getOutputs());
+            if (found != null) {
+                return found;
+            }
+        }
+        return null;
+    }
+
+    /** Searches the given outputs, descending into nested outputs, and returns the first splitter or null. */
+    protected SplitterType firstSplitterType(List<ProcessorType<?>> outputs) {
+        for (ProcessorType<?> processorType : outputs) {
+            if (processorType instanceof SplitterType) {
+                return (SplitterType) processorType;
+            }
+            SplitterType nested = firstSplitterType(processorType.getOutputs());
+            if (nested != null) {
+                return nested;
+            }
+        }
+        return null;
+    }
+
+    /** Builds a route that splits the body on commas in parallel, using the custom executor. */
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:parallel-custom-pool").splitter(body().tokenize(","), true, customThreadPoolExecutor).to("mock:result");
+            }
+        };
+    }
+}
Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java?rev=711599&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java (added)
+++ activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java Wed Nov 5 08:16:56 2008
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.SplitterWithCustomThreadPoolExecutorTest;
+import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+/** Runs the inherited splitter thread-pool test against a Camel context created from splitterWithCustomThreadPoolExecutor.xml. */
+public class SpringSplitterWithCustomThreadPoolExecutorTest extends SplitterWithCustomThreadPoolExecutorTest {
+ protected CamelContext createCamelContext() throws Exception {
+ return createSpringCamelContext(this, "org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml");
+ }
+}
\ No newline at end of file
Propchange: activemq/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringSplitterWithCustomThreadPoolExecutorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml?rev=711599&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml (added)
+++ activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml Wed Nov 5 08:16:56 2008
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+ http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
+ ">
+
+ <!-- START SNIPPET: example -->
+ <camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
+ <route>
+ <from uri="direct:parallel-custom-pool"/>
+ <splitter threadPoolExecutorRef="threadPoolExecutor">
+ <xpath>/invoice/lineItems</xpath>
+ <to uri="mock:result"/>
+ </splitter>
+ </route>
+ </camelContext>
+
+ <!-- Constructor arguments use the verbose nested <value> form here; the shorter
+ value="..." attribute on <constructor-arg> would work just as well. -->
+ <bean id="threadPoolExecutor" class="java.util.concurrent.ThreadPoolExecutor">
+ <constructor-arg index="0"><value>8</value></constructor-arg>
+ <constructor-arg index="1"><value>16</value></constructor-arg>
+ <constructor-arg index="2"><value>0</value></constructor-arg>
+ <constructor-arg index="3"><value>MILLISECONDS</value></constructor-arg>
+ <constructor-arg index="4"><bean class="java.util.concurrent.LinkedBlockingQueue"/></constructor-arg>
+ </bean>
+ <!-- END SNIPPET: example -->
+</beans>
Propchange: activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml
------------------------------------------------------------------------------
svn:eol-style = native