You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/08/23 18:01:13 UTC

svn commit: r1516912 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/ activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/

Author: tabish
Date: Fri Aug 23 16:01:12 2013
New Revision: 1516912

URL: http://svn.apache.org/r1516912
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4683

Make scheduler job dispatching start more deterministic

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java?rev=1516912&r1=1516911&r2=1516912&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java Fri Aug 23 16:01:12 2013
@@ -17,117 +17,161 @@
 package org.apache.activemq.broker.scheduler;
 
 import java.util.List;
+
 import org.apache.activemq.util.ByteSequence;
 
 public interface JobScheduler {
 
     /**
      * @return the name of the scheduler
-     * @throws Exception 
+     * @throws Exception
+     */
+    String getName() throws Exception;
+
+    /**
+     * Starts dispatch of scheduled Jobs to registered listeners.
+     *
+     * Any listener added after dispatching has started can miss jobs, so it's
+     * important to register critical listeners before the start of job dispatching.
+     *
+     * @throws Exception
+     */
+    void startDispatching() throws Exception;
+
+    /**
+     * Stops dispatching of scheduled Jobs to registered listeners.
+     *
+     * @throws Exception
+     */
+    void stopDispatching() throws Exception;
+
+    /**
+     * Add a JobListener
+     *
+     * @param l
+     * @throws Exception
+     */
+    void addListener(JobListener l) throws Exception;
+
+    /**
+     * Remove a JobListener
+     *
+     * @param l
+     * @throws Exception
      */
-    public abstract String getName() throws Exception;
-/**
- * Add a Job listener
- * @param l
- * @throws Exception 
- */
-    public abstract void addListener(JobListener l) throws Exception;
-/**
- * remove a JobListener
- * @param l
- * @throws Exception 
- */
-    public abstract void removeListener(JobListener l) throws Exception;
+    void removeListener(JobListener l) throws Exception;
 
     /**
      * Add a job to be scheduled
-     * @param jobId a unique identifier for the job
-     * @param payload the message to be sent when the job is scheduled
-     * @param delay the time in milliseconds before the job will be run
+     *
+     * @param jobId
+     *            a unique identifier for the job
+     * @param payload
+     *            the message to be sent when the job is scheduled
+     * @param delay
+     *            the time in milliseconds before the job will be run
      * @throws Exception
      */
-    public abstract void schedule(String jobId, ByteSequence payload,long delay) throws Exception;
+    void schedule(String jobId, ByteSequence payload, long delay) throws Exception;
 
     /**
      * Add a job to be scheduled
-     * @param jobId a unique identifier for the job
-     * @param payload the message to be sent when the job is scheduled
-     * @param cronEntry - cron entry
+     *
+     * @param jobId
+     *            a unique identifier for the job
+     * @param payload
+     *            the message to be sent when the job is scheduled
+     * @param cronEntry
+     *            - cron entry
      * @throws Exception
      */
-    public abstract void schedule(String jobId, ByteSequence payload,String cronEntry) throws Exception;
+    void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception;
 
-    
     /**
      * Add a job to be scheduled
-     * @param jobId a unique identifier for the job
-     * @param payload the message to be sent when the job is scheduled
-     * @param cronEntry - cron entry
-     * @param delay time in ms to wait before scheduling
-     * @param period the time in milliseconds between successive executions of the Job
-     * @param repeat the number of times to execute the job - less than 0 will be repeated forever
+     *
+     * @param jobId
+     *            a unique identifier for the job
+     * @param payload
+     *            the message to be sent when the job is scheduled
+     * @param cronEntry
+     *            - cron entry
+     * @param delay
+     *            time in ms to wait before scheduling
+     * @param period
+     *            the time in milliseconds between successive executions of the Job
+     * @param repeat
+     *            the number of times to execute the job - less than 0 will be repeated forever
      * @throws Exception
      */
-    public abstract void schedule(String jobId, ByteSequence payload,String cronEntry,long delay, long period, int repeat) throws Exception;
+    void schedule(String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws Exception;
 
     /**
      * remove all jobs scheduled to run at this time
+     *
      * @param time
-     * @throws Exception 
+     * @throws Exception
      */
-    public abstract void remove(long time) throws  Exception;
+    void remove(long time) throws Exception;
 
     /**
      * remove a job with the matching jobId
+     *
      * @param jobId
-     * @throws Exception 
+     * @throws Exception
      */
-    public abstract void remove(String jobId) throws  Exception;
-    
+    void remove(String jobId) throws Exception;
+
     /**
      * remove all the Jobs from the scheduler
+     *
      * @throws Exception
      */
-    public abstract void removeAllJobs() throws Exception;
-    
+    void removeAllJobs() throws Exception;
+
     /**
      * remove all the Jobs from the scheduler that are due between the start and finish times
-     * @param start time in milliseconds
-     * @param finish time in milliseconds
+     *
+     * @param start
+     *            time in milliseconds
+     * @param finish
+     *            time in milliseconds
      * @throws Exception
      */
-    public abstract void removeAllJobs(long start,long finish) throws Exception;
-    
+    void removeAllJobs(long start, long finish) throws Exception;
 
-    
     /**
      * Get the next time jobs will be fired
+     *
      * @return the time in milliseconds
-     * @throws Exception 
+     * @throws Exception
      */
-    public abstract long getNextScheduleTime() throws Exception;
-    
+    long getNextScheduleTime() throws Exception;
+
     /**
      * Get all the jobs scheduled to run next
+     *
      * @return a list of jobs that will be scheduled next
      * @throws Exception
      */
-    public abstract List<Job> getNextScheduleJobs() throws Exception;
-    
+    List<Job> getNextScheduleJobs() throws Exception;
+
     /**
      * Get all the outstanding Jobs
-     * @return a  list of all jobs
-     * @throws Exception 
+     *
+     * @return a list of all jobs
+     * @throws Exception
      */
-    public abstract List<Job> getAllJobs() throws Exception;
-    
+    List<Job> getAllJobs() throws Exception;
+
     /**
      * Get all outstanding jobs due to run between start and finish
+     *
      * @param start
      * @param finish
      * @return a list of jobs
      * @throws Exception
      */
-    public abstract List<Job> getAllJobs(long start,long finish)throws Exception;
+    List<Job> getAllJobs(long start, long finish) throws Exception;
 
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java?rev=1516912&r1=1516911&r2=1516912&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java Fri Aug 23 16:01:12 2013
@@ -18,118 +18,147 @@ package org.apache.activemq.broker.sched
 
 import java.util.Collections;
 import java.util.List;
+
 import org.apache.activemq.util.ByteSequence;
 
 public class JobSchedulerFacade implements JobScheduler {
 
     private final SchedulerBroker broker;
-    
-    JobSchedulerFacade(SchedulerBroker broker){
-        this.broker=broker;
+
+    JobSchedulerFacade(SchedulerBroker broker) {
+        this.broker = broker;
     }
+
+    @Override
     public void addListener(JobListener l) throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
+        if (js != null) {
             js.addListener(l);
         }
     }
 
+    @Override
     public List<Job> getAllJobs() throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
+        if (js != null) {
             return js.getAllJobs();
         }
         return Collections.emptyList();
     }
 
+    @Override
     public List<Job> getAllJobs(long start, long finish) throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
-            return js.getAllJobs(start,finish);
+        if (js != null) {
+            return js.getAllJobs(start, finish);
         }
         return Collections.emptyList();
     }
 
+    @Override
     public String getName() throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
+        if (js != null) {
             return js.getName();
         }
         return "";
     }
 
