You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/08/23 18:01:13 UTC
svn commit: r1516912 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/
Author: tabish
Date: Fri Aug 23 16:01:12 2013
New Revision: 1516912
URL: http://svn.apache.org/r1516912
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4683
Make scheduler job dispatching start more deterministic
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java?rev=1516912&r1=1516911&r2=1516912&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java Fri Aug 23 16:01:12 2013
@@ -17,117 +17,161 @@
package org.apache.activemq.broker.scheduler;
import java.util.List;
+
import org.apache.activemq.util.ByteSequence;
public interface JobScheduler {
/**
* @return the name of the scheduler
- * @throws Exception
+ * @throws Exception
+ */
+ String getName() throws Exception;
+
+ /**
+ * Starts dispatch of scheduled Jobs to registered listeners.
+ *
+ * Any listener added after the start dispatch method can miss jobs so its
+ * important to register critical listeners before the start of job dispatching.
+ *
+ * @throws Exception
+ */
+ void startDispatching() throws Exception;
+
+ /**
+ * Stops dispatching of scheduled Jobs to registered listeners.
+ *
+ * @throws Exception
+ */
+ void stopDispatching() throws Exception;
+
+ /**
+ * Add a Job listener
+ *
+ * @param l
+ * @throws Exception
+ */
+ void addListener(JobListener l) throws Exception;
+
+ /**
+ * remove a JobListener
+ *
+ * @param l
+ * @throws Exception
*/
- public abstract String getName() throws Exception;
-/**
- * Add a Job listener
- * @param l
- * @throws Exception
- */
- public abstract void addListener(JobListener l) throws Exception;
-/**
- * remove a JobListener
- * @param l
- * @throws Exception
- */
- public abstract void removeListener(JobListener l) throws Exception;
+ void removeListener(JobListener l) throws Exception;
/**
* Add a job to be scheduled
- * @param jobId a unique identifier for the job
- * @param payload the message to be sent when the job is scheduled
- * @param delay the time in milliseconds before the job will be run
+ *
+ * @param jobId
+ * a unique identifier for the job
+ * @param payload
+ * the message to be sent when the job is scheduled
+ * @param delay
+ * the time in milliseconds before the job will be run
* @throws Exception
*/
- public abstract void schedule(String jobId, ByteSequence payload,long delay) throws Exception;
+ void schedule(String jobId, ByteSequence payload, long delay) throws Exception;
/**
* Add a job to be scheduled
- * @param jobId a unique identifier for the job
- * @param payload the message to be sent when the job is scheduled
- * @param cronEntry - cron entry
+ *
+ * @param jobId
+ * a unique identifier for the job
+ * @param payload
+ * the message to be sent when the job is scheduled
+ * @param cronEntry
+ * - cron entry
* @throws Exception
*/
- public abstract void schedule(String jobId, ByteSequence payload,String cronEntry) throws Exception;
+ void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception;
-
/**
* Add a job to be scheduled
- * @param jobId a unique identifier for the job
- * @param payload the message to be sent when the job is scheduled
- * @param cronEntry - cron entry
- * @param delay time in ms to wait before scheduling
- * @param period the time in milliseconds between successive executions of the Job
- * @param repeat the number of times to execute the job - less than 0 will be repeated forever
+ *
+ * @param jobId
+ * a unique identifier for the job
+ * @param payload
+ * the message to be sent when the job is scheduled
+ * @param cronEntry
+ * - cron entry
+ * @param delay
+ * time in ms to wait before scheduling
+ * @param period
+ * the time in milliseconds between successive executions of the Job
+ * @param repeat
+ * the number of times to execute the job - less than 0 will be repeated forever
* @throws Exception
*/
- public abstract void schedule(String jobId, ByteSequence payload,String cronEntry,long delay, long period, int repeat) throws Exception;
+ void schedule(String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws Exception;
/**
* remove all jobs scheduled to run at this time
+ *
* @param time
- * @throws Exception
+ * @throws Exception
*/
- public abstract void remove(long time) throws Exception;
+ void remove(long time) throws Exception;
/**
* remove a job with the matching jobId
+ *
* @param jobId
- * @throws Exception
+ * @throws Exception
*/
- public abstract void remove(String jobId) throws Exception;
-
+ void remove(String jobId) throws Exception;
+
/**
* remove all the Jobs from the scheduler
+ *
* @throws Exception
*/
- public abstract void removeAllJobs() throws Exception;
-
+ void removeAllJobs() throws Exception;
+
/**
* remove all the Jobs from the scheduler that are due between the start and finish times
- * @param start time in milliseconds
- * @param finish time in milliseconds
+ *
+ * @param start
+ * time in milliseconds
+ * @param finish
+ * time in milliseconds
* @throws Exception
*/
- public abstract void removeAllJobs(long start,long finish) throws Exception;
-
+ void removeAllJobs(long start, long finish) throws Exception;
-
/**
* Get the next time jobs will be fired
+ *
* @return the time in milliseconds
- * @throws Exception
+ * @throws Exception
*/
- public abstract long getNextScheduleTime() throws Exception;
-
+ long getNextScheduleTime() throws Exception;
+
/**
* Get all the jobs scheduled to run next
+ *
* @return a list of jobs that will be scheduled next
* @throws Exception
*/
- public abstract List<Job> getNextScheduleJobs() throws Exception;
-
+ List<Job> getNextScheduleJobs() throws Exception;
+
/**
* Get all the outstanding Jobs
- * @return a list of all jobs
- * @throws Exception
+ *
+ * @return a list of all jobs
+ * @throws Exception
*/
- public abstract List<Job> getAllJobs() throws Exception;
-
+ List<Job> getAllJobs() throws Exception;
+
/**
* Get all outstanding jobs due to run between start and finish
+ *
* @param start
* @param finish
* @return a list of jobs
* @throws Exception
*/
- public abstract List<Job> getAllJobs(long start,long finish)throws Exception;
+ List<Job> getAllJobs(long start, long finish) throws Exception;
}
\ No newline at end of file
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java?rev=1516912&r1=1516911&r2=1516912&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java Fri Aug 23 16:01:12 2013
@@ -18,118 +18,147 @@ package org.apache.activemq.broker.sched
import java.util.Collections;
import java.util.List;
+
import org.apache.activemq.util.ByteSequence;
public class JobSchedulerFacade implements JobScheduler {
private final SchedulerBroker broker;
-
- JobSchedulerFacade(SchedulerBroker broker){
- this.broker=broker;
+
+ JobSchedulerFacade(SchedulerBroker broker) {
+ this.broker = broker;
}
+
+ @Override
public void addListener(JobListener l) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
+ if (js != null) {
js.addListener(l);
}
}
+ @Override
public List<Job> getAllJobs() throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
+ if (js != null) {
return js.getAllJobs();
}
return Collections.emptyList();
}
+ @Override
public List<Job> getAllJobs(long start, long finish) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
- return js.getAllJobs(start,finish);
+ if (js != null) {
+ return js.getAllJobs(start, finish);
}
return Collections.emptyList();
}
+ @Override
public String getName() throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
+ if (js != null) {
return js.getName();
}
return "";
}
+ @Override
public List<Job> getNextScheduleJobs() throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
+ if (js != null) {
return js.getNextScheduleJobs();
}
return Collections.emptyList();
}
+ @Override
public long getNextScheduleTime() throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
+ if (js != null) {
return js.getNextScheduleTime();
}
return 0;
}
+ @Override
public void remove(long time) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
+ if (js != null) {
js.remove(time);
}
}
+ @Override
public void remove(String jobId) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
+ if (js != null) {
js.remove(jobId);
}
-
}
+ @Override
public void removeAllJobs() throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
+ if (js != null) {
js.removeAllJobs();
}
}
+ @Override
public void removeAllJobs(long start, long finish) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
- js.removeAllJobs(start,finish);
+ if (js != null) {
+ js.removeAllJobs(start, finish);
}
-
}
+ @Override
public void removeListener(JobListener l) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
+ if (js != null) {
js.removeListener(l);
}
-
}
+ @Override
public void schedule(String jobId, ByteSequence payload, long delay) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
+ if (js != null) {
js.schedule(jobId, payload, delay);
}
}
- public void schedule(String jobId, ByteSequence payload,String cronEntry, long start, long period, int repeat) throws Exception {
+ @Override
+ public void schedule(String jobId, ByteSequence payload, String cronEntry, long start, long period, int repeat) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
- js.schedule(jobId, payload, cronEntry,start,period,repeat);
+ if (js != null) {
+ js.schedule(jobId, payload, cronEntry, start, period, repeat);
}
}
+
+ @Override
public void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception {
JobScheduler js = this.broker.getInternalScheduler();
- if (js !=null) {
+ if (js != null) {
js.schedule(jobId, payload, cronEntry);
}
-
+ }
+
+ @Override
+ public void startDispatching() throws Exception {
+ JobScheduler js = this.broker.getInternalScheduler();
+ if (js != null) {
+ js.startDispatching();
+ }
+ }
+
+ @Override
+ public void stopDispatching() throws Exception {
+ JobScheduler js = this.broker.getInternalScheduler();
+ if (js != null) {
+ js.stopDispatching();
+ }
}
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java?rev=1516912&r1=1516911&r2=1516912&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java Fri Aug 23 16:01:12 2013
@@ -16,14 +16,16 @@
*/
package org.apache.activemq.broker.scheduler;
-import org.apache.activemq.Service;
-
import java.io.File;
+import org.apache.activemq.Service;
+
/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ * A Job Scheduler Store interface use to manage delay processing of Messaging
+ * related jobs.
*/
public interface JobSchedulerStore extends Service {
+
File getDirectory();
void setDirectory(File directory);
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=1516912&r1=1516911&r2=1516912&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Fri Aug 23 16:01:12 2013
@@ -292,6 +292,7 @@ public class SchedulerBroker extends Bro
if (this.scheduler == null) {
this.scheduler = store.getJobScheduler("JMS");
this.scheduler.addListener(this);
+ this.scheduler.startDispatching();
}
return this.scheduler;
}
Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java?rev=1516912&r1=1516911&r2=1516912&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java Fri Aug 23 16:01:12 2013
@@ -51,7 +51,7 @@ class JobSchedulerImpl extends ServiceSu
private String name;
BTreeIndex<Long, List<JobLocation>> index;
private Thread thread;
- private final Object listenerLock = new Object();
+ private final AtomicBoolean started = new AtomicBoolean(false);
private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final ScheduleTime scheduleTime = new ScheduleTime();
@@ -82,9 +82,6 @@ class JobSchedulerImpl extends ServiceSu
@Override
public void addListener(JobListener l) {
this.jobListeners.add(l);
- synchronized (this.listenerLock) {
- this.listenerLock.notify();
- }
}
/*
@@ -480,19 +477,6 @@ class JobSchedulerImpl extends ServiceSu
protected void mainLoop() {
while (this.running.get()) {
-
- // Can't start pumping messages until a listener is added otherwise we'd discard messages
- // without any warning.
- synchronized (listenerLock) {
- while (this.running.get() && this.jobListeners.isEmpty()) {
- try {
- LOG.debug("Scheduled Message dispatch paused while awaiting a Job Listener");
- this.listenerLock.wait();
- } catch (InterruptedException e) {
- }
- }
- }
-
this.scheduleTime.clearNewJob();
try {
// peek the next job
@@ -584,24 +568,39 @@ class JobSchedulerImpl extends ServiceSu
}
@Override
+ public void startDispatching() throws Exception {
+ if (!this.running.get()) {
+ return;
+ }
+
+ if (started.compareAndSet(false, true)) {
+ this.thread = new Thread(this, "JobScheduler:" + this.name);
+ this.thread.setDaemon(true);
+ this.thread.start();
+ }
+ }
+
+ @Override
+ public void stopDispatching() throws Exception {
+ if (started.compareAndSet(true, false)) {
+ this.scheduleTime.wakeup();
+ Thread t = this.thread;
+ this.thread = null;
+ if (t != null) {
+ t.join(1000);
+ }
+ }
+ }
+
+ @Override
protected void doStart() throws Exception {
this.running.set(true);
- synchronized (this.listenerLock) {
- this.listenerLock.notify();
- }
- this.thread = new Thread(this, "JobScheduler:" + this.name);
- this.thread.setDaemon(true);
- this.thread.start();
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
this.running.set(false);
- this.scheduleTime.wakeup();
- Thread t = this.thread;
- if (t != null) {
- t.join(1000);
- }
+ stopDispatching();
}
long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException {