You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/16 08:47:57 UTC

git commit: CAMEL-6631: Introduce ScheduledPollConsumerScheduler SPI to plug in different schedulers for poll consumer components such as file/ftp etc.

Updated Branches:
  refs/heads/master 546c3fd4a -> 2f75b2f1b


CAMEL-6631: Introduce ScheduledPollConsumerScheduler SPI to plug in different schedulers for poll consumer components such as file/ftp etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2f75b2f1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2f75b2f1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2f75b2f1

Branch: refs/heads/master
Commit: 2f75b2f1bd2f23a864c2dc3c31b1badfd40cc743
Parents: 546c3fd
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Aug 16 08:47:47 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 16 08:47:47 2013 +0200

----------------------------------------------------------------------
 .../mbean/ManagedSchedulePollConsumerMBean.java |  5 ++
 .../DefaultScheduledPollConsumerScheduler.java  |  2 +-
 .../mbean/ManagedScheduledPollConsumer.java     |  5 ++
 .../SpringScheduledPollConsumerScheduler.java   |  6 +--
 .../util/CamelThreadPoolTaskScheduler.java      | 53 ++++++++++++++++++++
 5 files changed, 66 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2f75b2f1/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
index 831748a..3179ee6 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.api.management.mbean;
 
+import java.util.Map;
+
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedOperation;
 
@@ -51,4 +53,7 @@ public interface ManagedSchedulePollConsumerMBean extends ManagedConsumerMBean {
     @ManagedOperation(description = "Starts the scheduler")
     void startScheduler();
 
+    @ManagedAttribute(description = "Scheduler classname")
+    String getSchedulerClassName();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/2f75b2f1/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
index 325667c..13ed058 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultScheduledPollConsumerScheduler.java
@@ -143,7 +143,7 @@ public class DefaultScheduledPollConsumerScheduler extends org.apache.camel.supp
         if (scheduledExecutorService == null) {
             // we only need one thread in the pool to schedule this task
             this.scheduledExecutorService = getCamelContext().getExecutorServiceManager()
-                    .newScheduledThreadPool(this, consumer.getEndpoint().getEndpointUri(), 1);
+                    .newSingleThreadScheduledExecutor(consumer, consumer.getEndpoint().getEndpointUri());
             // and we should shutdown the thread pool when no longer needed
             this.shutdownExecutor = true;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/2f75b2f1/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java
index cddfbb9..ad341a3 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java
@@ -78,4 +78,9 @@ public class ManagedScheduledPollConsumer extends ManagedConsumer implements Man
     public void startScheduler() {
         getConsumer().startScheduler();
     }
+
+    public String getSchedulerClassName() {
+        return getConsumer().getScheduler().getClass().getName();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/2f75b2f1/components/camel-spring/src/main/java/org/apache/camel/spring/pollingconsumer/SpringScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/pollingconsumer/SpringScheduledPollConsumerScheduler.java b/components/camel-spring/src/main/java/org/apache/camel/spring/pollingconsumer/SpringScheduledPollConsumerScheduler.java
index 50aa3be..a9f1e55 100644
--- a/components/camel-spring/src/main/java/org/apache/camel/spring/pollingconsumer/SpringScheduledPollConsumerScheduler.java
+++ b/components/camel-spring/src/main/java/org/apache/camel/spring/pollingconsumer/SpringScheduledPollConsumerScheduler.java
@@ -22,9 +22,9 @@ import java.util.concurrent.ScheduledFuture;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.spi.ScheduledPollConsumerScheduler;
+import org.apache.camel.spring.util.CamelThreadPoolTaskScheduler;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.concurrent.CamelThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -116,9 +116,7 @@ public class SpringScheduledPollConsumerScheduler extends ServiceSupport impleme
         trigger = new CronTrigger(getCron(), getTimeZone());
 
         if (taskScheduler == null) {
-            taskScheduler = new ThreadPoolTaskScheduler();
-            CamelThreadFactory tf = new CamelThreadFactory(getCamelContext().getExecutorServiceManager().getThreadNamePattern(), "SpringScheduledPollConsumerSchedulerTask", true);
-            taskScheduler.setThreadFactory(tf);
+            taskScheduler = new CamelThreadPoolTaskScheduler(getCamelContext(), consumer, consumer.getEndpoint().getEndpointUri());
             taskScheduler.afterPropertiesSet();
             destroyTaskScheduler = true;
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/2f75b2f1/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelThreadPoolTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelThreadPoolTaskScheduler.java b/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelThreadPoolTaskScheduler.java
new file mode 100644
index 0000000..5067e3d
--- /dev/null
+++ b/components/camel-spring/src/main/java/org/apache/camel/spring/util/CamelThreadPoolTaskScheduler.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.util;
+
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.camel.CamelContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+
+/**
+ * A Camel extension of Spring's {@link ThreadPoolTaskScheduler} which uses the
+ * {@link org.apache.camel.spi.ExecutorServiceManager} to create and destroy the
+ * thread pool, which ensures the thread pool is also managed and consistent with
+ * other usages of thread pools in Camel.
+ */
+public class CamelThreadPoolTaskScheduler extends ThreadPoolTaskScheduler {
+
+    private final CamelContext camelContext;
+    private final Object source;
+    private final String name;
+
+    public CamelThreadPoolTaskScheduler(CamelContext camelContext, Object source, String name) {
+        this.camelContext = camelContext;
+        this.source = source;
+        this.name = name;
+    }
+
+    @Override
+    protected ScheduledExecutorService createExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
+        return camelContext.getExecutorServiceManager().newScheduledThreadPool(source, name, poolSize);
+    }
+
+    @Override
+    public void shutdown() {
+        camelContext.getExecutorServiceManager().shutdownNow(getScheduledExecutor());
+    }
+}