+    @Override
     public List<Job> getNextScheduleJobs() throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
+        if (js != null) {
             return js.getNextScheduleJobs();
         }
         return Collections.emptyList();
     }
 
+    @Override
     public long getNextScheduleTime() throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
+        if (js != null) {
             return js.getNextScheduleTime();
         }
         return 0;
     }
 
+    @Override
     public void remove(long time) throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
+        if (js != null) {
             js.remove(time);
         }
     }
 
+    @Override
     public void remove(String jobId) throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
+        if (js != null) {
             js.remove(jobId);
         }
-
     }
 
+    @Override
     public void removeAllJobs() throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
+        if (js != null) {
             js.removeAllJobs();
         }
     }
 
+    @Override
     public void removeAllJobs(long start, long finish) throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
-            js.removeAllJobs(start,finish);
+        if (js != null) {
+            js.removeAllJobs(start, finish);
         }
-
     }
 
+    @Override
     public void removeListener(JobListener l) throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
+        if (js != null) {
             js.removeListener(l);
         }
-
     }
 
+    @Override
     public void schedule(String jobId, ByteSequence payload, long delay) throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
+        if (js != null) {
             js.schedule(jobId, payload, delay);
         }
     }
 
-    public void schedule(String jobId, ByteSequence payload,String cronEntry, long start, long period, int repeat) throws Exception {
+    @Override
+    public void schedule(String jobId, ByteSequence payload, String cronEntry, long start, long period, int repeat) throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
-            js.schedule(jobId, payload, cronEntry,start,period,repeat);
+        if (js != null) {
+            js.schedule(jobId, payload, cronEntry, start, period, repeat);
         }
     }
