You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/06/24 14:58:00 UTC
svn commit: r1353260 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/impl/ test/java/org/apache/camel/component/file/
Author: davsclaus
Date: Sun Jun 24 12:58:00 2012
New Revision: 1353260
URL: http://svn.apache.org/viewvc?rev=1353260&view=rev
Log:
CAMEL-5385: Allow to use custom/shared thread pool for scheduled polling consumers.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollStopRouteTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1353260&r1=1353259&r2=1353260&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Sun Jun 24 12:58:00 2012
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService, PollingConsumerPollingStrategy {
private static final transient Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class);
- private ScheduledExecutorService executor;
+ private ScheduledExecutorService scheduledExecutorService;
private boolean shutdownExecutor;
private ScheduledFuture<?> future;
@@ -60,12 +60,12 @@ public abstract class ScheduledPollConsu
super(endpoint, processor);
}
- public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService executor) {
+ public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService scheduledExecutorService) {
super(endpoint, processor);
// we have been given an existing thread pool, so we should not manage its lifecycle
// so we should keep shutdownExecutor as false
- this.executor = executor;
- ObjectHelper.notNull(executor, "executor");
+ this.scheduledExecutorService = scheduledExecutorService;
+ ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService");
}
/**
@@ -283,6 +283,23 @@ public abstract class ScheduledPollConsu
return sendEmptyMessageWhenIdle;
}
+ public ScheduledExecutorService getScheduledExecutorService() {
+ return scheduledExecutorService;
+ }
+
+ /**
+ * Sets a custom shared {@link ScheduledExecutorService} to use as thread pool
+ * <p/>
+ * <b>Notice: </b> When using a custom thread pool, then the lifecycle of this thread
+ * pool is not controlled by this consumer (eg this consumer will not start/stop the thread pool
+ * when the consumer is started/stopped etc.)
+ *
+ * @param scheduledExecutorService the custom thread pool to use
+ */
+ public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+ this.scheduledExecutorService = scheduledExecutorService;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
@@ -299,15 +316,15 @@ public abstract class ScheduledPollConsu
super.doStart();
// if no existing executor provided, then create a new thread pool ourselves
- if (executor == null) {
+ if (scheduledExecutorService == null) {
// we only need one thread in the pool to schedule this task
- this.executor = getEndpoint().getCamelContext().getExecutorServiceManager()
+ this.scheduledExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager()
.newScheduledThreadPool(this, getEndpoint().getEndpointUri(), 1);
// and we should shutdown the thread pool when no longer needed
this.shutdownExecutor = true;
}
- ObjectHelper.notNull(executor, "executor", this);
+ ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService", this);
ObjectHelper.notNull(pollStrategy, "pollStrategy", this);
if (isStartScheduler()) {
@@ -321,13 +338,13 @@ public abstract class ScheduledPollConsu
LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}",
new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()});
}
- future = executor.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
+ future = scheduledExecutorService.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling poll (fixed rate) with initialDelay: {}, delay: {} ({}) for: {}",
new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()});
}
- future = executor.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
+ future = scheduledExecutorService.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
}
}
@@ -342,9 +359,9 @@ public abstract class ScheduledPollConsu
@Override
protected void doShutdown() throws Exception {
- if (shutdownExecutor && executor != null) {
- getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
- executor = null;
+ if (shutdownExecutor && scheduledExecutorService != null) {
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
+ scheduledExecutorService = null;
future = null;
}
super.doShutdown();
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java?rev=1353260&r1=1353259&r2=1353260&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java Sun Jun 24 12:58:00 2012
@@ -62,13 +62,14 @@ public abstract class ScheduledPollEndpo
Object pollStrategy = options.remove("pollStrategy");
Object runLoggingLevel = options.remove("runLoggingLevel");
Object sendEmptyMessageWhenIdle = options.remove("sendEmptyMessageWhenIdle");
+ Object scheduledExecutorService = options.remove("scheduledExecutorService");
boolean setConsumerProperties = false;
// the following is split into two if statements to satisfy the checkstyle max complexity constraint
if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null) {
setConsumerProperties = true;
}
- if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null) {
+ if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null || scheduledExecutorService != null) {
setConsumerProperties = true;
}
@@ -101,6 +102,9 @@ public abstract class ScheduledPollEndpo
if (sendEmptyMessageWhenIdle != null) {
consumerProperties.put("sendEmptyMessageWhenIdle", sendEmptyMessageWhenIdle);
}
+ if (scheduledExecutorService != null) {
+ consumerProperties.put("scheduledExecutorService", scheduledExecutorService);
+ }
}
}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollStopRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollStopRouteTest.java?rev=1353260&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollStopRouteTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollStopRouteTest.java Sun Jun 24 12:58:00 2012
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ *
+ */
+public class FileConsumerSharedThreadPollStopRouteTest extends FileConsumerSharedThreadPollTest {
+
+ public void testSharedThreadPool() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(2);
+ // thread thread name should be the same
+ mock.message(0).header("threadName").isEqualTo(mock.message(1).header("threadName"));
+
+ template.sendBodyAndHeader("file:target/a", "Hello World", Exchange.FILE_NAME, "hello.txt");
+ template.sendBodyAndHeader("file:target/b", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+ assertMockEndpointsSatisfied();
+
+ // now stop a
+ context.stopRoute("a");
+
+ resetMocks();
+ mock.expectedBodiesReceived("Bye World 2");
+ // a should not be polled
+ mock.expectedFileExists("target/a/hello2.txt");
+
+ template.sendBodyAndHeader("file:target/a", "Hello World 2", Exchange.FILE_NAME, "hello2.txt");
+ template.sendBodyAndHeader("file:target/b", "Bye World 2", Exchange.FILE_NAME, "bye2.txt");
+
+ assertMockEndpointsSatisfied();
+
+ // now start a, which should pickup the file
+ resetMocks();
+ mock.expectedBodiesReceived("Hello World 2");
+ context.startRoute("a");
+
+ assertMockEndpointsSatisfied();
+ }
+
+}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollTest.java?rev=1353260&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollTest.java Sun Jun 24 12:58:00 2012
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.ThreadPoolBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+
+/**
+ *
+ */
+public class FileConsumerSharedThreadPollTest extends ContextTestSupport {
+
+ private ScheduledExecutorService pool;
+ private SimpleRegistry registry = new SimpleRegistry();
+
+ @Override
+ protected void setUp() throws Exception {
+ deleteDirectory("target/a");
+ deleteDirectory("target/b");
+ super.setUp();
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ return new DefaultCamelContext(registry);
+ }
+
+ public void testSharedThreadPool() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(2);
+ // thread thread name should be the same
+ mock.message(0).header("threadName").isEqualTo(mock.message(1).header("threadName"));
+
+ template.sendBodyAndHeader("file:target/a", "Hello World", Exchange.FILE_NAME, "hello.txt");
+ template.sendBodyAndHeader("file:target/b", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ // create shared pool and enlist in registry
+ pool = new ThreadPoolBuilder(context).poolSize(1).buildScheduled(this, "MySharedPool");
+ registry.put("myPool", pool);
+
+ from("file:target/a?scheduledExecutorService=#myPool").routeId("a")
+ .to("direct:shared");
+
+ from("file:target/b?scheduledExecutorService=#myPool").routeId("b")
+ .to("direct:shared");
+
+ from("direct:shared").routeId("shared")
+ .convertBodyTo(String.class)
+ .log("Get ${file:name} using ${threadName}")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader("threadName", Thread.currentThread().getName());
+ }
+ })
+ .to("mock:result");
+ }
+ };
+ }
+}