You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2012/06/24 14:58:00 UTC

svn commit: r1353260 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ test/java/org/apache/camel/component/file/

Author: davsclaus
Date: Sun Jun 24 12:58:00 2012
New Revision: 1353260

URL: http://svn.apache.org/viewvc?rev=1353260&view=rev
Log:
CAMEL-5385: Allow to use custom/shared thread pool for scheduled polling consumers.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollStopRouteTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=1353260&r1=1353259&r2=1353260&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Sun Jun 24 12:58:00 2012
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
 public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable, SuspendableService, PollingConsumerPollingStrategy {
     private static final transient Logger LOG = LoggerFactory.getLogger(ScheduledPollConsumer.class);
 
-    private ScheduledExecutorService executor;
+    private ScheduledExecutorService scheduledExecutorService;
     private boolean shutdownExecutor;
     private ScheduledFuture<?> future;
 
@@ -60,12 +60,12 @@ public abstract class ScheduledPollConsu
         super(endpoint, processor);
     }
 
-    public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService executor) {
+    public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService scheduledExecutorService) {
         super(endpoint, processor);
         // we have been given an existing thread pool, so we should not manage its lifecycle
         // so we should keep shutdownExecutor as false
-        this.executor = executor;
-        ObjectHelper.notNull(executor, "executor");
+        this.scheduledExecutorService = scheduledExecutorService;
+        ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService");
     }
 
     /**
@@ -283,6 +283,23 @@ public abstract class ScheduledPollConsu
         return sendEmptyMessageWhenIdle;
     }
 
+    public ScheduledExecutorService getScheduledExecutorService() {
+        return scheduledExecutorService;
+    }
+
+    /**
+     * Sets a custom shared {@link ScheduledExecutorService} to use as thread pool
+     * <p/>
+     * <b>Notice: </b> When using a custom thread pool, then the lifecycle of this thread
+     * pool is not controlled by this consumer (eg this consumer will not start/stop the thread pool
+     * when the consumer is started/stopped etc.)
+     *
+     * @param scheduledExecutorService the custom thread pool to use
+     */
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        this.scheduledExecutorService = scheduledExecutorService;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
 
@@ -299,15 +316,15 @@ public abstract class ScheduledPollConsu
         super.doStart();
 
         // if no existing executor provided, then create a new thread pool ourselves
-        if (executor == null) {
+        if (scheduledExecutorService == null) {
             // we only need one thread in the pool to schedule this task
-            this.executor = getEndpoint().getCamelContext().getExecutorServiceManager()
+            this.scheduledExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager()
                     .newScheduledThreadPool(this, getEndpoint().getEndpointUri(), 1);
             // and we should shutdown the thread pool when no longer needed
             this.shutdownExecutor = true;
         }
 
-        ObjectHelper.notNull(executor, "executor", this);
+        ObjectHelper.notNull(scheduledExecutorService, "scheduledExecutorService", this);
         ObjectHelper.notNull(pollStrategy, "pollStrategy", this);
 
         if (isStartScheduler()) {
@@ -321,13 +338,13 @@ public abstract class ScheduledPollConsu
                 LOG.debug("Scheduling poll (fixed delay) with initialDelay: {}, delay: {} ({}) for: {}",
                         new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()});
             }
-            future = executor.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
+            future = scheduledExecutorService.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
         } else {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Scheduling poll (fixed rate) with initialDelay: {}, delay: {} ({}) for: {}",
                         new Object[]{getInitialDelay(), getDelay(), getTimeUnit().name().toLowerCase(Locale.ENGLISH), getEndpoint()});
             }
-            future = executor.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
+            future = scheduledExecutorService.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
         }
     }
 
