You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@turbine.apache.org by tv...@apache.org on 2022/07/08 07:58:26 UTC
[turbine-core] 01/03: Modernize AbstractSchedulerService
This is an automated email from the ASF dual-hosted git repository.
tv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/turbine-core.git
commit 69ff4fcc20ad0f8134729e727f62259579accbe4
Author: Thomas Vandahl <tv...@apache.org>
AuthorDate: Fri Jul 8 09:51:22 2022 +0200
Modernize AbstractSchedulerService
---
.../schedule/AbstractSchedulerService.java | 110 ++++++++++-----------
1 file changed, 51 insertions(+), 59 deletions(-)
diff --git a/src/java/org/apache/turbine/services/schedule/AbstractSchedulerService.java b/src/java/org/apache/turbine/services/schedule/AbstractSchedulerService.java
index 53276937..4729a3d7 100644
--- a/src/java/org/apache/turbine/services/schedule/AbstractSchedulerService.java
+++ b/src/java/org/apache/turbine/services/schedule/AbstractSchedulerService.java
@@ -20,7 +20,11 @@ package org.apache.turbine.services.schedule;
*/
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.turbine.services.InitializationException;
@@ -43,22 +47,13 @@ public abstract class AbstractSchedulerService extends TurbineBaseService implem
protected JobQueue<JobEntry> scheduleQueue = null;
/** Current status of the scheduler */
- protected boolean enabled = false;
+ private AtomicBoolean enabled = new AtomicBoolean(false);
- /** The main loop for starting jobs. */
- protected MainLoop mainLoop;
+ /** The housekeeping thread. */
+ protected Thread houseKeepingThread;
- /** The thread used to process commands. */
- protected Thread thread;
-
- /**
- * Creates a new instance.
- */
- public AbstractSchedulerService()
- {
- mainLoop = null;
- thread = null;
- }
+ /** The thread pool used to process jobs. */
+ protected ExecutorService threadPool;
/**
* Initializes the SchedulerService.
@@ -73,7 +68,11 @@ public abstract class AbstractSchedulerService extends TurbineBaseService implem
{
setEnabled(getConfiguration().getBoolean("enabled", true));
scheduleQueue = new JobQueue<>();
- mainLoop = new MainLoop();
+ threadPool = Executors.newCachedThreadPool(
+ new BasicThreadFactory.Builder()
+ .namingPattern("Turbine-ScheduledJob-")
+ .daemon(true)
+ .build());
@SuppressWarnings("unchecked") // Why is this cast necessary?
List<JobEntry> jobs = (List<JobEntry>)loadJobs();
@@ -108,6 +107,8 @@ public abstract class AbstractSchedulerService extends TurbineBaseService implem
{
getThread().interrupt();
}
+
+ threadPool.shutdownNow();
}
/**
@@ -183,7 +184,7 @@ public abstract class AbstractSchedulerService extends TurbineBaseService implem
*/
protected void setEnabled(boolean enabled)
{
- this.enabled = enabled;
+ this.enabled.set(enabled);
}
/**
@@ -194,7 +195,7 @@ public abstract class AbstractSchedulerService extends TurbineBaseService implem
@Override
public boolean isEnabled()
{
- return enabled;
+ return enabled.get();
}
/**
@@ -219,7 +220,7 @@ public abstract class AbstractSchedulerService extends TurbineBaseService implem
{
thread.interrupt();
}
- enabled = false;
+ setEnabled(false);
}
/**
@@ -231,7 +232,7 @@ public abstract class AbstractSchedulerService extends TurbineBaseService implem
*/
public synchronized Thread getThread()
{
- return thread;
+ return houseKeepingThread;
}
/**
@@ -239,7 +240,7 @@ public abstract class AbstractSchedulerService extends TurbineBaseService implem
*/
protected synchronized void clearThread()
{
- thread = null;
+ houseKeepingThread = null;
}
/**
@@ -250,21 +251,21 @@ public abstract class AbstractSchedulerService extends TurbineBaseService implem
*/
public synchronized void restart()
{
- if (enabled)
+ if (enabled.get())
{
log.info("Starting job scheduler");
- if (thread == null)
+ if (houseKeepingThread == null)
{
// Create the the housekeeping thread of the scheduler. It will
// wait for the time when the next task needs to be started,
// and then launch a worker thread to execute the task.
- thread = new Thread(mainLoop, ScheduleService.SERVICE_NAME);
+ houseKeepingThread = new Thread(() -> houseKeeping(), ScheduleService.SERVICE_NAME);
// Indicate that this is a system thread. JVM will quit only
// when there are no more enabled user threads. Settings threads
// spawned internally by Turbine as daemons allows commandline
// applications using Turbine to terminate in an orderly manner.
- thread.setDaemon(true);
- thread.start();
+ houseKeepingThread.setDaemon(true);
+ houseKeepingThread.start();
}
else
{
@@ -324,48 +325,39 @@ public abstract class AbstractSchedulerService extends TurbineBaseService implem
}
/**
- * Inner class. This is isolated in its own Runnable class just so that the
- * main class need not implement Runnable, which would allow others to
- * directly invoke run, which is not supported.
+ * Create the the housekeeping thread of the scheduler. It will
+ * wait for the time when the next task needs to be started,
+ * and then launch a worker thread to execute the task.
*/
- protected class MainLoop implements Runnable
+ private void houseKeeping()
{
- /**
- * Method to run the class.
- */
- @Override
- public void run()
+ String taskName = null;
+ try
{
- String taskName = null;
- try
+ while (enabled.get())
{
- while (enabled)
+ JobEntry je = nextJob();
+ if (je != null)
{
- JobEntry je = nextJob();
- if (je != null)
- {
- taskName = je.getTask();
+ taskName = je.getTask();
- // Start the thread to run the job.
- Runnable wt = new WorkerThread(je);
- Thread helper = new Thread(wt);
- helper.start();
- }
- else
- {
- break;
- }
+ // Get a thread to run the job.
+ threadPool.execute(new WorkerThread(je));
+ }
+ else
+ {
+ break;
}
}
- catch (Exception e)
- {
- log.error("Error running a Scheduled Job: {}", taskName, e);
- enabled = false;
- }
- finally
- {
- clearThread();
- }
+ }
+ catch (Exception e)
+ {
+ log.error("Error running a Scheduled Job: {}", taskName, e);
+ setEnabled(false);
+ }
+ finally
+ {
+ clearThread();
}
}
}