You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@ofbiz.apache.org by ad...@apache.org on 2012/08/18 17:21:14 UTC

svn commit: r1374596 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service: ServiceDispatcher.java job/JobManager.java job/JobPoller.java

Author: adrianc
Date: Sat Aug 18 15:21:14 2012
New Revision: 1374596

URL: http://svn.apache.org/viewvc?rev=1374596&view=rev
Log:
Refactored JobPoller.java - the JobPoller instance is now a singleton.


Modified:
    ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java?rev=1374596&r1=1374595&r2=1374596&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java Sat Aug 18 15:21:14 2012
@@ -834,8 +834,6 @@ public class ServiceDispatcher {
             // shutdown JMS listeners
             jlf.closeListeners();
         }
-        // shutdown the job scheduler
-        jm.shutdown();
     }
 
     // checks if parameters were passed for authentication

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1374596&r1=1374595&r2=1374596&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Sat Aug 18 15:21:14 2012
@@ -59,6 +59,10 @@ import com.ibm.icu.util.Calendar;
  * {@link #runJob(Job)} method, or schedule a job to be run later by calling the
  * {@link #schedule(String, String, String, Map, long, int, int, int, long, int)} method.
  * Scheduled jobs are persisted in the JobSandbox entity.
+ * <p>A scheduled job's start time is an approximation - the actual start time will depend
+ * on the job manager/job poller configuration (poll interval) and the load on the server.
+ * Scheduled jobs might be rescheduled if the server is busy. Therefore, applications
+ * requiring a precise job start time should use a different method to schedule the job.</p>
  */
 public final class JobManager {
 
@@ -82,13 +86,14 @@ public final class JobManager {
     public static JobManager getInstance(Delegator delegator, boolean enablePoller) {
         assertIsRunning();
         Assert.notNull("delegator", delegator);
-        JobManager jm = JobManager.registeredManagers.get(delegator.getDelegatorName());
+        JobManager jm = registeredManagers.get(delegator.getDelegatorName());
         if (jm == null) {
             jm = new JobManager(delegator);
-            JobManager.registeredManagers.putIfAbsent(delegator.getDelegatorName(), jm);
-            jm = JobManager.registeredManagers.get(delegator.getDelegatorName());
+            registeredManagers.putIfAbsent(delegator.getDelegatorName(), jm);
+            jm = registeredManagers.get(delegator.getDelegatorName());
             if (enablePoller) {
-                jm.enablePoller();
+                jm.reloadCrashedJobs();
+                JobPoller.registerJobManager(jm);
             }
         }
         return jm;
@@ -99,26 +104,14 @@ public final class JobManager {
      */
     public static void shutDown() {
         isShutDown = true;
-        for (JobManager jm : registeredManagers.values()) {
-            jm.shutdown();
-        }
+        JobPoller.getInstance().stop();
     }
 
     private final Delegator delegator;
-    private final JobPoller jp;
-    private boolean pollerEnabled = false;
+    private boolean crashedJobsReloaded = false;
 
     private JobManager(Delegator delegator) {
         this.delegator = delegator;
-        jp = new JobPoller(this);
-    }
-
-    private synchronized void enablePoller() {
-        if (!pollerEnabled) {
-            pollerEnabled = true;
-            reloadCrashedJobs();
-            jp.enable();
-        }
     }
 
     /** Returns the Delegator. */
@@ -138,7 +131,7 @@ public final class JobManager {
      * @return List containing a Map of each thread's state.
      */
     public Map<String, Object> getPoolState() {
-        return jp.getPoolState();
+        return JobPoller.getInstance().getPoolState();
     }
 
     /**
@@ -272,7 +265,11 @@ public final class JobManager {
         return poll;
     }
 
-    private void reloadCrashedJobs() {
+    private synchronized void reloadCrashedJobs() {
+        assertIsRunning();
+        if (crashedJobsReloaded) {
+            return;
+        }
         List<GenericValue> crashed = null;
         List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_PENDING"),
                 EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
@@ -317,6 +314,7 @@ public final class JobManager {
             if (Debug.infoOn())
                 Debug.logInfo("No crashed jobs to re-schedule", module);
         }
+        crashedJobsReloaded = true;
     }
 
     /** Queues a Job to run now.
@@ -326,7 +324,7 @@ public final class JobManager {
     public void runJob(Job job) throws JobManagerException {
         assertIsRunning();
         if (job.isValid()) {
-            jp.queueNow(job);
+            JobPoller.getInstance().queueNow(job);
         }
     }
 
@@ -541,13 +539,4 @@ public final class JobManager {
             throw new JobManagerException(e.getMessage(), e);
         }
     }
-
-    /** Close out the scheduler thread. */
-    public void shutdown() {
-        Debug.logInfo("Stopping the JobManager...", module);
-        registeredManagers.remove(delegator.getDelegatorName(), this);
-        jp.stop();
-        Debug.logInfo("JobManager stopped.", module);
-    }
-
 }

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1374596&r1=1374595&r2=1374596&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Sat Aug 18 15:21:14 2012
@@ -19,10 +19,13 @@
 package org.ofbiz.service.job;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
@@ -30,13 +33,14 @@ import java.util.concurrent.ThreadPoolEx
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.ofbiz.base.util.Assert;
 import org.ofbiz.base.util.Debug;
 import org.ofbiz.service.config.ServiceConfigUtil;
 
 /**
  * Job poller. Queues and runs jobs.
  */
-public final class JobPoller implements Runnable {
+public final class JobPoller {
 
     public static final String module = JobPoller.class.getName();
     private static final AtomicInteger created = new AtomicInteger();
@@ -45,76 +49,19 @@ public final class JobPoller implements 
     private static final int POLL_WAIT = 30000; // Database polling interval - 30 seconds.
     private static final int QUEUE_SIZE = 100;
     private static final long THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes.
-
-    private final JobManager jm;
-    private final ThreadPoolExecutor executor;
-    private final String name;
-    private boolean enabled = false;
+    private static final ConcurrentHashMap<String, JobManager> jobManagers = new ConcurrentHashMap<String, JobManager>();
+    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(),
+            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize()), new JobInvokerThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
+    private static final JobPoller instance = new JobPoller();
 
     /**
-     * Creates a new JobScheduler
-     * 
-     * @param jm
-     *            JobManager associated with this scheduler
+     * Returns the <code>JobPoller</code> instance.
      */
-    public JobPoller(JobManager jm) {
-        this.name = jm.getDelegator().getDelegatorName();
-        this.jm = jm;
-        this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize()),
-                new JobInvokerThreadFactory(this.name), new ThreadPoolExecutor.AbortPolicy());
-    }
-
-    public synchronized void enable() {
-        if (!enabled) {
-            enabled = true;
-            // start the thread only if polling is enabled
-            if (pollEnabled()) {
-                // create the poller thread
-                Thread thread = new Thread(this, "OFBiz-JobPoller-" + this.name);
-                thread.setDaemon(false);
-                // start the poller
-                thread.start();
-            }
-        }
+    public static JobPoller getInstance() {
+        return instance;
     }
 
-    /**
-     * Returns the JobManager
-     */
-    public JobManager getManager() {
-        return jm;
-    }
-
-    public Map<String, Object> getPoolState() {
-        Map<String, Object> poolState = new HashMap<String, Object>();
-        poolState.put("pollerName", this.name);
-        poolState.put("pollerThreadName", "OFBiz-JobPoller-" + this.name);
-        poolState.put("invokerThreadNameFormat", "OFBiz-JobInvoker-" + this.name + "-<SEQ>");
-        poolState.put("keepAliveTimeInSeconds", this.executor.getKeepAliveTime(TimeUnit.SECONDS));
-        poolState.put("numberOfCoreInvokerThreads", this.executor.getCorePoolSize());
-        poolState.put("currentNumberOfInvokerThreads", this.executor.getPoolSize());
-        poolState.put("numberOfActiveInvokerThreads", this.executor.getActiveCount());
-        poolState.put("maxNumberOfInvokerThreads", this.executor.getMaximumPoolSize());
-        poolState.put("greatestNumberOfInvokerThreads", this.executor.getLargestPoolSize());
-        poolState.put("numberOfCompletedTasks", this.executor.getCompletedTaskCount());
-        BlockingQueue<Runnable> queue = this.executor.getQueue();
-        List<Map<String, Object>> taskList = new ArrayList<Map<String, Object>>();
-        Map<String, Object> taskInfo = null;
-        for (Runnable task : queue) {
-            JobInvoker jobInvoker = (JobInvoker) task;
-            taskInfo = new HashMap<String, Object>();
-            taskInfo.put("id", jobInvoker.getJobId());
-            taskInfo.put("name", jobInvoker.getJobName());
-            taskInfo.put("serviceName", jobInvoker.getServiceName());
-            taskInfo.put("time", jobInvoker.getTime());
-            taskInfo.put("runtime", jobInvoker.getCurrentRuntime());
-            taskList.add(taskInfo);
-        }
-        poolState.put("taskList", taskList);
-        return poolState;
-    }
-
-    private long getTTL() {
+    private static long getTTL() {
         String threadTTLAttr = ServiceConfigUtil.getElementAttr("thread-pool", "ttl");
         if (!threadTTLAttr.isEmpty()) {
             try {
@@ -129,7 +76,7 @@ public final class JobPoller implements 
         return THREAD_TTL;
     }
 
-    private int maxThreads() {
+    private static int maxThreads() {
         String maxThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "max-threads");
         if (!maxThreadsAttr.isEmpty()) {
             try {
@@ -144,7 +91,7 @@ public final class JobPoller implements 
         return MAX_THREADS;
     }
 
-    private int minThreads() {
+    private static int minThreads() {
         String minThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "min-threads");
         if (!minThreadsAttr.isEmpty()) {
             try {
@@ -159,14 +106,7 @@ public final class JobPoller implements 
         return MIN_THREADS;
     }
 
-    private boolean pollEnabled() {
-        String enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled");
-        if (enabled.equalsIgnoreCase("false"))
-            return false;
-        return true;
-    }
-
-    private int pollWaitTime() {
+    private static int pollWaitTime() {
         String pollIntervalAttr = ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis");
         if (!pollIntervalAttr.isEmpty()) {
             try {
@@ -181,7 +121,7 @@ public final class JobPoller implements 
         return POLL_WAIT;
     }
 
-    private int queueSize() {
+    private static int queueSize() {
         String queueSizeAttr = ServiceConfigUtil.getElementAttr("thread-pool", "jobs");
         if (!queueSizeAttr.isEmpty()) {
             try {
@@ -197,51 +137,88 @@ public final class JobPoller implements 
     }
 
     /**
-     * Adds a job to the RUN queue.
+     * Register a {@link JobManager} with the job poller.
+     * 
+     * @param jm The <code>JobManager</code> to register.
+     * @throws IllegalArgumentException if <code>jm</code> is null
+     */
+    public static void registerJobManager(JobManager jm) {
+        Assert.notNull("jm", jm);
+        jobManagers.putIfAbsent(jm.getDelegator().getDelegatorName(), jm);
+    }
+
+    // -------------------------------------- //
+
+    private final Thread jobManagerPollerThread;
+
+    private JobPoller() {
+        if (pollEnabled()) {
+            jobManagerPollerThread = new Thread(new JobManagerPoller(), "OFBiz-JobPoller");
+            jobManagerPollerThread.setDaemon(false);
+            jobManagerPollerThread.start();
+        } else {
+            jobManagerPollerThread = null;
+        }
+    }
+
+    /**
+     * Returns a <code>Map</code> containing <code>JobPoller</code> statistics.
+     */
+    public Map<String, Object> getPoolState() {
+        Map<String, Object> poolState = new HashMap<String, Object>();
+        poolState.put("keepAliveTimeInSeconds", executor.getKeepAliveTime(TimeUnit.SECONDS));
+        poolState.put("numberOfCoreInvokerThreads", executor.getCorePoolSize());
+        poolState.put("currentNumberOfInvokerThreads", executor.getPoolSize());
+        poolState.put("numberOfActiveInvokerThreads", executor.getActiveCount());
+        poolState.put("maxNumberOfInvokerThreads", executor.getMaximumPoolSize());
+        poolState.put("greatestNumberOfInvokerThreads", executor.getLargestPoolSize());
+        poolState.put("numberOfCompletedTasks", executor.getCompletedTaskCount());
+        BlockingQueue<Runnable> queue = executor.getQueue();
+        List<Map<String, Object>> taskList = new ArrayList<Map<String, Object>>();
+        Map<String, Object> taskInfo = null;
+        for (Runnable task : queue) {
+            JobInvoker jobInvoker = (JobInvoker) task;
+            taskInfo = new HashMap<String, Object>();
+            taskInfo.put("id", jobInvoker.getJobId());
+            taskInfo.put("name", jobInvoker.getJobName());
+            taskInfo.put("serviceName", jobInvoker.getServiceName());
+            taskInfo.put("time", jobInvoker.getTime());
+            taskInfo.put("runtime", jobInvoker.getCurrentRuntime());
+            taskList.add(taskInfo);
+        }
+        poolState.put("taskList", taskList);
+        return poolState;
+    }
+
+    private boolean pollEnabled() {
+        String enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled");
+        return !"false".equalsIgnoreCase(enabled);
+    }
+
+    /**
+     * Adds a job to the job queue.
      * @throws InvalidJobException if the job is in an invalid state.
      * @throws RejectedExecutionException if the poller is stopped.
      */
     public void queueNow(Job job) throws InvalidJobException {
         job.queue();
         try {
-            this.executor.execute(new JobInvoker(job));
+            executor.execute(new JobInvoker(job));
         } catch (Exception e) {
             job.deQueue();
         }
     }
 
-    public void run() {
-        try {
-            // wait 30 seconds before the first poll
-            Thread.sleep(30000);
-            while (!executor.isShutdown()) {
-                int remainingCapacity = executor.getQueue().remainingCapacity();
-                if (remainingCapacity > 0) {
-                    List<Job> pollList = jm.poll(remainingCapacity);
-                    for (Job job : pollList) {
-                        try {
-                            queueNow(job);
-                        } catch (InvalidJobException e) {
-                            Debug.logError(e, module);
-                        }
-                    }
-                }
-                Thread.sleep(pollWaitTime());
-            }
-        } catch (InterruptedException e) {
-            Debug.logError(e, module);
-            stop();
-            Thread.currentThread().interrupt();
-        }
-        Debug.logInfo("JobPoller " + this.name + " thread terminated.", module);
-    }
-
     /**
-     * Stops the JobPoller
+     * Stops the <code>JobPoller</code>. This method is called when OFBiz shuts down.
+     * The <code>JobPoller</code> cannot be restarted.
      */
-    void stop() {
-        Debug.logInfo("Shutting down thread pool for JobPoller " + this.name, module);
-        List<Runnable> queuedJobs = this.executor.shutdownNow();
+    public void stop() {
+        Debug.logInfo("Shutting down JobPoller.", module);
+        if (jobManagerPollerThread != null) {
+            jobManagerPollerThread.interrupt();
+        }
+        List<Runnable> queuedJobs = executor.shutdownNow();
         for (Runnable task : queuedJobs) {
             try {
                 Job queuedJob = (Job) task;
@@ -250,19 +227,64 @@ public final class JobPoller implements 
                 Debug.logWarning(e, module);
             }
         }
-        Debug.logInfo("Shutdown completed of thread pool for JobPoller " + this.name, module);
+        Debug.logInfo("JobPoller shutdown completed.", module);
     }
 
+    private static class JobInvokerThreadFactory implements ThreadFactory {
 
-    private class JobInvokerThreadFactory implements ThreadFactory {
-        private final String poolName;
-
-        public JobInvokerThreadFactory(String poolName) {
-            this.poolName = poolName;
+        public Thread newThread(Runnable runnable) {
+            return new Thread(runnable, "OFBiz-JobQueue-" + created.getAndIncrement());
         }
+    }
 
-        public Thread newThread(Runnable runnable) {
-            return new Thread(runnable, "OFBiz-JobQueue-" + poolName + "-" + created.getAndIncrement());
+    // Polls all registered JobManagers for jobs to queue.
+    private class JobManagerPoller implements Runnable {
+
+        // Do not check for interrupts in this method. The design requires the
+        // thread to complete the job manager poll uninterrupted.
+        public void run() {
+            Debug.logInfo("JobPoller thread started.", module);
+            try {
+                // wait 30 seconds before the first poll
+                Thread.sleep(30000);
+                while (!executor.isShutdown()) {
+                    int remainingCapacity = executor.getQueue().remainingCapacity();
+                    if (remainingCapacity > 0) {
+                        // Build "list of lists"
+                        Collection<JobManager> jmCollection = jobManagers.values();
+                        List<Iterator<Job>> pollResults = new ArrayList<Iterator<Job>>();
+                        for (JobManager jm : jmCollection) {
+                            pollResults.add(jm.poll(remainingCapacity).iterator());
+                        }
+                        // Create queue candidate list from "list of lists"
+                        List<Job> queueCandidates = new ArrayList<Job>();
+                        boolean addingJobs = true;
+                        while (addingJobs) {
+                            addingJobs = false;
+                            for (Iterator<Job> jobIterator : pollResults) {
+                                if (jobIterator.hasNext()) {
+                                    queueCandidates.add(jobIterator.next());
+                                    addingJobs = true;
+                                }
+                            }
+                        }
+                        // The candidate list might be larger than the queue remaining capacity,
+                        // but that is okay - the excess jobs will be dequeued and rescheduled.
+                        for (Job job : queueCandidates) {
+                            try {
+                                queueNow(job);
+                            } catch (InvalidJobException e) {
+                                Debug.logError(e, module);
+                            }
+                        }
+                    }
+                    Thread.sleep(pollWaitTime());
+                }
+            } catch (InterruptedException e) {
+                // Happens when JobPoller shuts down - nothing to do.
+                Thread.currentThread().interrupt();
+            }
+            Debug.logInfo("JobPoller thread stopped.", module);
         }
     }
 }