+
+    @Override
     public void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception {
         JobScheduler js = this.broker.getInternalScheduler();
-        if (js !=null) {
+        if (js != null) {
             js.schedule(jobId, payload, cronEntry);
         }
-        
+    }
+
+    @Override
+    public void startDispatching() throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js != null) {
+            js.startDispatching();
+        }
+    }
+
+    @Override
+    public void stopDispatching() throws Exception {
+        JobScheduler js = this.broker.getInternalScheduler();
+        if (js != null) {
+            js.stopDispatching();
+        }
     }
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java?rev=1516912&r1=1516911&r2=1516912&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java Fri Aug 23 16:01:12 2013
@@ -16,14 +16,16 @@
  */
 package org.apache.activemq.broker.scheduler;
 
-import org.apache.activemq.Service;
-
 import java.io.File;
 
+import org.apache.activemq.Service;
+
 /**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ * A Job Scheduler Store interface used to manage delayed processing of
+ * messaging-related jobs.
  */
 public interface JobSchedulerStore extends Service {
+
     File getDirectory();
 
     void setDirectory(File directory);

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=1516912&r1=1516911&r2=1516912&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Fri Aug 23 16:01:12 2013
@@ -292,6 +292,7 @@ public class SchedulerBroker extends Bro
             if (this.scheduler == null) {
                 this.scheduler = store.getJobScheduler("JMS");
                 this.scheduler.addListener(this);
+                this.scheduler.startDispatching();
             }
             return this.scheduler;
         }

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java?rev=1516912&r1=1516911&r2=1516912&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java (original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java Fri Aug 23 16:01:12 2013
@@ -51,7 +51,7 @@ class JobSchedulerImpl extends ServiceSu
     private String name;
     BTreeIndex<Long, List<JobLocation>> index;
     private Thread thread;
-    private final Object listenerLock = new Object();
+    private final AtomicBoolean started = new AtomicBoolean(false);
     private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
     private static final IdGenerator ID_GENERATOR = new IdGenerator();
     private final ScheduleTime scheduleTime = new ScheduleTime();
@@ -82,9 +82,6 @@ class JobSchedulerImpl extends ServiceSu
     @Override
     public void addListener(JobListener l) {
         this.jobListeners.add(l);
-        synchronized (this.listenerLock) {
-            this.listenerLock.notify();
-        }
     }
 
     /*
@@ -480,19 +477,6 @@ class JobSchedulerImpl extends ServiceSu
 
     protected void mainLoop() {
         while (this.running.get()) {
-
-            // Can't start pumping messages until a listener is added otherwise we'd discard messages
-            // without any warning.
-            synchronized (listenerLock) {
-                while (this.running.get() && this.jobListeners.isEmpty()) {
-                    try {
-                        LOG.debug("Scheduled Message dispatch paused while awaiting a Job Listener");
-                        this.listenerLock.wait();
-                    } catch (InterruptedException e) {
-                    }
-                }
-            }
-
             this.scheduleTime.clearNewJob();
             try {
                 // peek the next job
@@ -584,24 +568,39 @@ class JobSchedulerImpl extends ServiceSu
     }
 
     @Override
+    public void startDispatching() throws Exception {
+        if (!this.running.get()) {
+            return;
+        }
+
+        if (started.compareAndSet(false, true)) {
+            this.thread = new Thread(this, "JobScheduler:" + this.name);
+            this.thread.setDaemon(true);
+            this.thread.start();
+        }
+    }
+
+    @Override
+    public void stopDispatching() throws Exception {
+        if (started.compareAndSet(true, false)) {
+            this.scheduleTime.wakeup();
+            Thread t = this.thread;
+            this.thread = null;
+            if (t != null) {
+                t.join(1000);
+            }
+        }
+    }
+
+    @Override
     protected void doStart() throws Exception {
         this.running.set(true);
-        synchronized (this.listenerLock) {
-            this.listenerLock.notify();
-        }
-        this.thread = new Thread(this, "JobScheduler:" + this.name);
-        this.thread.setDaemon(true);
-        this.thread.start();
     }
 
     @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
         this.running.set(false);
-        this.scheduleTime.wakeup();
-        Thread t = this.thread;
-        if (t != null) {
-            t.join(1000);
-        }
+        stopDispatching();
     }
 
     long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException {