You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ofbiz.apache.org by ad...@apache.org on 2012/08/18 17:21:14 UTC
svn commit: r1374596 - in
/ofbiz/trunk/framework/service/src/org/ofbiz/service:
ServiceDispatcher.java job/JobManager.java job/JobPoller.java
Author: adrianc
Date: Sat Aug 18 15:21:14 2012
New Revision: 1374596
URL: http://svn.apache.org/viewvc?rev=1374596&view=rev
Log:
Refactored JobPoller.java - the JobPoller instance is now a singleton.
Modified:
ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java
ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java?rev=1374596&r1=1374595&r2=1374596&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/ServiceDispatcher.java Sat Aug 18 15:21:14 2012
@@ -834,8 +834,6 @@ public class ServiceDispatcher {
// shutdown JMS listeners
jlf.closeListeners();
}
- // shutdown the job scheduler
- jm.shutdown();
}
// checks if parameters were passed for authentication
Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1374596&r1=1374595&r2=1374596&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Sat Aug 18 15:21:14 2012
@@ -59,6 +59,10 @@ import com.ibm.icu.util.Calendar;
* {@link #runJob(Job)} method, or schedule a job to be run later by calling the
* {@link #schedule(String, String, String, Map, long, int, int, int, long, int)} method.
* Scheduled jobs are persisted in the JobSandbox entity.
+ * <p>A scheduled job's start time is an approximation - the actual start time will depend
+ * on the job manager/job poller configuration (poll interval) and the load on the server.
+ * Scheduled jobs might be rescheduled if the server is busy. Therefore, applications
+ * requiring a precise job start time should use a different method to schedule the job.</p>
*/
public final class JobManager {
@@ -82,13 +86,14 @@ public final class JobManager {
public static JobManager getInstance(Delegator delegator, boolean enablePoller) {
assertIsRunning();
Assert.notNull("delegator", delegator);
- JobManager jm = JobManager.registeredManagers.get(delegator.getDelegatorName());
+ JobManager jm = registeredManagers.get(delegator.getDelegatorName());
if (jm == null) {
jm = new JobManager(delegator);
- JobManager.registeredManagers.putIfAbsent(delegator.getDelegatorName(), jm);
- jm = JobManager.registeredManagers.get(delegator.getDelegatorName());
+ registeredManagers.putIfAbsent(delegator.getDelegatorName(), jm);
+ jm = registeredManagers.get(delegator.getDelegatorName());
if (enablePoller) {
- jm.enablePoller();
+ jm.reloadCrashedJobs();
+ JobPoller.registerJobManager(jm);
}
}
return jm;
@@ -99,26 +104,14 @@ public final class JobManager {
*/
public static void shutDown() {
isShutDown = true;
- for (JobManager jm : registeredManagers.values()) {
- jm.shutdown();
- }
+ JobPoller.getInstance().stop();
}
private final Delegator delegator;
- private final JobPoller jp;
- private boolean pollerEnabled = false;
+ private boolean crashedJobsReloaded = false;
private JobManager(Delegator delegator) {
this.delegator = delegator;
- jp = new JobPoller(this);
- }
-
- private synchronized void enablePoller() {
- if (!pollerEnabled) {
- pollerEnabled = true;
- reloadCrashedJobs();
- jp.enable();
- }
}
/** Returns the Delegator. */
@@ -138,7 +131,7 @@ public final class JobManager {
* @return List containing a Map of each thread's state.
*/
public Map<String, Object> getPoolState() {
- return jp.getPoolState();
+ return JobPoller.getInstance().getPoolState();
}
/**
@@ -272,7 +265,11 @@ public final class JobManager {
return poll;
}
- private void reloadCrashedJobs() {
+ private synchronized void reloadCrashedJobs() {
+ assertIsRunning();
+ if (crashedJobsReloaded) {
+ return;
+ }
List<GenericValue> crashed = null;
List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_PENDING"),
EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
@@ -317,6 +314,7 @@ public final class JobManager {
if (Debug.infoOn())
Debug.logInfo("No crashed jobs to re-schedule", module);
}
+ crashedJobsReloaded = true;
}
/** Queues a Job to run now.
@@ -326,7 +324,7 @@ public final class JobManager {
public void runJob(Job job) throws JobManagerException {
assertIsRunning();
if (job.isValid()) {
- jp.queueNow(job);
+ JobPoller.getInstance().queueNow(job);
}
}
@@ -541,13 +539,4 @@ public final class JobManager {
throw new JobManagerException(e.getMessage(), e);
}
}
-
- /** Close out the scheduler thread. */
- public void shutdown() {
- Debug.logInfo("Stopping the JobManager...", module);
- registeredManagers.remove(delegator.getDelegatorName(), this);
- jp.stop();
- Debug.logInfo("JobManager stopped.", module);
- }
-
}
Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1374596&r1=1374595&r2=1374596&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Sat Aug 18 15:21:14 2012
@@ -19,10 +19,13 @@
package org.ofbiz.service.job;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
@@ -30,13 +33,14 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.ofbiz.base.util.Assert;
import org.ofbiz.base.util.Debug;
import org.ofbiz.service.config.ServiceConfigUtil;
/**
* Job poller. Queues and runs jobs.
*/
-public final class JobPoller implements Runnable {
+public final class JobPoller {
public static final String module = JobPoller.class.getName();
private static final AtomicInteger created = new AtomicInteger();
@@ -45,76 +49,19 @@ public final class JobPoller implements
private static final int POLL_WAIT = 30000; // Database polling interval - 30 seconds.
private static final int QUEUE_SIZE = 100;
private static final long THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes.
-
- private final JobManager jm;
- private final ThreadPoolExecutor executor;
- private final String name;
- private boolean enabled = false;
+ private static final ConcurrentHashMap<String, JobManager> jobManagers = new ConcurrentHashMap<String, JobManager>();
+ private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(),
+ TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize()), new JobInvokerThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
+ private static final JobPoller instance = new JobPoller();
/**
- * Creates a new JobScheduler
- *
- * @param jm
- * JobManager associated with this scheduler
+ * Returns the <code>JobPoller</code> instance.
*/
- public JobPoller(JobManager jm) {
- this.name = jm.getDelegator().getDelegatorName();
- this.jm = jm;
- this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize()),
- new JobInvokerThreadFactory(this.name), new ThreadPoolExecutor.AbortPolicy());
- }
-
- public synchronized void enable() {
- if (!enabled) {
- enabled = true;
- // start the thread only if polling is enabled
- if (pollEnabled()) {
- // create the poller thread
- Thread thread = new Thread(this, "OFBiz-JobPoller-" + this.name);
- thread.setDaemon(false);
- // start the poller
- thread.start();
- }
- }
+ public static JobPoller getInstance() {
+ return instance;
}
- /**
- * Returns the JobManager
- */
- public JobManager getManager() {
- return jm;
- }
-
- public Map<String, Object> getPoolState() {
- Map<String, Object> poolState = new HashMap<String, Object>();
- poolState.put("pollerName", this.name);
- poolState.put("pollerThreadName", "OFBiz-JobPoller-" + this.name);
- poolState.put("invokerThreadNameFormat", "OFBiz-JobInvoker-" + this.name + "-<SEQ>");
- poolState.put("keepAliveTimeInSeconds", this.executor.getKeepAliveTime(TimeUnit.SECONDS));
- poolState.put("numberOfCoreInvokerThreads", this.executor.getCorePoolSize());
- poolState.put("currentNumberOfInvokerThreads", this.executor.getPoolSize());
- poolState.put("numberOfActiveInvokerThreads", this.executor.getActiveCount());
- poolState.put("maxNumberOfInvokerThreads", this.executor.getMaximumPoolSize());
- poolState.put("greatestNumberOfInvokerThreads", this.executor.getLargestPoolSize());
- poolState.put("numberOfCompletedTasks", this.executor.getCompletedTaskCount());
- BlockingQueue<Runnable> queue = this.executor.getQueue();
- List<Map<String, Object>> taskList = new ArrayList<Map<String, Object>>();
- Map<String, Object> taskInfo = null;
- for (Runnable task : queue) {
- JobInvoker jobInvoker = (JobInvoker) task;
- taskInfo = new HashMap<String, Object>();
- taskInfo.put("id", jobInvoker.getJobId());
- taskInfo.put("name", jobInvoker.getJobName());
- taskInfo.put("serviceName", jobInvoker.getServiceName());
- taskInfo.put("time", jobInvoker.getTime());
- taskInfo.put("runtime", jobInvoker.getCurrentRuntime());
- taskList.add(taskInfo);
- }
- poolState.put("taskList", taskList);
- return poolState;
- }
-
- private long getTTL() {
+ private static long getTTL() {
String threadTTLAttr = ServiceConfigUtil.getElementAttr("thread-pool", "ttl");
if (!threadTTLAttr.isEmpty()) {
try {
@@ -129,7 +76,7 @@ public final class JobPoller implements
return THREAD_TTL;
}
- private int maxThreads() {
+ private static int maxThreads() {
String maxThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "max-threads");
if (!maxThreadsAttr.isEmpty()) {
try {
@@ -144,7 +91,7 @@ public final class JobPoller implements
return MAX_THREADS;
}
- private int minThreads() {
+ private static int minThreads() {
String minThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "min-threads");
if (!minThreadsAttr.isEmpty()) {
try {
@@ -159,14 +106,7 @@ public final class JobPoller implements
return MIN_THREADS;
}
- private boolean pollEnabled() {
- String enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled");
- if (enabled.equalsIgnoreCase("false"))
- return false;
- return true;
- }
-
- private int pollWaitTime() {
+ private static int pollWaitTime() {
String pollIntervalAttr = ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis");
if (!pollIntervalAttr.isEmpty()) {
try {
@@ -181,7 +121,7 @@ public final class JobPoller implements
return POLL_WAIT;
}
- private int queueSize() {
+ private static int queueSize() {
String queueSizeAttr = ServiceConfigUtil.getElementAttr("thread-pool", "jobs");
if (!queueSizeAttr.isEmpty()) {
try {
@@ -197,51 +137,88 @@ public final class JobPoller implements
}
/**
- * Adds a job to the RUN queue.
+ * Register a {@link JobManager} with the job poller.
+ *
+ * @param jm The <code>JobManager</code> to register.
+ * @throws IllegalArgumentException if <code>jm</code> is null
+ */
+ public static void registerJobManager(JobManager jm) {
+ Assert.notNull("jm", jm);
+ jobManagers.putIfAbsent(jm.getDelegator().getDelegatorName(), jm);
+ }
+
+ // -------------------------------------- //
+
+ private final Thread jobManagerPollerThread;
+
+ private JobPoller() {
+ if (pollEnabled()) {
+ jobManagerPollerThread = new Thread(new JobManagerPoller(), "OFBiz-JobPoller");
+ jobManagerPollerThread.setDaemon(false);
+ jobManagerPollerThread.start();
+ } else {
+ jobManagerPollerThread = null;
+ }
+ }
+
+ /**
+ * Returns a <code>Map</code> containing <code>JobPoller</code> statistics.
+ */
+ public Map<String, Object> getPoolState() {
+ Map<String, Object> poolState = new HashMap<String, Object>();
+ poolState.put("keepAliveTimeInSeconds", executor.getKeepAliveTime(TimeUnit.SECONDS));
+ poolState.put("numberOfCoreInvokerThreads", executor.getCorePoolSize());
+ poolState.put("currentNumberOfInvokerThreads", executor.getPoolSize());
+ poolState.put("numberOfActiveInvokerThreads", executor.getActiveCount());
+ poolState.put("maxNumberOfInvokerThreads", executor.getMaximumPoolSize());
+ poolState.put("greatestNumberOfInvokerThreads", executor.getLargestPoolSize());
+ poolState.put("numberOfCompletedTasks", executor.getCompletedTaskCount());
+ BlockingQueue<Runnable> queue = executor.getQueue();
+ List<Map<String, Object>> taskList = new ArrayList<Map<String, Object>>();
+ Map<String, Object> taskInfo = null;
+ for (Runnable task : queue) {
+ JobInvoker jobInvoker = (JobInvoker) task;
+ taskInfo = new HashMap<String, Object>();
+ taskInfo.put("id", jobInvoker.getJobId());
+ taskInfo.put("name", jobInvoker.getJobName());
+ taskInfo.put("serviceName", jobInvoker.getServiceName());
+ taskInfo.put("time", jobInvoker.getTime());
+ taskInfo.put("runtime", jobInvoker.getCurrentRuntime());
+ taskList.add(taskInfo);
+ }
+ poolState.put("taskList", taskList);
+ return poolState;
+ }
+
+ private boolean pollEnabled() {
+ String enabled = ServiceConfigUtil.getElementAttr("thread-pool", "poll-enabled");
+ return !"false".equalsIgnoreCase(enabled);
+ }
+
+ /**
+ * Adds a job to the job queue.
* @throws InvalidJobException if the job is in an invalid state.
* @throws RejectedExecutionException if the poller is stopped.
*/
public void queueNow(Job job) throws InvalidJobException {
job.queue();
try {
- this.executor.execute(new JobInvoker(job));
+ executor.execute(new JobInvoker(job));
} catch (Exception e) {
job.deQueue();
}
}
- public void run() {
- try {
- // wait 30 seconds before the first poll
- Thread.sleep(30000);
- while (!executor.isShutdown()) {
- int remainingCapacity = executor.getQueue().remainingCapacity();
- if (remainingCapacity > 0) {
- List<Job> pollList = jm.poll(remainingCapacity);
- for (Job job : pollList) {
- try {
- queueNow(job);
- } catch (InvalidJobException e) {
- Debug.logError(e, module);
- }
- }
- }
- Thread.sleep(pollWaitTime());
- }
- } catch (InterruptedException e) {
- Debug.logError(e, module);
- stop();
- Thread.currentThread().interrupt();
- }
- Debug.logInfo("JobPoller " + this.name + " thread terminated.", module);
- }
-
/**
- * Stops the JobPoller
+ * Stops the <code>JobPoller</code>. This method is called when OFBiz shuts down.
+ * The <code>JobPoller</code> cannot be restarted.
*/
- void stop() {
- Debug.logInfo("Shutting down thread pool for JobPoller " + this.name, module);
- List<Runnable> queuedJobs = this.executor.shutdownNow();
+ public void stop() {
+ Debug.logInfo("Shutting down JobPoller.", module);
+ if (jobManagerPollerThread != null) {
+ jobManagerPollerThread.interrupt();
+ }
+ List<Runnable> queuedJobs = executor.shutdownNow();
for (Runnable task : queuedJobs) {
try {
Job queuedJob = (Job) task;
@@ -250,19 +227,64 @@ public final class JobPoller implements
Debug.logWarning(e, module);
}
}
- Debug.logInfo("Shutdown completed of thread pool for JobPoller " + this.name, module);
+ Debug.logInfo("JobPoller shutdown completed.", module);
}
+ private static class JobInvokerThreadFactory implements ThreadFactory {
- private class JobInvokerThreadFactory implements ThreadFactory {
- private final String poolName;
-
- public JobInvokerThreadFactory(String poolName) {
- this.poolName = poolName;
+ public Thread newThread(Runnable runnable) {
+ return new Thread(runnable, "OFBiz-JobQueue-" + created.getAndIncrement());
}
+ }
- public Thread newThread(Runnable runnable) {
- return new Thread(runnable, "OFBiz-JobQueue-" + poolName + "-" + created.getAndIncrement());
+ // Polls all registered JobManagers for jobs to queue.
+ private class JobManagerPoller implements Runnable {
+
+ // Do not check for interrupts in this method. The design requires the
+ // thread to complete the job manager poll uninterrupted.
+ public void run() {
+ Debug.logInfo("JobPoller thread started.", module);
+ try {
+ // wait 30 seconds before the first poll
+ Thread.sleep(30000);
+ while (!executor.isShutdown()) {
+ int remainingCapacity = executor.getQueue().remainingCapacity();
+ if (remainingCapacity > 0) {
+ // Build "list of lists"
+ Collection<JobManager> jmCollection = jobManagers.values();
+ List<Iterator<Job>> pollResults = new ArrayList<Iterator<Job>>();
+ for (JobManager jm : jmCollection) {
+ pollResults.add(jm.poll(remainingCapacity).iterator());
+ }
+ // Create queue candidate list from "list of lists"
+ List<Job> queueCandidates = new ArrayList<Job>();
+ boolean addingJobs = true;
+ while (addingJobs) {
+ addingJobs = false;
+ for (Iterator<Job> jobIterator : pollResults) {
+ if (jobIterator.hasNext()) {
+ queueCandidates.add(jobIterator.next());
+ addingJobs = true;
+ }
+ }
+ }
+ // The candidate list might be larger than the queue remaining capacity,
+ // but that is okay - the excess jobs will be dequeued and rescheduled.
+ for (Job job : queueCandidates) {
+ try {
+ queueNow(job);
+ } catch (InvalidJobException e) {
+ Debug.logError(e, module);
+ }
+ }
+ }
+ Thread.sleep(pollWaitTime());
+ }
+ } catch (InterruptedException e) {
+ // Happens when JobPoller shuts down - nothing to do.
+ Thread.currentThread().interrupt();
+ }
+ Debug.logInfo("JobPoller thread stopped.", module);
}
}
}