@@ -342,9 +359,9 @@ public abstract class ScheduledPollConsu
 
     @Override
     protected void doShutdown() throws Exception {
-        if (shutdownExecutor && executor != null) {
-            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
-            executor = null;
+        if (shutdownExecutor && scheduledExecutorService != null) {
+            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(scheduledExecutorService);
+            scheduledExecutorService = null;
             future = null;
         }
         super.doShutdown();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java?rev=1353260&r1=1353259&r2=1353260&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollEndpoint.java Sun Jun 24 12:58:00 2012
@@ -62,13 +62,14 @@ public abstract class ScheduledPollEndpo
         Object pollStrategy = options.remove("pollStrategy");
         Object runLoggingLevel = options.remove("runLoggingLevel");
         Object sendEmptyMessageWhenIdle = options.remove("sendEmptyMessageWhenIdle");
+        Object scheduledExecutorService  = options.remove("scheduledExecutorService");
         boolean setConsumerProperties = false;
         
         // the following is split into two if statements to satisfy the checkstyle max complexity constraint
         if (initialDelay != null || delay != null || timeUnit != null || useFixedDelay != null || pollStrategy != null) {
             setConsumerProperties = true;
         }
-        if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null) {
+        if (runLoggingLevel != null || startScheduler != null || sendEmptyMessageWhenIdle != null || scheduledExecutorService != null) {
             setConsumerProperties = true;
         }
         
@@ -101,6 +102,9 @@ public abstract class ScheduledPollEndpo
             if (sendEmptyMessageWhenIdle != null) {
                 consumerProperties.put("sendEmptyMessageWhenIdle", sendEmptyMessageWhenIdle);
             }
+            if (scheduledExecutorService != null) {
+                consumerProperties.put("scheduledExecutorService", scheduledExecutorService);
+            }
         }
     }
     

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollStopRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollStopRouteTest.java?rev=1353260&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollStopRouteTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollStopRouteTest.java Sun Jun 24 12:58:00 2012
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ *
+ */
+public class FileConsumerSharedThreadPollStopRouteTest extends FileConsumerSharedThreadPollTest {
+
+    public void testSharedThreadPool() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(2);
+        // thread thread name should be the same
+        mock.message(0).header("threadName").isEqualTo(mock.message(1).header("threadName"));
+
+        template.sendBodyAndHeader("file:target/a", "Hello World", Exchange.FILE_NAME, "hello.txt");
+        template.sendBodyAndHeader("file:target/b", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+        assertMockEndpointsSatisfied();
+
+        // now stop a
+        context.stopRoute("a");
+
+        resetMocks();
+        mock.expectedBodiesReceived("Bye World 2");
+        // a should not be polled
+        mock.expectedFileExists("target/a/hello2.txt");
+
+        template.sendBodyAndHeader("file:target/a", "Hello World 2", Exchange.FILE_NAME, "hello2.txt");
+        template.sendBodyAndHeader("file:target/b", "Bye World 2", Exchange.FILE_NAME, "bye2.txt");
+
+        assertMockEndpointsSatisfied();
+
+        // now start a, which should pickup the file
+        resetMocks();
+        mock.expectedBodiesReceived("Hello World 2");
+        context.startRoute("a");
+
+        assertMockEndpointsSatisfied();
+    }
+
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollTest.java?rev=1353260&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerSharedThreadPollTest.java Sun Jun 24 12:58:00 2012
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.builder.ThreadPoolBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+
+/**
+ *
+ */
+public class FileConsumerSharedThreadPollTest extends ContextTestSupport {
+
+    private ScheduledExecutorService pool;
+    private SimpleRegistry registry = new SimpleRegistry();
+
+    @Override
+    protected void setUp() throws Exception {
+        deleteDirectory("target/a");
+        deleteDirectory("target/b");
+        super.setUp();
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        return new DefaultCamelContext(registry);
+    }
+
+    public void testSharedThreadPool() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(2);
+        // thread thread name should be the same
+        mock.message(0).header("threadName").isEqualTo(mock.message(1).header("threadName"));
+
+        template.sendBodyAndHeader("file:target/a", "Hello World", Exchange.FILE_NAME, "hello.txt");
+        template.sendBodyAndHeader("file:target/b", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // create shared pool and enlist in registry
+                pool = new ThreadPoolBuilder(context).poolSize(1).buildScheduled(this, "MySharedPool");
+                registry.put("myPool", pool);
+
+                from("file:target/a?scheduledExecutorService=#myPool").routeId("a")
+                    .to("direct:shared");
+
+                from("file:target/b?scheduledExecutorService=#myPool").routeId("b")
+                    .to("direct:shared");
+
+                from("direct:shared").routeId("shared")
+                    .convertBodyTo(String.class)
+                    .log("Get ${file:name} using ${threadName}")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            exchange.getIn().setHeader("threadName", Thread.currentThread().getName());
+                        }
+                    })
+                    .to("mock:result");
+            }
+        };
+    }
+}