You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/16 23:10:04 UTC

[16/16] activemq git commit: https://issues.apache.org/jira/browse/AMQ-3758

https://issues.apache.org/jira/browse/AMQ-3758

Refactor the scheduler store into a more KahaDB style store that can
recover from various problems like missing journal files or corruption
as well as rebuild its index when needed.  Move the scheduler store into
a more configurable style that allows for users to plug in their own
implementations.  Store update from legacy versions is automatic.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fc244f48
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fc244f48
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fc244f48

Branch: refs/heads/activemq-5.10.x
Commit: fc244f48e48596c668a7d9dc3b84c26e60693823
Parents: db669e4
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jul 7 12:28:11 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Tue Dec 16 16:11:09 2014 -0500

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   |   25 +-
 .../activemq/broker/jmx/JobSchedulerView.java   |   56 +-
 .../broker/jmx/JobSchedulerViewMBean.java       |  113 +-
 .../apache/activemq/broker/scheduler/Job.java   |   23 +-
 .../activemq/broker/scheduler/JobListener.java  |   16 +-
 .../activemq/broker/scheduler/JobScheduler.java |   33 +-
 .../broker/scheduler/JobSchedulerFacade.java    |    6 +
 .../broker/scheduler/JobSchedulerStore.java     |   43 +
 .../activemq/broker/scheduler/JobSupport.java   |    5 +-
 .../activemq/store/PersistenceAdapter.java      |  119 +-
 .../store/memory/MemoryPersistenceAdapter.java  |   36 +-
 .../java/org/apache/activemq/util/IOHelper.java |   68 +-
 .../store/jdbc/JDBCPersistenceAdapter.java      |    7 +
 .../journal/JournalPersistenceAdapter.java      |   71 +-
 .../store/kahadb/AbstractKahaDBMetaData.java    |   57 +
 .../store/kahadb/AbstractKahaDBStore.java       |  745 ++++++++++++
 .../activemq/store/kahadb/KahaDBMetaData.java   |  135 +++
 .../store/kahadb/KahaDBPersistenceAdapter.java  |   15 +-
 .../activemq/store/kahadb/KahaDBStore.java      |   55 +-
 .../kahadb/MultiKahaDBPersistenceAdapter.java   |   56 +-
 .../kahadb/MultiKahaDBTransactionStore.java     |   18 +-
 .../activemq/store/kahadb/TempKahaDBStore.java  |  138 ++-
 .../apache/activemq/store/kahadb/Visitor.java   |   20 +
 .../store/kahadb/scheduler/JobImpl.java         |   21 +-
 .../store/kahadb/scheduler/JobLocation.java     |   77 +-
 .../scheduler/JobLocationsMarshaller.java       |   53 +
 .../kahadb/scheduler/JobSchedulerImpl.java      |  837 ++++++++------
 .../scheduler/JobSchedulerKahaDBMetaData.java   |  246 ++++
 .../kahadb/scheduler/JobSchedulerStoreImpl.java | 1076 +++++++++++++-----
 .../scheduler/UnknownStoreVersionException.java |   24 +
 .../kahadb/scheduler/legacy/LegacyJobImpl.java  |   72 ++
 .../scheduler/legacy/LegacyJobLocation.java     |  296 +++++
 .../legacy/LegacyJobSchedulerImpl.java          |  222 ++++
 .../legacy/LegacyJobSchedulerStoreImpl.java     |  378 ++++++
 .../scheduler/legacy/LegacyStoreReplayer.java   |  155 +++
 .../src/main/proto/journal-data.proto           |   61 +
 .../apache/activemq/leveldb/LevelDBStore.scala  |    5 +
 .../leveldb/replicated/ProxyLevelDBStore.scala  |    5 +
 .../JobSchedulerBrokerShutdownTest.java         |    1 +
 .../JobSchedulerJmxManagementTests.java         |  155 +++
 .../scheduler/JobSchedulerManagementTest.java   |   84 +-
 .../JobSchedulerStoreCheckpointTest.java        |  125 ++
 .../broker/scheduler/JobSchedulerStoreTest.java |   46 +-
 .../broker/scheduler/JobSchedulerTest.java      |   36 +
 .../scheduler/JobSchedulerTestSupport.java      |  112 ++
 .../KahaDBSchedulerIndexRebuildTest.java        |  179 +++
 .../KahaDBSchedulerMissingJournalLogsTest.java  |  204 ++++
 .../scheduler/SchedulerDBVersionTest.java       |  164 +++
 .../src/test/resources/log4j.properties         |    1 +
 .../activemq/store/schedulerDB/legacy/db-1.log  |  Bin 0 -> 524288 bytes
 .../store/schedulerDB/legacy/scheduleDB.data    |  Bin 0 -> 20480 bytes
 .../store/schedulerDB/legacy/scheduleDB.redo    |  Bin 0 -> 16408 bytes
 52 files changed, 5584 insertions(+), 911 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 00d4abd..5becec2 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -1861,6 +1861,23 @@ public class BrokerService implements Service {
 
             try {
                 PersistenceAdapter pa = getPersistenceAdapter();
+                if (pa != null) {
+                    this.jobSchedulerStore = pa.createJobSchedulerStore();
+                    jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
+                    configureService(jobSchedulerStore);
+                    jobSchedulerStore.start();
+                    return this.jobSchedulerStore;
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } catch (UnsupportedOperationException ex) {
+                // It's ok if the store doesn't implement a scheduler.
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            try {
+                PersistenceAdapter pa = getPersistenceAdapter();
                 if (pa != null && pa instanceof JobSchedulerStore) {
                     this.jobSchedulerStore = (JobSchedulerStore) pa;
                     configureService(jobSchedulerStore);
@@ -1870,9 +1887,13 @@ public class BrokerService implements Service {
                 throw new RuntimeException(e);
             }
 
+            // Load the KahaDB store as a last resort, this only works if KahaDB is
+            // included at runtime, otherwise this will fail.  User should disable
+            // scheduler support if this fails.
             try {
-                String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
-                jobSchedulerStore = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
+                String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
+                PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
+                jobSchedulerStore = adaptor.createJobSchedulerStore();
                 jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
                 configureService(jobSchedulerStore);
                 jobSchedulerStore.start();

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
index 9e5a1fb..2118a96 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
@@ -16,23 +16,39 @@
  */
 package org.apache.activemq.broker.jmx;
 
+import java.util.List;
+
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.scheduler.Job;
 import org.apache.activemq.broker.scheduler.JobScheduler;
 import org.apache.activemq.broker.scheduler.JobSupport;
 
-import javax.management.openmbean.*;
-import java.io.IOException;
-import java.util.List;
-
+/**
+ * MBean object that can be used to manage a single instance of a JobScheduler.  The object
+ * provides methods for querying for jobs and removing some or all of the jobs that are
+ * scheduled in the managed store.
+ */
 public class JobSchedulerView implements JobSchedulerViewMBean {
 
     private final JobScheduler jobScheduler;
 
+    /**
+     * Creates a new instance of the JobScheduler management MBean.
+     *
+     * @param jobScheduler
+     *        The scheduler instance to manage.
+     */
     public JobSchedulerView(JobScheduler jobScheduler) {
         this.jobScheduler = jobScheduler;
     }
 
+    @Override
     public TabularData getAllJobs() throws Exception {
         OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
         CompositeType ct = factory.getCompositeType();
@@ -45,6 +61,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
         return rc;
     }
 
+    @Override
     public TabularData getAllJobs(String startTime, String finishTime) throws Exception {
         OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
         CompositeType ct = factory.getCompositeType();
@@ -59,6 +76,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
         return rc;
     }
 
+    @Override
     public TabularData getNextScheduleJobs() throws Exception {
         OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
         CompositeType ct = factory.getCompositeType();
@@ -71,31 +89,51 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
         return rc;
     }
 
+    @Override
     public String getNextScheduleTime() throws Exception {
         long time = this.jobScheduler.getNextScheduleTime();
         return JobSupport.getDateTime(time);
     }
 
+    @Override
     public void removeAllJobs() throws Exception {
         this.jobScheduler.removeAllJobs();
-
     }
 
+    @Override
     public void removeAllJobs(String startTime, String finishTime) throws Exception {
         long start = JobSupport.getDataTime(startTime);
         long finish = JobSupport.getDataTime(finishTime);
         this.jobScheduler.removeAllJobs(start, finish);
+    }
 
+    @Override
+    public void removeAllJobsAtScheduledTime(String time) throws Exception {
+        long removeAtTime = JobSupport.getDataTime(time);
+        this.jobScheduler.remove(removeAtTime);
     }
 
+    @Override
+    public void removeJobAtScheduledTime(String time) throws Exception {
+        removeAllJobsAtScheduledTime(time);
+    }
+
+    @Override
     public void removeJob(String jobId) throws Exception {
         this.jobScheduler.remove(jobId);
-
     }
 
-    public void removeJobAtScheduledTime(String time) throws IOException {
-        // TODO Auto-generated method stub
+    @Override
+    public int getExecutionCount(String jobId) throws Exception {
+        int result = 0;
 
-    }
+        List<Job> jobs = this.jobScheduler.getAllJobs();
+        for (Job job : jobs) {
+            if (job.getJobId().equals(jobId)) {
+                result = job.getExecutionCount();
+            }
+        }
 
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
index f5745ea..76a7926 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
@@ -18,76 +18,125 @@ package org.apache.activemq.broker.jmx;
 
 import javax.management.openmbean.TabularData;
 
-
-
 public interface JobSchedulerViewMBean {
+
     /**
-     * remove all jobs scheduled to run at this time
+     * Remove all jobs scheduled to run at this time.  If there are no jobs scheduled
+     * at the given time this methods returns without making any modifications to the
+     * scheduler store.
+     *
      * @param time
-     * @throws Exception
+     *        the string formated time that should be used to remove jobs.
+     *
+     * @throws Exception if an error occurs while performing the remove.
+     *
+     * @deprecated use removeAllJobsAtScheduledTime instead as it is more explicit about what
+     *             the method is actually doing.
      */
+    @Deprecated
     @MBeanInfo("remove jobs with matching execution time")
     public abstract void removeJobAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception;
 
     /**
-     * remove a job with the matching jobId
+     * Remove all jobs scheduled to run at this time.  If there are no jobs scheduled
+     * at the given time this methods returns without making any modifications to the
+     * scheduler store.
+     *
+     * @param time
+     *        the string formated time that should be used to remove jobs.
+     *
+     * @throws Exception if an error occurs while performing the remove.
+     */
+    @MBeanInfo("remove jobs with matching execution time")
+    public abstract void removeAllJobsAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception;
+
+    /**
+     * Remove a job with the matching jobId.  If the method does not find a matching job
+     * then it returns without throwing an error or making any modifications to the job
+     * scheduler store.
+     *
      * @param jobId
-     * @throws Exception
+     *        the Job Id to remove from the scheduler store.
+     *
+     * @throws Exception if an error occurs while attempting to remove the Job.
      */
     @MBeanInfo("remove jobs with matching jobId")
     public abstract void removeJob(@MBeanInfo("jobId")String jobId) throws Exception;
-    
+
     /**
-     * remove all the Jobs from the scheduler
-     * @throws Exception
+     * Remove all the Jobs from the scheduler,
+     *
+     * @throws Exception if an error occurs while purging the store.
      */
     @MBeanInfo("remove all scheduled jobs")
     public abstract void removeAllJobs() throws Exception;
-    
+
     /**
-     * remove all the Jobs from the scheduler that are due between the start and finish times
-     * @param start time 
-     * @param finish time
-     * @throws Exception
+     * Remove all the Jobs from the scheduler that are due between the start and finish times.
+     *
+     * @param start
+     *        the starting time to remove jobs from.
+     * @param finish
+     *        the finish time for the remove operation.
+     *
+     * @throws Exception if an error occurs while attempting to remove the jobs.
      */
     @MBeanInfo("remove all scheduled jobs between time ranges ")
     public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish) throws Exception;
-    
 
-    
     /**
-     * Get the next time jobs will be fired
-     * @return the time in milliseconds
-     * @throws Exception 
+     * Get the next time jobs will be fired from this scheduler store.
+     *
+     * @return the time in milliseconds of the next job to execute.
+     *
+     * @throws Exception if an error occurs while accessing the store.
      */
     @MBeanInfo("get the next time a job is due to be scheduled ")
     public abstract String getNextScheduleTime() throws Exception;
-    
+
+    /**
+     * Gets the number of times a scheduled Job has been executed.
+     *
+     * @return the total number of time a scheduled job has executed.
+     *
+     * @throws Exception if an error occurs while querying for the Job.
+     */
+    @MBeanInfo("get the next time a job is due to be scheduled ")
+    public abstract int getExecutionCount(@MBeanInfo("jobId")String jobId) throws Exception;
+
     /**
-     * Get all the jobs scheduled to run next
+     * Get all the jobs scheduled to run next.
+     *
      * @return a list of jobs that will be scheduled next
-     * @throws Exception
+     *
+     * @throws Exception if an error occurs while reading the scheduler store.
      */
     @MBeanInfo("get the next job(s) to be scheduled. Not HTML friendly ")
     public abstract TabularData getNextScheduleJobs() throws Exception;
-    
-    /**
-     * Get all the outstanding Jobs
-     * @return a  table of all jobs
-     * @throws Exception
 
+    /**
+     * Get all the outstanding Jobs that are scheduled in this scheduler store.
+     *
+     * @return a table of all jobs in this scheduler store.
+     *
+     * @throws Exception if an error occurs while reading the store.
      */
     @MBeanInfo("get the scheduled Jobs in the Store. Not HTML friendly ")
     public abstract TabularData getAllJobs() throws Exception;
-    
+
     /**
-     * Get all outstanding jobs due to run between start and finish
+     * Get all outstanding jobs due to run between start and finish time range.
+     *
      * @param start
+     *        the starting time range to query the store for jobs.
      * @param finish
-     * @return a table of jobs in the range
-     * @throws Exception
-
+     *        the ending time of this query for scheduled jobs.
+     *
+     * @return a table of jobs in the range given.
+     *
+     * @throws Exception if an error occurs while querying the scheduler store.
      */
     @MBeanInfo("get the scheduled Jobs in the Store within the time range. Not HTML friendly ")
     public abstract TabularData getAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)throws Exception;
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
index 7b28a5b..047fe23 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
@@ -16,7 +16,12 @@
  */
 package org.apache.activemq.broker.scheduler;
 
-
+/**
+ * Interface for a scheduled Job object.
+ *
+ * Each Job is identified by a unique Job Id which can be used to reference the Job
+ * in the Job Scheduler store for updates or removal.
+ */
 public interface Job {
 
     /**
@@ -38,11 +43,12 @@ public interface Job {
      * @return the Delay
      */
     public abstract long getDelay();
+
     /**
      * @return the period
      */
     public abstract long getPeriod();
-    
+
     /**
      * @return the cron entry
      */
@@ -52,17 +58,24 @@ public interface Job {
      * @return the payload
      */
     public abstract byte[] getPayload();
-    
+
     /**
      * Get the start time as a Date time string
      * @return the date time
      */
     public String getStartTime();
-    
+
     /**
-     * Get the time the job is next due to execute 
+     * Get the time the job is next due to execute
      * @return the date time
      */
     public String getNextExecutionTime();
 
+    /**
+     * Gets the total number of times this job has executed.
+     *
+     * @returns the number of times this job has been executed.
+     */
+    public int getExecutionCount();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
index c53d9c6..a453595 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
@@ -18,13 +18,21 @@ package org.apache.activemq.broker.scheduler;
 
 import org.apache.activemq.util.ByteSequence;
 
+/**
+ * Job event listener interface. Provides event points for Job related events
+ * such as job ready events.
+ */
 public interface JobListener {
-    
+
     /**
-     * A Job that has been scheduled is now ready 
-     * @param id
+     * A Job that has been scheduled is now ready to be fired.  The Job is passed
+     * in its raw byte form and must be un-marshaled before being delivered.
+     *
+     * @param jobId
+     *        The unique Job Id of the Job that is ready to fire.
      * @param job
+     *        The job that is now ready, delivered in byte form.
      */
-    public void scheduledJob(String id,ByteSequence job);
+    public void scheduledJob(String id, ByteSequence job);
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
index 2e96eae..e951861 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
@@ -46,20 +46,25 @@ public interface JobScheduler {
     void stopDispatching() throws Exception;
 
     /**
-     * Add a Job listener
+     * Add a Job listener which will receive events related to scheduled jobs.
+     *
+     * @param listener
+     *      The job listener to add.
      *
-     * @param l
      * @throws Exception
      */
-    void addListener(JobListener l) throws Exception;
+    void addListener(JobListener listener) throws Exception;
 
     /**
-     * remove a JobListener
+     * remove a JobListener that was previously registered.  If the given listener is not in
+     * the registry this method has no effect.
+     *
+     * @param listener
+     *      The listener that should be removed from the listener registry.
      *
-     * @param l
      * @throws Exception
      */
-    void removeListener(JobListener l) throws Exception;
+    void removeListener(JobListener listener) throws Exception;
 
     /**
      * Add a job to be scheduled
@@ -70,7 +75,8 @@ public interface JobScheduler {
      *            the message to be sent when the job is scheduled
      * @param delay
      *            the time in milliseconds before the job will be run
-     * @throws Exception
+     *
+     * @throws Exception if an error occurs while scheduling the Job.
      */
     void schedule(String jobId, ByteSequence payload, long delay) throws Exception;
 
@@ -82,8 +88,9 @@ public interface JobScheduler {
      * @param payload
      *            the message to be sent when the job is scheduled
      * @param cronEntry
-     *            - cron entry
-     * @throws Exception
+     *            The cron entry to use to schedule this job.
+     *
+     * @throws Exception if an error occurs while scheduling the Job.
      */
     void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception;
 
@@ -95,7 +102,7 @@ public interface JobScheduler {
      * @param payload
      *            the message to be sent when the job is scheduled
      * @param cronEntry
-     *            - cron entry
+     *            cron entry
      * @param delay
      *            time in ms to wait before scheduling
      * @param period
@@ -110,6 +117,8 @@ public interface JobScheduler {
      * remove all jobs scheduled to run at this time
      *
      * @param time
+     *      The UTC time to use to remove a batch of scheduled Jobs.
+     *
      * @throws Exception
      */
     void remove(long time) throws Exception;
@@ -118,7 +127,9 @@ public interface JobScheduler {
      * remove a job with the matching jobId
      *
      * @param jobId
-     * @throws Exception
+     *      The unique Job Id to search for and remove from the scheduled set of jobs.
+     *
+     * @throws Exception if an error occurs while removing the Job.
      */
     void remove(String jobId) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
index d46d04a..24a216a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
@@ -21,6 +21,12 @@ import java.util.List;
 
 import org.apache.activemq.util.ByteSequence;
 
+/**
+ * A wrapper for instances of the JobScheduler interface that ensures that methods
+ * provides safe and sane return values and can deal with null values being passed
+ * in etc.  Provides a measure of safety when using unknown implementations of the
+ * JobSchedulerStore which might not always do the right thing.
+ */
 public class JobSchedulerFacade implements JobScheduler {
 
     private final SchedulerBroker broker;

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
index 3cbc367..c6863c7 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
@@ -26,13 +26,56 @@ import org.apache.activemq.Service;
  */
 public interface JobSchedulerStore extends Service {
 
+    /**
+     * Gets the location where the Job Scheduler will write the persistent data used
+     * to preserve and recover scheduled Jobs.
+     *
+     * If the scheduler implementation does not utilize a file system based store this
+     * method returns null.
+     *
+     * @return the directory where persistent store data is written.
+     */
     File getDirectory();
 
+    /**
+     * Sets the directory where persistent store data will be written.  This method
+     * must be called before the scheduler store is started to have any effect.
+     *
+     * @param directory
+     *      The directory where the job scheduler store is to be located.
+     */
     void setDirectory(File directory);
 
+    /**
+     * The size of the current store on disk if the store utilizes a disk based store
+     * mechanism.
+     *
+     * @return the current store size on disk.
+     */
     long size();
 
+    /**
+     * Returns the JobScheduler instance identified by the given name.
+     *
+     * @param name
+     *        the name of the JobScheduler instance to lookup.
+     *
+     * @return the named JobScheduler or null if none exists with the given name.
+     *
+     * @throws Exception if an error occurs while loading the named scheduler.
+     */
     JobScheduler getJobScheduler(String name) throws Exception;
 
+    /**
+     * Removes the named JobScheduler if it exists, purging all scheduled messages
+     * assigned to it.
+     *
+     * @param name
+     *        the name of the scheduler instance to remove.
+     *
+     * @return true if there was a scheduler with the given name to remove.
+     *
+     * @throws Exception if an error occurs while removing the scheduler.
+     */
     boolean removeJobScheduler(String name) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
index 6b78d77..fc5b8dd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
@@ -20,7 +20,11 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 
+/**
+ * A class to provide common Job Scheduler related methods.
+ */
 public class JobSupport {
+
     public static String getDateTime(long value) {
         DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
         Date date = new Date(value);
@@ -32,5 +36,4 @@ public class JobSupport {
          Date date = dfm.parse(value);
          return date.getTime();
      }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
index 31efd32..01a9634 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
@@ -22,6 +22,7 @@ import java.util.Set;
 
 import org.apache.activemq.Service;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -31,74 +32,99 @@ import org.apache.activemq.usage.SystemUsage;
 /**
  * Adapter to the actual persistence mechanism used with ActiveMQ
  *
- * 
+ *
  */
 public interface PersistenceAdapter extends Service {
 
     /**
-     * Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
-     * objects that the persistence store is aware exist.
+     * Returns a set of all the
+     * {@link org.apache.activemq.command.ActiveMQDestination} objects that the
+     * persistence store is aware exist.
      *
      * @return active destinations
      */
     Set<ActiveMQDestination> getDestinations();
 
     /**
-     * Factory method to create a new queue message store with the given destination name
+     * Factory method to create a new queue message store with the given
+     * destination name
+     *
      * @param destination
      * @return the message store
-     * @throws IOException 
+     * @throws IOException
      */
     MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
 
     /**
-     * Factory method to create a new topic message store with the given destination name
-     * @param destination 
+     * Factory method to create a new topic message store with the given
+     * destination name
+     *
+     * @param destination
      * @return the topic message store
-     * @throws IOException 
+     * @throws IOException
      */
     TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
 
     /**
+     * Creates and returns a new Job Scheduler store instance.
+     *
+     * @return a new JobSchedulerStore instance if this Persistence adapter provides its own.
+     *
+     * @throws IOException If an error occurs while creating the new JobSchedulerStore.
+     * @throws UnsupportedOperationException If this adapter does not provide its own
+     *                                       scheduler store implementation.
+     */
+    JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException;
+
+    /**
      * Cleanup method to remove any state associated with the given destination.
      * This method does not stop the message store (it might not be cached).
-     * @param destination Destination to forget
+     *
+     * @param destination
+     *            Destination to forget
      */
     void removeQueueMessageStore(ActiveMQQueue destination);
 
     /**
      * Cleanup method to remove any state associated with the given destination
      * This method does not stop the message store (it might not be cached).
-     * @param destination Destination to forget
+     *
+     * @param destination
+     *            Destination to forget
      */
     void removeTopicMessageStore(ActiveMQTopic destination);
 
     /**
-     * Factory method to create a new persistent prepared transaction store for XA recovery
+     * Factory method to create a new persistent prepared transaction store for
+     * XA recovery
+     *
      * @return transaction store
-     * @throws IOException 
+     * @throws IOException
      */
     TransactionStore createTransactionStore() throws IOException;
 
     /**
-     * This method starts a transaction on the persistent storage - which is nothing to
-     * do with JMS or XA transactions - its purely a mechanism to perform multiple writes
-     * to a persistent store in 1 transaction as a performance optimization.
+     * This method starts a transaction on the persistent storage - which is
+     * nothing to do with JMS or XA transactions - its purely a mechanism to
+     * perform multiple writes to a persistent store in 1 transaction as a
+     * performance optimization.
      * <p/>
-     * Typically one transaction will require one disk synchronization point and so for
-     * real high performance its usually faster to perform many writes within the same
-     * transaction to minimize latency caused by disk synchronization. This is especially
-     * true when using tools like Berkeley Db or embedded JDBC servers.
-     * @param context 
-     * @throws IOException 
+     * Typically one transaction will require one disk synchronization point and
+     * so for real high performance its usually faster to perform many writes
+     * within the same transaction to minimize latency caused by disk
+     * synchronization. This is especially true when using tools like Berkeley
+     * Db or embedded JDBC servers.
+     *
+     * @param context
+     * @throws IOException
      */
     void beginTransaction(ConnectionContext context) throws IOException;
 
-
     /**
      * Commit a persistence transaction
-     * @param context 
-     * @throws IOException 
+     *
+     * @param context
+     * @throws IOException
      *
      * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
      */
@@ -106,40 +132,45 @@ public interface PersistenceAdapter extends Service {
 
     /**
      * Rollback a persistence transaction
-     * @param context 
-     * @throws IOException 
+     *
+     * @param context
+     * @throws IOException
      *
      * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
      */
     void rollbackTransaction(ConnectionContext context) throws IOException;
-    
+
     /**
-     * 
+     *
      * @return last broker sequence
      * @throws IOException
      */
     long getLastMessageBrokerSequenceId() throws IOException;
-    
+
     /**
      * Delete's all the messages in the persistent store.
-     * 
+     *
      * @throws IOException
      */
     void deleteAllMessages() throws IOException;
-        
+
     /**
-     * @param usageManager The UsageManager that is controlling the broker's memory usage.
+     * @param usageManager
+     *            The UsageManager that is controlling the broker's memory
+     *            usage.
      */
     void setUsageManager(SystemUsage usageManager);
-    
+
     /**
      * Set the name of the broker using the adapter
+     *
      * @param brokerName
      */
     void setBrokerName(String brokerName);
-    
+
     /**
      * Set the directory where any data files should be created
+     *
      * @param dir
      */
     void setDirectory(File dir);
@@ -148,26 +179,30 @@ public interface PersistenceAdapter extends Service {
      * @return the directory used by the persistence adaptor
      */
     File getDirectory();
-    
+
     /**
      * checkpoint any
-     * @param sync 
-     * @throws IOException 
+     *
+     * @param sync
+     * @throws IOException
      *
      */
     void checkpoint(boolean sync) throws IOException;
-    
+
     /**
      * A hint to return the size of the store on disk
+     *
      * @return disk space used in bytes of 0 if not implemented
      */
     long size();
 
     /**
-     * return the last stored producer sequenceId for this producer Id
-     * used to suppress duplicate sends on failover reconnect at the transport
-     * when a reconnect occurs
-     * @param id the producerId to find a sequenceId for
+     * return the last stored producer sequenceId for this producer Id used to
+     * suppress duplicate sends on failover reconnect at the transport when a
+     * reconnect occurs
+     *
+     * @param id
+     *            the producerId to find a sequenceId for
      * @return the last stored sequence id or -1 if no suppression needed
      */
     long getLastProducerSequenceId(ProducerId id) throws IOException;

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
index 0fd6bfc..73ea104 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * @org.apache.xbean.XBean
- * 
+ *
  */
 public class MemoryPersistenceAdapter implements PersistenceAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
@@ -49,6 +50,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
     ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
     private boolean useExternalMessageReferences;
 
+    @Override
     public Set<ActiveMQDestination> getDestinations() {
         Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size());
         for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) {
@@ -64,6 +66,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
         return new MemoryPersistenceAdapter();
     }
 
+    @Override
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
         MessageStore rc = queues.get(destination);
         if (rc == null) {
@@ -76,6 +79,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
         return rc;
     }
 
+    @Override
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
         TopicMessageStore rc = topics.get(destination);
         if (rc == null) {
@@ -93,6 +97,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
      *
      * @param destination Destination to forget
      */
+    @Override
     public void removeQueueMessageStore(ActiveMQQueue destination) {
         queues.remove(destination);
     }
@@ -102,10 +107,12 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
      *
      * @param destination Destination to forget
      */
+    @Override
     public void removeTopicMessageStore(ActiveMQTopic destination) {
         topics.remove(destination);
     }
 
+    @Override
     public TransactionStore createTransactionStore() throws IOException {
         if (transactionStore == null) {
             transactionStore = new MemoryTransactionStore(this);
@@ -113,25 +120,32 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
         return transactionStore;
     }
 
+    @Override
     public void beginTransaction(ConnectionContext context) {
     }
 
+    @Override
     public void commitTransaction(ConnectionContext context) {
     }
 
+    @Override
     public void rollbackTransaction(ConnectionContext context) {
     }
 
+    @Override
     public void start() throws Exception {
     }
 
+    @Override
     public void stop() throws Exception {
     }
 
+    @Override
     public long getLastMessageBrokerSequenceId() throws IOException {
         return 0;
     }
 
+    @Override
     public void deleteAllMessages() throws IOException {
         for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) {
             MemoryMessageStore store = asMemoryMessageStore(iter.next());
@@ -177,38 +191,52 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
      * @param usageManager The UsageManager that is controlling the broker's
      *                memory usage.
      */
+    @Override
     public void setUsageManager(SystemUsage usageManager) {
     }
 
+    @Override
     public String toString() {
         return "MemoryPersistenceAdapter";
     }
 
+    @Override
     public void setBrokerName(String brokerName) {
     }
 
+    @Override
     public void setDirectory(File dir) {
     }
-    
+
+    @Override
     public File getDirectory(){
         return null;
     }
 
+    @Override
     public void checkpoint(boolean sync) throws IOException {
     }
-    
+
+    @Override
     public long size(){
         return 0;
     }
-    
+
     public void setCreateTransactionStore(boolean create) throws IOException {
         if (create) {
             createTransactionStore();
         }
     }
 
+    @Override
     public long getLastProducerSequenceId(ProducerId id) {
         // memory map does duplicate suppression
         return -1;
     }
+
+    @Override
+    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+        // We could eventuall implement an in memory scheduler.
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
index a623de9..2a70194 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
@@ -61,8 +61,9 @@ public final class IOHelper {
     }
 
     /**
-     * Converts any string into a string that is safe to use as a file name.
-     * The result will only include ascii characters and numbers, and the "-","_", and "." characters.
+     * Converts any string into a string that is safe to use as a file name. The
+     * result will only include ascii characters and numbers, and the "-","_",
+     * and "." characters.
      *
      * @param name
      * @return
@@ -76,15 +77,16 @@ public final class IOHelper {
     }
 
     /**
-     * Converts any string into a string that is safe to use as a file name.
-     * The result will only include ascii characters and numbers, and the "-","_", and "." characters.
+     * Converts any string into a string that is safe to use as a file name. The
+     * result will only include ascii characters and numbers, and the "-","_",
+     * and "." characters.
      *
      * @param name
      * @param dirSeparators
      * @param maxFileLength
      * @return
      */
-    public static String toFileSystemSafeName(String name,boolean dirSeparators,int maxFileLength) {
+    public static String toFileSystemSafeName(String name, boolean dirSeparators, int maxFileLength) {
         int size = name.length();
         StringBuffer rc = new StringBuffer(size * 2);
         for (int i = 0; i < size; i++) {
@@ -92,8 +94,7 @@ public final class IOHelper {
             boolean valid = c >= 'a' && c <= 'z';
             valid = valid || (c >= 'A' && c <= 'Z');
             valid = valid || (c >= '0' && c <= '9');
-            valid = valid || (c == '_') || (c == '-') || (c == '.') || (c=='#')
-                    ||(dirSeparators && ( (c == '/') || (c == '\\')));
+            valid = valid || (c == '_') || (c == '-') || (c == '.') || (c == '#') || (dirSeparators && ((c == '/') || (c == '\\')));
 
             if (valid) {
                 rc.append(c);
@@ -105,7 +106,7 @@ public final class IOHelper {
         }
         String result = rc.toString();
         if (result.length() > maxFileLength) {
-            result = result.substring(result.length()-maxFileLength,result.length());
+            result = result.substring(result.length() - maxFileLength, result.length());
         }
         return result;
     }
@@ -168,8 +169,7 @@ public final class IOHelper {
             } else {
                 for (int i = 0; i < files.length; i++) {
                     File file = files[i];
-                    if (file.getName().equals(".")
-                            || file.getName().equals("..")) {
+                    if (file.getName().equals(".") || file.getName().equals("..")) {
                         continue;
                     }
                     if (file.isDirectory()) {
@@ -190,6 +190,27 @@ public final class IOHelper {
         }
     }
 
+    public static void moveFiles(File srcDirectory, File targetDirectory, FilenameFilter filter) throws IOException {
+        if (!srcDirectory.isDirectory()) {
+            throw new IOException("source is not a directory");
+        }
+
+        if (targetDirectory.exists() && !targetDirectory.isDirectory()) {
+            throw new IOException("target exists and is not a directory");
+        } else {
+            mkdirs(targetDirectory);
+        }
+
+        List<File> filesToMove = new ArrayList<File>();
+        getFiles(srcDirectory, filesToMove, filter);
+
+        for (File file : filesToMove) {
+            if (!file.isDirectory()) {
+                moveFile(file, targetDirectory);
+            }
+        }
+    }
+
     public static void copyFile(File src, File dest) throws IOException {
         copyFile(src, dest, null);
     }
@@ -222,32 +243,32 @@ public final class IOHelper {
         File parent = src.getParentFile();
         String fromPath = from.getAbsolutePath();
         if (parent.getAbsolutePath().equals(fromPath)) {
-            //one level down
+            // one level down
             result = to;
-        }else {
+        } else {
             String parentPath = parent.getAbsolutePath();
             String path = parentPath.substring(fromPath.length());
-            result = new File(to.getAbsolutePath()+File.separator+path);
+            result = new File(to.getAbsolutePath() + File.separator + path);
         }
         return result;
     }
 
-    static List<File> getFiles(File dir,FilenameFilter filter){
+    static List<File> getFiles(File dir, FilenameFilter filter) {
         List<File> result = new ArrayList<File>();
-        getFiles(dir,result,filter);
+        getFiles(dir, result, filter);
         return result;
     }
 
-    static void getFiles(File dir,List<File> list,FilenameFilter filter) {
+    static void getFiles(File dir, List<File> list, FilenameFilter filter) {
         if (!list.contains(dir)) {
             list.add(dir);
-            String[] fileNames=dir.list(filter);
-            for (int i =0; i < fileNames.length;i++) {
-                File f = new File(dir,fileNames[i]);
+            String[] fileNames = dir.list(filter);
+            for (int i = 0; i < fileNames.length; i++) {
+                File f = new File(dir, fileNames[i]);
                 if (f.isFile()) {
                     list.add(f);
-                }else {
-                    getFiles(dir,list,filter);
+                } else {
+                    getFiles(dir, list, filter);
                 }
             }
         }
@@ -286,12 +307,13 @@ public final class IOHelper {
     public static void mkdirs(File dir) throws IOException {
         if (dir.exists()) {
             if (!dir.isDirectory()) {
-                throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name");
+                throw new IOException("Failed to create directory '" + dir +
+                                      "', regular file already existed with that name");
             }
 
         } else {
             if (!dir.mkdirs()) {
-                throw new IOException("Failed to create directory '" + dir+"'");
+                throw new IOException("Failed to create directory '" + dir + "'");
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index 7ff4ae0..a3a8250 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -34,6 +34,7 @@ import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.Locker;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -422,6 +423,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
         this.lockDataSource = dataSource;
     }
 
+    @Override
     public BrokerService getBrokerService() {
         return brokerService;
     }
@@ -846,4 +848,9 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
         }
         return result;
     }
+
+    @Override
+    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
index 565fc9f..cc5282f 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.activeio.journal.InvalidRecordLocationException;
 import org.apache.activeio.journal.Journal;
 import org.apache.activeio.journal.JournalEventListener;
@@ -40,6 +41,7 @@ import org.apache.activeio.packet.Packet;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -78,14 +80,14 @@ import org.slf4j.LoggerFactory;
  * An implementation of {@link PersistenceAdapter} designed for use with a
  * {@link Journal} and then check pointing asynchronously on a timeout with some
  * other long term persistent storage.
- * 
+ *
  * @org.apache.xbean.XBean
- * 
+ *
  */
 public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
 
     private BrokerService brokerService;
-	
+
     protected Scheduler scheduler;
     private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
 
@@ -118,9 +120,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
     private TaskRunnerFactory taskRunnerFactory;
     private File directory;
 
-    public JournalPersistenceAdapter() {        
+    public JournalPersistenceAdapter() {
     }
-    
+
     public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
         setJournal(journal);
         setTaskRunnerFactory(taskRunnerFactory);
@@ -135,13 +137,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         this.journal = journal;
         journal.setJournalEventListener(this);
     }
-    
+
     public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
         this.longTermPersistence = longTermPersistence;
     }
-    
+
     final Runnable createPeriodicCheckpointTask() {
         return new Runnable() {
+            @Override
             public void run() {
                 long lastTime = 0;
                 synchronized (this) {
@@ -158,11 +161,13 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
      * @param usageManager The UsageManager that is controlling the
      *                destination's memory usage.
      */
+    @Override
     public void setUsageManager(SystemUsage usageManager) {
         this.usageManager = usageManager;
         longTermPersistence.setUsageManager(usageManager);
     }
 
+    @Override
     public Set<ActiveMQDestination> getDestinations() {
         Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
         destinations.addAll(queues.keySet());
@@ -178,6 +183,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         }
     }
 
+    @Override
     public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
         JournalMessageStore store = queues.get(destination);
         if (store == null) {
@@ -188,6 +194,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         return store;
     }
 
+    @Override
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
         JournalTopicMessageStore store = topics.get(destinationName);
         if (store == null) {
@@ -203,6 +210,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
      *
      * @param destination Destination to forget
      */
+    @Override
     public void removeQueueMessageStore(ActiveMQQueue destination) {
         queues.remove(destination);
     }
@@ -212,30 +220,37 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
      *
      * @param destination Destination to forget
      */
+    @Override
     public void removeTopicMessageStore(ActiveMQTopic destination) {
         topics.remove(destination);
     }
 
+    @Override
     public TransactionStore createTransactionStore() throws IOException {
         return transactionStore;
     }
 
+    @Override
     public long getLastMessageBrokerSequenceId() throws IOException {
         return longTermPersistence.getLastMessageBrokerSequenceId();
     }
 
+    @Override
     public void beginTransaction(ConnectionContext context) throws IOException {
         longTermPersistence.beginTransaction(context);
     }
 
+    @Override
     public void commitTransaction(ConnectionContext context) throws IOException {
         longTermPersistence.commitTransaction(context);
     }
 
+    @Override
     public void rollbackTransaction(ConnectionContext context) throws IOException {
         longTermPersistence.rollbackTransaction(context);
     }
 
+    @Override
     public synchronized void start() throws Exception {
         if (!started.compareAndSet(false, true)) {
             return;
@@ -246,12 +261,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         }
 
         checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
+            @Override
             public boolean iterate() {
                 return doCheckpoint();
             }
         }, "ActiveMQ Journal Checkpoint Worker");
 
         checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+            @Override
             public Thread newThread(Runnable runable) {
                 Thread t = new Thread(runable, "Journal checkpoint worker");
                 t.setPriority(7);
@@ -279,6 +296,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
 
     }
 
+    @Override
     public void stop() throws Exception {
 
         this.usageManager.getMemoryUsage().removeUsageListener(this);
@@ -330,16 +348,17 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
     /**
      * The Journal give us a call back so that we can move old data out of the
      * journal. Taking a checkpoint does this for us.
-     * 
+     *
      * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
      */
+    @Override
     public void overflowNotification(RecordLocation safeLocation) {
         checkpoint(false, true);
     }
 
     /**
      * When we checkpoint we move all the journalled data to long term storage.
-     * 
+     *
      */
     public void checkpoint(boolean sync, boolean fullCheckpoint) {
         try {
@@ -369,13 +388,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         }
     }
 
+    @Override
     public void checkpoint(boolean sync) {
         checkpoint(sync, sync);
     }
 
     /**
      * This does the actual checkpoint.
-     * 
+     *
      * @return
      */
     public boolean doCheckpoint() {
@@ -398,7 +418,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
             // We do many partial checkpoints (fullCheckpoint==false) to move
             // topic messages
             // to long term store as soon as possible.
-            // 
+            //
             // We want to avoid doing that for queue messages since removes the
             // come in the same
             // checkpoint cycle will nullify the previous message add.
@@ -411,6 +431,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
                     try {
                         final JournalMessageStore ms = iterator.next();
                         FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
+                            @Override
                             public RecordLocation call() throws Exception {
                                 return ms.checkpoint();
                             }
@@ -428,6 +449,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
                 try {
                     final JournalTopicMessageStore ms = iterator.next();
                     FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
+                        @Override
                         public RecordLocation call() throws Exception {
                             return ms.checkpoint();
                         }
@@ -505,7 +527,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
     /**
      * Move all the messages that were in the journal into long term storage. We
      * just replay and do a checkpoint.
-     * 
+     *
      * @throws IOException
      * @throws IOException
      * @throws InvalidRecordLocationException
@@ -644,11 +666,11 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
     public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
         if (started.get()) {
             try {
-        	    return journal.write(toPacket(wireFormat.marshal(command)), sync);
+                return journal.write(toPacket(wireFormat.marshal(command)), sync);
             } catch (IOException ioe) {
-        	    LOG.error("Cannot write to the journal", ioe);
-        	    brokerService.handleIOException(ioe);
-        	    throw ioe;
+                LOG.error("Cannot write to the journal", ioe);
+                brokerService.handleIOException(ioe);
+                throw ioe;
             }
         }
         throw new IOException("closed");
@@ -660,6 +682,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         return writeCommand(trace, sync);
     }
 
+    @Override
     public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
         newPercentUsage = (newPercentUsage / 10) * 10;
         oldPercentUsage = (oldPercentUsage / 10) * 10;
@@ -673,6 +696,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         return transactionStore;
     }
 
+    @Override
     public void deleteAllMessages() throws IOException {
         try {
             JournalTrace trace = new JournalTrace();
@@ -735,6 +759,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
     }
 
+    @Override
     public void setBrokerName(String brokerName) {
         longTermPersistence.setBrokerName(brokerName);
     }
@@ -744,18 +769,22 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         return "JournalPersistenceAdapter(" + longTermPersistence + ")";
     }
 
+    @Override
     public void setDirectory(File dir) {
         this.directory=dir;
     }
-    
+
+    @Override
     public File getDirectory(){
         return directory;
     }
-    
+
+    @Override
     public long size(){
         return 0;
     }
 
+    @Override
     public void setBrokerService(BrokerService brokerService) {
         this.brokerService = brokerService;
         PersistenceAdapter pa = getLongTermPersistence();
@@ -764,8 +793,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
         }
     }
 
+    @Override
     public long getLastProducerSequenceId(ProducerId id) {
         return -1;
     }
 
+    @Override
+    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+        return longTermPersistence.createJobSchedulerStore();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
new file mode 100644
index 0000000..edb2750
--- /dev/null
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Page;
+
+public abstract class AbstractKahaDBMetaData<T> implements KahaDBMetaData<T> {
+
+    private int state;
+    private Location lastUpdateLocation;
+    private Page<T> page;
+
+    @Override
+    public Page<T> getPage() {
+        return page;
+    }
+
+    @Override
+    public int getState() {
+        return state;
+    }
+
+    @Override
+    public Location getLastUpdateLocation() {
+        return lastUpdateLocation;
+    }
+
+    @Override
+    public void setPage(Page<T> page) {
+        this.page = page;
+    }
+
+    @Override
+    public void setState(int value) {
+        this.state = value;
+    }
+
+    @Override
+    public void setLastUpdateLocation(Location location) {
+        this.lastUpdateLocation = location;
+    }
+}


Re: [16/16] activemq git commit: https://issues.apache.org/jira/browse/AMQ-3758

Posted by Timothy Bish <ta...@gmail.com>.
-1

This is a huge change that completely alters the Job scheduler store to 
use a totally new store format and while the update does allow for the 
update from an older to a newer store this not something I would expect 
to see in a point release.

On 12/16/2014 05:10 PM, hadrian@apache.org wrote:
> https://issues.apache.org/jira/browse/AMQ-3758
>
> Refactor the scheduler store into a more KahaDB style store that can
> recover from various problems like missing journal files or corruption
> as well as rebuild its index when needed.  Move the scheduler store into
> a more configurable style that allows for users to plug in their own
> implementations.  Store update from legacy versions is automatic.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
> Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fc244f48
> Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fc244f48
> Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fc244f48
>
> Branch: refs/heads/activemq-5.10.x
> Commit: fc244f48e48596c668a7d9dc3b84c26e60693823
> Parents: db669e4
> Author: Timothy Bish <ta...@gmail.com>
> Authored: Mon Jul 7 12:28:11 2014 -0400
> Committer: Hadrian Zbarcea <ha...@apache.org>
> Committed: Tue Dec 16 16:11:09 2014 -0500
>
> ----------------------------------------------------------------------
>   .../apache/activemq/broker/BrokerService.java   |   25 +-
>   .../activemq/broker/jmx/JobSchedulerView.java   |   56 +-
>   .../broker/jmx/JobSchedulerViewMBean.java       |  113 +-
>   .../apache/activemq/broker/scheduler/Job.java   |   23 +-
>   .../activemq/broker/scheduler/JobListener.java  |   16 +-
>   .../activemq/broker/scheduler/JobScheduler.java |   33 +-
>   .../broker/scheduler/JobSchedulerFacade.java    |    6 +
>   .../broker/scheduler/JobSchedulerStore.java     |   43 +
>   .../activemq/broker/scheduler/JobSupport.java   |    5 +-
>   .../activemq/store/PersistenceAdapter.java      |  119 +-
>   .../store/memory/MemoryPersistenceAdapter.java  |   36 +-
>   .../java/org/apache/activemq/util/IOHelper.java |   68 +-
>   .../store/jdbc/JDBCPersistenceAdapter.java      |    7 +
>   .../journal/JournalPersistenceAdapter.java      |   71 +-
>   .../store/kahadb/AbstractKahaDBMetaData.java    |   57 +
>   .../store/kahadb/AbstractKahaDBStore.java       |  745 ++++++++++++
>   .../activemq/store/kahadb/KahaDBMetaData.java   |  135 +++
>   .../store/kahadb/KahaDBPersistenceAdapter.java  |   15 +-
>   .../activemq/store/kahadb/KahaDBStore.java      |   55 +-
>   .../kahadb/MultiKahaDBPersistenceAdapter.java   |   56 +-
>   .../kahadb/MultiKahaDBTransactionStore.java     |   18 +-
>   .../activemq/store/kahadb/TempKahaDBStore.java  |  138 ++-
>   .../apache/activemq/store/kahadb/Visitor.java   |   20 +
>   .../store/kahadb/scheduler/JobImpl.java         |   21 +-
>   .../store/kahadb/scheduler/JobLocation.java     |   77 +-
>   .../scheduler/JobLocationsMarshaller.java       |   53 +
>   .../kahadb/scheduler/JobSchedulerImpl.java      |  837 ++++++++------
>   .../scheduler/JobSchedulerKahaDBMetaData.java   |  246 ++++
>   .../kahadb/scheduler/JobSchedulerStoreImpl.java | 1076 +++++++++++++-----
>   .../scheduler/UnknownStoreVersionException.java |   24 +
>   .../kahadb/scheduler/legacy/LegacyJobImpl.java  |   72 ++
>   .../scheduler/legacy/LegacyJobLocation.java     |  296 +++++
>   .../legacy/LegacyJobSchedulerImpl.java          |  222 ++++
>   .../legacy/LegacyJobSchedulerStoreImpl.java     |  378 ++++++
>   .../scheduler/legacy/LegacyStoreReplayer.java   |  155 +++
>   .../src/main/proto/journal-data.proto           |   61 +
>   .../apache/activemq/leveldb/LevelDBStore.scala  |    5 +
>   .../leveldb/replicated/ProxyLevelDBStore.scala  |    5 +
>   .../JobSchedulerBrokerShutdownTest.java         |    1 +
>   .../JobSchedulerJmxManagementTests.java         |  155 +++
>   .../scheduler/JobSchedulerManagementTest.java   |   84 +-
>   .../JobSchedulerStoreCheckpointTest.java        |  125 ++
>   .../broker/scheduler/JobSchedulerStoreTest.java |   46 +-
>   .../broker/scheduler/JobSchedulerTest.java      |   36 +
>   .../scheduler/JobSchedulerTestSupport.java      |  112 ++
>   .../KahaDBSchedulerIndexRebuildTest.java        |  179 +++
>   .../KahaDBSchedulerMissingJournalLogsTest.java  |  204 ++++
>   .../scheduler/SchedulerDBVersionTest.java       |  164 +++
>   .../src/test/resources/log4j.properties         |    1 +
>   .../activemq/store/schedulerDB/legacy/db-1.log  |  Bin 0 -> 524288 bytes
>   .../store/schedulerDB/legacy/scheduleDB.data    |  Bin 0 -> 20480 bytes
>   .../store/schedulerDB/legacy/scheduleDB.redo    |  Bin 0 -> 16408 bytes
>   52 files changed, 5584 insertions(+), 911 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> index 00d4abd..5becec2 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> @@ -1861,6 +1861,23 @@ public class BrokerService implements Service {
>   
>               try {
>                   PersistenceAdapter pa = getPersistenceAdapter();
> +                if (pa != null) {
> +                    this.jobSchedulerStore = pa.createJobSchedulerStore();
> +                    jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
> +                    configureService(jobSchedulerStore);
> +                    jobSchedulerStore.start();
> +                    return this.jobSchedulerStore;
> +                }
> +            } catch (IOException e) {
> +                throw new RuntimeException(e);
> +            } catch (UnsupportedOperationException ex) {
> +                // It's ok if the store doesn't implement a scheduler.
> +            } catch (Exception e) {
> +                throw new RuntimeException(e);
> +            }
> +
> +            try {
> +                PersistenceAdapter pa = getPersistenceAdapter();
>                   if (pa != null && pa instanceof JobSchedulerStore) {
>                       this.jobSchedulerStore = (JobSchedulerStore) pa;
>                       configureService(jobSchedulerStore);
> @@ -1870,9 +1887,13 @@ public class BrokerService implements Service {
>                   throw new RuntimeException(e);
>               }
>   
> +            // Load the KahaDB store as a last resort, this only works if KahaDB is
> +            // included at runtime, otherwise this will fail.  User should disable
> +            // scheduler support if this fails.
>               try {
> -                String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
> -                jobSchedulerStore = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
> +                String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
> +                PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
> +                jobSchedulerStore = adaptor.createJobSchedulerStore();
>                   jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
>                   configureService(jobSchedulerStore);
>                   jobSchedulerStore.start();
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
> index 9e5a1fb..2118a96 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
> @@ -16,23 +16,39 @@
>    */
>   package org.apache.activemq.broker.jmx;
>   
> +import java.util.List;
> +
> +import javax.management.openmbean.CompositeDataSupport;
> +import javax.management.openmbean.CompositeType;
> +import javax.management.openmbean.TabularData;
> +import javax.management.openmbean.TabularDataSupport;
> +import javax.management.openmbean.TabularType;
> +
>   import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
>   import org.apache.activemq.broker.scheduler.Job;
>   import org.apache.activemq.broker.scheduler.JobScheduler;
>   import org.apache.activemq.broker.scheduler.JobSupport;
>   
> -import javax.management.openmbean.*;
> -import java.io.IOException;
> -import java.util.List;
> -
> +/**
> + * MBean object that can be used to manage a single instance of a JobScheduler.  The object
> + * provides methods for querying for jobs and removing some or all of the jobs that are
> + * scheduled in the managed store.
> + */
>   public class JobSchedulerView implements JobSchedulerViewMBean {
>   
>       private final JobScheduler jobScheduler;
>   
> +    /**
> +     * Creates a new instance of the JobScheduler management MBean.
> +     *
> +     * @param jobScheduler
> +     *        The scheduler instance to manage.
> +     */
>       public JobSchedulerView(JobScheduler jobScheduler) {
>           this.jobScheduler = jobScheduler;
>       }
>   
> +    @Override
>       public TabularData getAllJobs() throws Exception {
>           OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
>           CompositeType ct = factory.getCompositeType();
> @@ -45,6 +61,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
>           return rc;
>       }
>   
> +    @Override
>       public TabularData getAllJobs(String startTime, String finishTime) throws Exception {
>           OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
>           CompositeType ct = factory.getCompositeType();
> @@ -59,6 +76,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
>           return rc;
>       }
>   
> +    @Override
>       public TabularData getNextScheduleJobs() throws Exception {
>           OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
>           CompositeType ct = factory.getCompositeType();
> @@ -71,31 +89,51 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
>           return rc;
>       }
>   
> +    @Override
>       public String getNextScheduleTime() throws Exception {
>           long time = this.jobScheduler.getNextScheduleTime();
>           return JobSupport.getDateTime(time);
>       }
>   
> +    @Override
>       public void removeAllJobs() throws Exception {
>           this.jobScheduler.removeAllJobs();
> -
>       }
>   
> +    @Override
>       public void removeAllJobs(String startTime, String finishTime) throws Exception {
>           long start = JobSupport.getDataTime(startTime);
>           long finish = JobSupport.getDataTime(finishTime);
>           this.jobScheduler.removeAllJobs(start, finish);
> +    }
>   
> +    @Override
> +    public void removeAllJobsAtScheduledTime(String time) throws Exception {
> +        long removeAtTime = JobSupport.getDataTime(time);
> +        this.jobScheduler.remove(removeAtTime);
>       }
>   
> +    @Override
> +    public void removeJobAtScheduledTime(String time) throws Exception {
> +        removeAllJobsAtScheduledTime(time);
> +    }
> +
> +    @Override
>       public void removeJob(String jobId) throws Exception {
>           this.jobScheduler.remove(jobId);
> -
>       }
>   
> -    public void removeJobAtScheduledTime(String time) throws IOException {
> -        // TODO Auto-generated method stub
> +    @Override
> +    public int getExecutionCount(String jobId) throws Exception {
> +        int result = 0;
>   
> -    }
> +        List<Job> jobs = this.jobScheduler.getAllJobs();
> +        for (Job job : jobs) {
> +            if (job.getJobId().equals(jobId)) {
> +                result = job.getExecutionCount();
> +            }
> +        }
>   
> +        return result;
> +    }
>   }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
> index f5745ea..76a7926 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
> @@ -18,76 +18,125 @@ package org.apache.activemq.broker.jmx;
>   
>   import javax.management.openmbean.TabularData;
>   
> -
> -
>   public interface JobSchedulerViewMBean {
> +
>       /**
> -     * remove all jobs scheduled to run at this time
> +     * Remove all jobs scheduled to run at this time.  If there are no jobs scheduled
> +     * at the given time this methods returns without making any modifications to the
> +     * scheduler store.
> +     *
>        * @param time
> -     * @throws Exception
> +     *        the string formated time that should be used to remove jobs.
> +     *
> +     * @throws Exception if an error occurs while performing the remove.
> +     *
> +     * @deprecated use removeAllJobsAtScheduledTime instead as it is more explicit about what
> +     *             the method is actually doing.
>        */
> +    @Deprecated
>       @MBeanInfo("remove jobs with matching execution time")
>       public abstract void removeJobAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception;
>   
>       /**
> -     * remove a job with the matching jobId
> +     * Remove all jobs scheduled to run at this time.  If there are no jobs scheduled
> +     * at the given time this methods returns without making any modifications to the
> +     * scheduler store.
> +     *
> +     * @param time
> +     *        the string formated time that should be used to remove jobs.
> +     *
> +     * @throws Exception if an error occurs while performing the remove.
> +     */
> +    @MBeanInfo("remove jobs with matching execution time")
> +    public abstract void removeAllJobsAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception;
> +
> +    /**
> +     * Remove a job with the matching jobId.  If the method does not find a matching job
> +     * then it returns without throwing an error or making any modifications to the job
> +     * scheduler store.
> +     *
>        * @param jobId
> -     * @throws Exception
> +     *        the Job Id to remove from the scheduler store.
> +     *
> +     * @throws Exception if an error occurs while attempting to remove the Job.
>        */
>       @MBeanInfo("remove jobs with matching jobId")
>       public abstract void removeJob(@MBeanInfo("jobId")String jobId) throws Exception;
> -
> +
>       /**
> -     * remove all the Jobs from the scheduler
> -     * @throws Exception
> +     * Remove all the Jobs from the scheduler,
> +     *
> +     * @throws Exception if an error occurs while purging the store.
>        */
>       @MBeanInfo("remove all scheduled jobs")
>       public abstract void removeAllJobs() throws Exception;
> -
> +
>       /**
> -     * remove all the Jobs from the scheduler that are due between the start and finish times
> -     * @param start time
> -     * @param finish time
> -     * @throws Exception
> +     * Remove all the Jobs from the scheduler that are due between the start and finish times.
> +     *
> +     * @param start
> +     *        the starting time to remove jobs from.
> +     * @param finish
> +     *        the finish time for the remove operation.
> +     *
> +     * @throws Exception if an error occurs while attempting to remove the jobs.
>        */
>       @MBeanInfo("remove all scheduled jobs between time ranges ")
>       public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish) throws Exception;
> -
>   
> -
>       /**
> -     * Get the next time jobs will be fired
> -     * @return the time in milliseconds
> -     * @throws Exception
> +     * Get the next time jobs will be fired from this scheduler store.
> +     *
> +     * @return the time in milliseconds of the next job to execute.
> +     *
> +     * @throws Exception if an error occurs while accessing the store.
>        */
>       @MBeanInfo("get the next time a job is due to be scheduled ")
>       public abstract String getNextScheduleTime() throws Exception;
> -
> +
> +    /**
> +     * Gets the number of times a scheduled Job has been executed.
> +     *
> +     * @return the total number of time a scheduled job has executed.
> +     *
> +     * @throws Exception if an error occurs while querying for the Job.
> +     */
> +    @MBeanInfo("get the next time a job is due to be scheduled ")
> +    public abstract int getExecutionCount(@MBeanInfo("jobId")String jobId) throws Exception;
> +
>       /**
> -     * Get all the jobs scheduled to run next
> +     * Get all the jobs scheduled to run next.
> +     *
>        * @return a list of jobs that will be scheduled next
> -     * @throws Exception
> +     *
> +     * @throws Exception if an error occurs while reading the scheduler store.
>        */
>       @MBeanInfo("get the next job(s) to be scheduled. Not HTML friendly ")
>       public abstract TabularData getNextScheduleJobs() throws Exception;
> -
> -    /**
> -     * Get all the outstanding Jobs
> -     * @return a  table of all jobs
> -     * @throws Exception
>   
> +    /**
> +     * Get all the outstanding Jobs that are scheduled in this scheduler store.
> +     *
> +     * @return a table of all jobs in this scheduler store.
> +     *
> +     * @throws Exception if an error occurs while reading the store.
>        */
>       @MBeanInfo("get the scheduled Jobs in the Store. Not HTML friendly ")
>       public abstract TabularData getAllJobs() throws Exception;
> -
> +
>       /**
> -     * Get all outstanding jobs due to run between start and finish
> +     * Get all outstanding jobs due to run between start and finish time range.
> +     *
>        * @param start
> +     *        the starting time range to query the store for jobs.
>        * @param finish
> -     * @return a table of jobs in the range
> -     * @throws Exception
> -
> +     *        the ending time of this query for scheduled jobs.
> +     *
> +     * @return a table of jobs in the range given.
> +     *
> +     * @throws Exception if an error occurs while querying the scheduler store.
>        */
>       @MBeanInfo("get the scheduled Jobs in the Store within the time range. Not HTML friendly ")
>       public abstract TabularData getAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)throws Exception;
> +
>   }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
> index 7b28a5b..047fe23 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
> @@ -16,7 +16,12 @@
>    */
>   package org.apache.activemq.broker.scheduler;
>   
> -
> +/**
> + * Interface for a scheduled Job object.
> + *
> + * Each Job is identified by a unique Job Id which can be used to reference the Job
> + * in the Job Scheduler store for updates or removal.
> + */
>   public interface Job {
>   
>       /**
> @@ -38,11 +43,12 @@ public interface Job {
>        * @return the Delay
>        */
>       public abstract long getDelay();
> +
>       /**
>        * @return the period
>        */
>       public abstract long getPeriod();
> -
> +
>       /**
>        * @return the cron entry
>        */
> @@ -52,17 +58,24 @@ public interface Job {
>        * @return the payload
>        */
>       public abstract byte[] getPayload();
> -
> +
>       /**
>        * Get the start time as a Date time string
>        * @return the date time
>        */
>       public String getStartTime();
> -
> +
>       /**
> -     * Get the time the job is next due to execute
> +     * Get the time the job is next due to execute
>        * @return the date time
>        */
>       public String getNextExecutionTime();
>   
> +    /**
> +     * Gets the total number of times this job has executed.
> +     *
> +     * @returns the number of times this job has been executed.
> +     */
> +    public int getExecutionCount();
> +
>   }
> \ No newline at end of file
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
> index c53d9c6..a453595 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
> @@ -18,13 +18,21 @@ package org.apache.activemq.broker.scheduler;
>   
>   import org.apache.activemq.util.ByteSequence;
>   
> +/**
> + * Job event listener interface. Provides event points for Job related events
> + * such as job ready events.
> + */
>   public interface JobListener {
> -
> +
>       /**
> -     * A Job that has been scheduled is now ready
> -     * @param id
> +     * A Job that has been scheduled is now ready to be fired.  The Job is passed
> +     * in its raw byte form and must be un-marshaled before being delivered.
> +     *
> +     * @param jobId
> +     *        The unique Job Id of the Job that is ready to fire.
>        * @param job
> +     *        The job that is now ready, delivered in byte form.
>        */
> -    public void scheduledJob(String id,ByteSequence job);
> +    public void scheduledJob(String id, ByteSequence job);
>   
>   }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
> index 2e96eae..e951861 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
> @@ -46,20 +46,25 @@ public interface JobScheduler {
>       void stopDispatching() throws Exception;
>   
>       /**
> -     * Add a Job listener
> +     * Add a Job listener which will receive events related to scheduled jobs.
> +     *
> +     * @param listener
> +     *      The job listener to add.
>        *
> -     * @param l
>        * @throws Exception
>        */
> -    void addListener(JobListener l) throws Exception;
> +    void addListener(JobListener listener) throws Exception;
>   
>       /**
> -     * remove a JobListener
> +     * remove a JobListener that was previously registered.  If the given listener is not in
> +     * the registry this method has no effect.
> +     *
> +     * @param listener
> +     *      The listener that should be removed from the listener registry.
>        *
> -     * @param l
>        * @throws Exception
>        */
> -    void removeListener(JobListener l) throws Exception;
> +    void removeListener(JobListener listener) throws Exception;
>   
>       /**
>        * Add a job to be scheduled
> @@ -70,7 +75,8 @@ public interface JobScheduler {
>        *            the message to be sent when the job is scheduled
>        * @param delay
>        *            the time in milliseconds before the job will be run
> -     * @throws Exception
> +     *
> +     * @throws Exception if an error occurs while scheduling the Job.
>        */
>       void schedule(String jobId, ByteSequence payload, long delay) throws Exception;
>   
> @@ -82,8 +88,9 @@ public interface JobScheduler {
>        * @param payload
>        *            the message to be sent when the job is scheduled
>        * @param cronEntry
> -     *            - cron entry
> -     * @throws Exception
> +     *            The cron entry to use to schedule this job.
> +     *
> +     * @throws Exception if an error occurs while scheduling the Job.
>        */
>       void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception;
>   
> @@ -95,7 +102,7 @@ public interface JobScheduler {
>        * @param payload
>        *            the message to be sent when the job is scheduled
>        * @param cronEntry
> -     *            - cron entry
> +     *            cron entry
>        * @param delay
>        *            time in ms to wait before scheduling
>        * @param period
> @@ -110,6 +117,8 @@ public interface JobScheduler {
>        * remove all jobs scheduled to run at this time
>        *
>        * @param time
> +     *      The UTC time to use to remove a batch of scheduled Jobs.
> +     *
>        * @throws Exception
>        */
>       void remove(long time) throws Exception;
> @@ -118,7 +127,9 @@ public interface JobScheduler {
>        * remove a job with the matching jobId
>        *
>        * @param jobId
> -     * @throws Exception
> +     *      The unique Job Id to search for and remove from the scheduled set of jobs.
> +     *
> +     * @throws Exception if an error occurs while removing the Job.
>        */
>       void remove(String jobId) throws Exception;
>   
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
> index d46d04a..24a216a 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
> @@ -21,6 +21,12 @@ import java.util.List;
>   
>   import org.apache.activemq.util.ByteSequence;
>   
> +/**
> + * A wrapper for instances of the JobScheduler interface that ensures that methods
> + * provides safe and sane return values and can deal with null values being passed
> + * in etc.  Provides a measure of safety when using unknown implementations of the
> + * JobSchedulerStore which might not always do the right thing.
> + */
>   public class JobSchedulerFacade implements JobScheduler {
>   
>       private final SchedulerBroker broker;
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
> index 3cbc367..c6863c7 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
> @@ -26,13 +26,56 @@ import org.apache.activemq.Service;
>    */
>   public interface JobSchedulerStore extends Service {
>   
> +    /**
> +     * Gets the location where the Job Scheduler will write the persistent data used
> +     * to preserve and recover scheduled Jobs.
> +     *
> +     * If the scheduler implementation does not utilize a file system based store this
> +     * method returns null.
> +     *
> +     * @return the directory where persistent store data is written.
> +     */
>       File getDirectory();
>   
> +    /**
> +     * Sets the directory where persistent store data will be written.  This method
> +     * must be called before the scheduler store is started to have any effect.
> +     *
> +     * @param directory
> +     *      The directory where the job scheduler store is to be located.
> +     */
>       void setDirectory(File directory);
>   
> +    /**
> +     * The size of the current store on disk if the store utilizes a disk based store
> +     * mechanism.
> +     *
> +     * @return the current store size on disk.
> +     */
>       long size();
>   
> +    /**
> +     * Returns the JobScheduler instance identified by the given name.
> +     *
> +     * @param name
> +     *        the name of the JobScheduler instance to lookup.
> +     *
> +     * @return the named JobScheduler or null if none exists with the given name.
> +     *
> +     * @throws Exception if an error occurs while loading the named scheduler.
> +     */
>       JobScheduler getJobScheduler(String name) throws Exception;
>   
> +    /**
> +     * Removes the named JobScheduler if it exists, purging all scheduled messages
> +     * assigned to it.
> +     *
> +     * @param name
> +     *        the name of the scheduler instance to remove.
> +     *
> +     * @return true if there was a scheduler with the given name to remove.
> +     *
> +     * @throws Exception if an error occurs while removing the scheduler.
> +     */
>       boolean removeJobScheduler(String name) throws Exception;
>   }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
> index 6b78d77..fc5b8dd 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
> @@ -20,7 +20,11 @@ import java.text.DateFormat;
>   import java.text.SimpleDateFormat;
>   import java.util.Date;
>   
> +/**
> + * A class to provide common Job Scheduler related methods.
> + */
>   public class JobSupport {
> +
>       public static String getDateTime(long value) {
>           DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
>           Date date = new Date(value);
> @@ -32,5 +36,4 @@ public class JobSupport {
>            Date date = dfm.parse(value);
>            return date.getTime();
>        }
> -
>   }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
> index 31efd32..01a9634 100755
> --- a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
> @@ -22,6 +22,7 @@ import java.util.Set;
>   
>   import org.apache.activemq.Service;
>   import org.apache.activemq.broker.ConnectionContext;
> +import org.apache.activemq.broker.scheduler.JobSchedulerStore;
>   import org.apache.activemq.command.ActiveMQDestination;
>   import org.apache.activemq.command.ActiveMQQueue;
>   import org.apache.activemq.command.ActiveMQTopic;
> @@ -31,74 +32,99 @@ import org.apache.activemq.usage.SystemUsage;
>   /**
>    * Adapter to the actual persistence mechanism used with ActiveMQ
>    *
> - *
> + *
>    */
>   public interface PersistenceAdapter extends Service {
>   
>       /**
> -     * Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
> -     * objects that the persistence store is aware exist.
> +     * Returns a set of all the
> +     * {@link org.apache.activemq.command.ActiveMQDestination} objects that the
> +     * persistence store is aware exist.
>        *
>        * @return active destinations
>        */
>       Set<ActiveMQDestination> getDestinations();
>   
>       /**
> -     * Factory method to create a new queue message store with the given destination name
> +     * Factory method to create a new queue message store with the given
> +     * destination name
> +     *
>        * @param destination
>        * @return the message store
> -     * @throws IOException
> +     * @throws IOException
>        */
>       MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
>   
>       /**
> -     * Factory method to create a new topic message store with the given destination name
> -     * @param destination
> +     * Factory method to create a new topic message store with the given
> +     * destination name
> +     *
> +     * @param destination
>        * @return the topic message store
> -     * @throws IOException
> +     * @throws IOException
>        */
>       TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
>   
>       /**
> +     * Creates and returns a new Job Scheduler store instance.
> +     *
> +     * @return a new JobSchedulerStore instance if this Persistence adapter provides its own.
> +     *
> +     * @throws IOException If an error occurs while creating the new JobSchedulerStore.
> +     * @throws UnsupportedOperationException If this adapter does not provide its own
> +     *                                       scheduler store implementation.
> +     */
> +    JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException;
> +
> +    /**
>        * Cleanup method to remove any state associated with the given destination.
>        * This method does not stop the message store (it might not be cached).
> -     * @param destination Destination to forget
> +     *
> +     * @param destination
> +     *            Destination to forget
>        */
>       void removeQueueMessageStore(ActiveMQQueue destination);
>   
>       /**
>        * Cleanup method to remove any state associated with the given destination
>        * This method does not stop the message store (it might not be cached).
> -     * @param destination Destination to forget
> +     *
> +     * @param destination
> +     *            Destination to forget
>        */
>       void removeTopicMessageStore(ActiveMQTopic destination);
>   
>       /**
> -     * Factory method to create a new persistent prepared transaction store for XA recovery
> +     * Factory method to create a new persistent prepared transaction store for
> +     * XA recovery
> +     *
>        * @return transaction store
> -     * @throws IOException
> +     * @throws IOException
>        */
>       TransactionStore createTransactionStore() throws IOException;
>   
>       /**
> -     * This method starts a transaction on the persistent storage - which is nothing to
> -     * do with JMS or XA transactions - its purely a mechanism to perform multiple writes
> -     * to a persistent store in 1 transaction as a performance optimization.
> +     * This method starts a transaction on the persistent storage - which is
> +     * nothing to do with JMS or XA transactions - its purely a mechanism to
> +     * perform multiple writes to a persistent store in 1 transaction as a
> +     * performance optimization.
>        * <p/>
> -     * Typically one transaction will require one disk synchronization point and so for
> -     * real high performance its usually faster to perform many writes within the same
> -     * transaction to minimize latency caused by disk synchronization. This is especially
> -     * true when using tools like Berkeley Db or embedded JDBC servers.
> -     * @param context
> -     * @throws IOException
> +     * Typically one transaction will require one disk synchronization point and
> +     * so for real high performance its usually faster to perform many writes
> +     * within the same transaction to minimize latency caused by disk
> +     * synchronization. This is especially true when using tools like Berkeley
> +     * Db or embedded JDBC servers.
> +     *
> +     * @param context
> +     * @throws IOException
>        */
>       void beginTransaction(ConnectionContext context) throws IOException;
>   
> -
>       /**
>        * Commit a persistence transaction
> -     * @param context
> -     * @throws IOException
> +     *
> +     * @param context
> +     * @throws IOException
>        *
>        * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
>        */
> @@ -106,40 +132,45 @@ public interface PersistenceAdapter extends Service {
>   
>       /**
>        * Rollback a persistence transaction
> -     * @param context
> -     * @throws IOException
> +     *
> +     * @param context
> +     * @throws IOException
>        *
>        * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
>        */
>       void rollbackTransaction(ConnectionContext context) throws IOException;
> -
> +
>       /**
> -     *
> +     *
>        * @return last broker sequence
>        * @throws IOException
>        */
>       long getLastMessageBrokerSequenceId() throws IOException;
> -
> +
>       /**
>        * Delete's all the messages in the persistent store.
> -     *
> +     *
>        * @throws IOException
>        */
>       void deleteAllMessages() throws IOException;
> -
> +
>       /**
> -     * @param usageManager The UsageManager that is controlling the broker's memory usage.
> +     * @param usageManager
> +     *            The UsageManager that is controlling the broker's memory
> +     *            usage.
>        */
>       void setUsageManager(SystemUsage usageManager);
> -
> +
>       /**
>        * Set the name of the broker using the adapter
> +     *
>        * @param brokerName
>        */
>       void setBrokerName(String brokerName);
> -
> +
>       /**
>        * Set the directory where any data files should be created
> +     *
>        * @param dir
>        */
>       void setDirectory(File dir);
> @@ -148,26 +179,30 @@ public interface PersistenceAdapter extends Service {
>        * @return the directory used by the persistence adaptor
>        */
>       File getDirectory();
> -
> +
>       /**
>        * checkpoint any
> -     * @param sync
> -     * @throws IOException
> +     *
> +     * @param sync
> +     * @throws IOException
>        *
>        */
>       void checkpoint(boolean sync) throws IOException;
> -
> +
>       /**
>        * A hint to return the size of the store on disk
> +     *
>        * @return disk space used in bytes of 0 if not implemented
>        */
>       long size();
>   
>       /**
> -     * return the last stored producer sequenceId for this producer Id
> -     * used to suppress duplicate sends on failover reconnect at the transport
> -     * when a reconnect occurs
> -     * @param id the producerId to find a sequenceId for
> +     * return the last stored producer sequenceId for this producer Id used to
> +     * suppress duplicate sends on failover reconnect at the transport when a
> +     * reconnect occurs
> +     *
> +     * @param id
> +     *            the producerId to find a sequenceId for
>        * @return the last stored sequence id or -1 if no suppression needed
>        */
>       long getLastProducerSequenceId(ProducerId id) throws IOException;
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
> index 0fd6bfc..73ea104 100755
> --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
> @@ -24,6 +24,7 @@ import java.util.Set;
>   import java.util.concurrent.ConcurrentHashMap;
>   
>   import org.apache.activemq.broker.ConnectionContext;
> +import org.apache.activemq.broker.scheduler.JobSchedulerStore;
>   import org.apache.activemq.command.ActiveMQDestination;
>   import org.apache.activemq.command.ActiveMQQueue;
>   import org.apache.activemq.command.ActiveMQTopic;
> @@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory;
>   
>   /**
>    * @org.apache.xbean.XBean
> - *
> + *
>    */
>   public class MemoryPersistenceAdapter implements PersistenceAdapter {
>       private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
> @@ -49,6 +50,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
>       ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
>       private boolean useExternalMessageReferences;
>   
> +    @Override
>       public Set<ActiveMQDestination> getDestinations() {
>           Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size());
>           for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) {
> @@ -64,6 +66,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
>           return new MemoryPersistenceAdapter();
>       }
>   
> +    @Override
>       public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
>           MessageStore rc = queues.get(destination);
>           if (rc == null) {
> @@ -76,6 +79,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
>           return rc;
>       }
>   
> +    @Override
>       public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
>           TopicMessageStore rc = topics.get(destination);
>           if (rc == null) {
> @@ -93,6 +97,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
>        *
>        * @param destination Destination to forget
>        */
> +    @Override
>       public void removeQueueMessageStore(ActiveMQQueue destination) {
>           queues.remove(destination);
>       }
> @@ -102,10 +107,12 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
>        *
>        * @param destination Destination to forget
>        */
> +    @Override
>       public void removeTopicMessageStore(ActiveMQTopic destination) {
>           topics.remove(destination);
>       }
>   
> +    @Override
>       public TransactionStore createTransactionStore() throws IOException {
>           if (transactionStore == null) {
>               transactionStore = new MemoryTransactionStore(this);
> @@ -113,25 +120,32 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
>           return transactionStore;
>       }
>   
> +    @Override
>       public void beginTransaction(ConnectionContext context) {
>       }
>   
> +    @Override
>       public void commitTransaction(ConnectionContext context) {
>       }
>   
> +    @Override
>       public void rollbackTransaction(ConnectionContext context) {
>       }
>   
> +    @Override
>       public void start() throws Exception {
>       }
>   
> +    @Override
>       public void stop() throws Exception {
>       }
>   
> +    @Override
>       public long getLastMessageBrokerSequenceId() throws IOException {
>           return 0;
>       }
>   
> +    @Override
>       public void deleteAllMessages() throws IOException {
>           for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) {
>               MemoryMessageStore store = asMemoryMessageStore(iter.next());
> @@ -177,38 +191,52 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
>        * @param usageManager The UsageManager that is controlling the broker's
>        *                memory usage.
>        */
> +    @Override
>       public void setUsageManager(SystemUsage usageManager) {
>       }
>   
> +    @Override
>       public String toString() {
>           return "MemoryPersistenceAdapter";
>       }
>   
> +    @Override
>       public void setBrokerName(String brokerName) {
>       }
>   
> +    @Override
>       public void setDirectory(File dir) {
>       }
> -
> +
> +    @Override
>       public File getDirectory(){
>           return null;
>       }
>   
> +    @Override
>       public void checkpoint(boolean sync) throws IOException {
>       }
> -
> +
> +    @Override
>       public long size(){
>           return 0;
>       }
> -
> +
>       public void setCreateTransactionStore(boolean create) throws IOException {
>           if (create) {
>               createTransactionStore();
>           }
>       }
>   
> +    @Override
>       public long getLastProducerSequenceId(ProducerId id) {
>           // memory map does duplicate suppression
>           return -1;
>       }
> +
> +    @Override
> +    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
> +        // We could eventuall implement an in memory scheduler.
> +        throw new UnsupportedOperationException();
> +    }
>   }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
> index a623de9..2a70194 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
> @@ -61,8 +61,9 @@ public final class IOHelper {
>       }
>   
>       /**
> -     * Converts any string into a string that is safe to use as a file name.
> -     * The result will only include ascii characters and numbers, and the "-","_", and "." characters.
> +     * Converts any string into a string that is safe to use as a file name. The
> +     * result will only include ascii characters and numbers, and the "-","_",
> +     * and "." characters.
>        *
>        * @param name
>        * @return
> @@ -76,15 +77,16 @@ public final class IOHelper {
>       }
>   
>       /**
> -     * Converts any string into a string that is safe to use as a file name.
> -     * The result will only include ascii characters and numbers, and the "-","_", and "." characters.
> +     * Converts any string into a string that is safe to use as a file name. The
> +     * result will only include ascii characters and numbers, and the "-","_",
> +     * and "." characters.
>        *
>        * @param name
>        * @param dirSeparators
>        * @param maxFileLength
>        * @return
>        */
> -    public static String toFileSystemSafeName(String name,boolean dirSeparators,int maxFileLength) {
> +    public static String toFileSystemSafeName(String name, boolean dirSeparators, int maxFileLength) {
>           int size = name.length();
>           StringBuffer rc = new StringBuffer(size * 2);
>           for (int i = 0; i < size; i++) {
> @@ -92,8 +94,7 @@ public final class IOHelper {
>               boolean valid = c >= 'a' && c <= 'z';
>               valid = valid || (c >= 'A' && c <= 'Z');
>               valid = valid || (c >= '0' && c <= '9');
> -            valid = valid || (c == '_') || (c == '-') || (c == '.') || (c=='#')
> -                    ||(dirSeparators && ( (c == '/') || (c == '\\')));
> +            valid = valid || (c == '_') || (c == '-') || (c == '.') || (c == '#') || (dirSeparators && ((c == '/') || (c == '\\')));
>   
>               if (valid) {
>                   rc.append(c);
> @@ -105,7 +106,7 @@ public final class IOHelper {
>           }
>           String result = rc.toString();
>           if (result.length() > maxFileLength) {
> -            result = result.substring(result.length()-maxFileLength,result.length());
> +            result = result.substring(result.length() - maxFileLength, result.length());
>           }
>           return result;
>       }
> @@ -168,8 +169,7 @@ public final class IOHelper {
>               } else {
>                   for (int i = 0; i < files.length; i++) {
>                       File file = files[i];
> -                    if (file.getName().equals(".")
> -                            || file.getName().equals("..")) {
> +                    if (file.getName().equals(".") || file.getName().equals("..")) {
>                           continue;
>                       }
>                       if (file.isDirectory()) {
> @@ -190,6 +190,27 @@ public final class IOHelper {
>           }
>       }
>   
> +    public static void moveFiles(File srcDirectory, File targetDirectory, FilenameFilter filter) throws IOException {
> +        if (!srcDirectory.isDirectory()) {
> +            throw new IOException("source is not a directory");
> +        }
> +
> +        if (targetDirectory.exists() && !targetDirectory.isDirectory()) {
> +            throw new IOException("target exists and is not a directory");
> +        } else {
> +            mkdirs(targetDirectory);
> +        }
> +
> +        List<File> filesToMove = new ArrayList<File>();
> +        getFiles(srcDirectory, filesToMove, filter);
> +
> +        for (File file : filesToMove) {
> +            if (!file.isDirectory()) {
> +                moveFile(file, targetDirectory);
> +            }
> +        }
> +    }
> +
>       public static void copyFile(File src, File dest) throws IOException {
>           copyFile(src, dest, null);
>       }
> @@ -222,32 +243,32 @@ public final class IOHelper {
>           File parent = src.getParentFile();
>           String fromPath = from.getAbsolutePath();
>           if (parent.getAbsolutePath().equals(fromPath)) {
> -            //one level down
> +            // one level down
>               result = to;
> -        }else {
> +        } else {
>               String parentPath = parent.getAbsolutePath();
>               String path = parentPath.substring(fromPath.length());
> -            result = new File(to.getAbsolutePath()+File.separator+path);
> +            result = new File(to.getAbsolutePath() + File.separator + path);
>           }
>           return result;
>       }
>   
> -    static List<File> getFiles(File dir,FilenameFilter filter){
> +    static List<File> getFiles(File dir, FilenameFilter filter) {
>           List<File> result = new ArrayList<File>();
> -        getFiles(dir,result,filter);
> +        getFiles(dir, result, filter);
>           return result;
>       }
>   
> -    static void getFiles(File dir,List<File> list,FilenameFilter filter) {
> +    static void getFiles(File dir, List<File> list, FilenameFilter filter) {
>           if (!list.contains(dir)) {
>               list.add(dir);
> -            String[] fileNames=dir.list(filter);
> -            for (int i =0; i < fileNames.length;i++) {
> -                File f = new File(dir,fileNames[i]);
> +            String[] fileNames = dir.list(filter);
> +            for (int i = 0; i < fileNames.length; i++) {
> +                File f = new File(dir, fileNames[i]);
>                   if (f.isFile()) {
>                       list.add(f);
> -                }else {
> -                    getFiles(dir,list,filter);
> +                } else {
> +                    getFiles(dir, list, filter);
>                   }
>               }
>           }
> @@ -286,12 +307,13 @@ public final class IOHelper {
>       public static void mkdirs(File dir) throws IOException {
>           if (dir.exists()) {
>               if (!dir.isDirectory()) {
> -                throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name");
> +                throw new IOException("Failed to create directory '" + dir +
> +                                      "', regular file already existed with that name");
>               }
>   
>           } else {
>               if (!dir.mkdirs()) {
> -                throw new IOException("Failed to create directory '" + dir+"'");
> +                throw new IOException("Failed to create directory '" + dir + "'");
>               }
>           }
>       }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
> ----------------------------------------------------------------------
> diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
> index 7ff4ae0..a3a8250 100755
> --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
> +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
> @@ -34,6 +34,7 @@ import org.apache.activemq.ActiveMQMessageAudit;
>   import org.apache.activemq.broker.BrokerService;
>   import org.apache.activemq.broker.ConnectionContext;
>   import org.apache.activemq.broker.Locker;
> +import org.apache.activemq.broker.scheduler.JobSchedulerStore;
>   import org.apache.activemq.command.ActiveMQDestination;
>   import org.apache.activemq.command.ActiveMQQueue;
>   import org.apache.activemq.command.ActiveMQTopic;
> @@ -422,6 +423,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
>           this.lockDataSource = dataSource;
>       }
>   
> +    @Override
>       public BrokerService getBrokerService() {
>           return brokerService;
>       }
> @@ -846,4 +848,9 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
>           }
>           return result;
>       }
> +
> +    @Override
> +    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
> +        throw new UnsupportedOperationException();
> +    }
>   }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
> ----------------------------------------------------------------------
> diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
> index 565fc9f..cc5282f 100755
> --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
> +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
> @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadFactory;
>   import java.util.concurrent.ThreadPoolExecutor;
>   import java.util.concurrent.TimeUnit;
>   import java.util.concurrent.atomic.AtomicBoolean;
> +
>   import org.apache.activeio.journal.InvalidRecordLocationException;
>   import org.apache.activeio.journal.Journal;
>   import org.apache.activeio.journal.JournalEventListener;
> @@ -40,6 +41,7 @@ import org.apache.activeio.packet.Packet;
>   import org.apache.activemq.broker.BrokerService;
>   import org.apache.activemq.broker.BrokerServiceAware;
>   import org.apache.activemq.broker.ConnectionContext;
> +import org.apache.activemq.broker.scheduler.JobSchedulerStore;
>   import org.apache.activemq.command.ActiveMQDestination;
>   import org.apache.activemq.command.ActiveMQQueue;
>   import org.apache.activemq.command.ActiveMQTopic;
> @@ -78,14 +80,14 @@ import org.slf4j.LoggerFactory;
>    * An implementation of {@link PersistenceAdapter} designed for use with a
>    * {@link Journal} and then check pointing asynchronously on a timeout with some
>    * other long term persistent storage.
> - *
> + *
>    * @org.apache.xbean.XBean
> - *
> + *
>    */
>   public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
>   
>       private BrokerService brokerService;
> -	
> +
>       protected Scheduler scheduler;
>       private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
>   
> @@ -118,9 +120,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>       private TaskRunnerFactory taskRunnerFactory;
>       private File directory;
>   
> -    public JournalPersistenceAdapter() {
> +    public JournalPersistenceAdapter() {
>       }
> -
> +
>       public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
>           setJournal(journal);
>           setTaskRunnerFactory(taskRunnerFactory);
> @@ -135,13 +137,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>           this.journal = journal;
>           journal.setJournalEventListener(this);
>       }
> -
> +
>       public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
>           this.longTermPersistence = longTermPersistence;
>       }
> -
> +
>       final Runnable createPeriodicCheckpointTask() {
>           return new Runnable() {
> +            @Override
>               public void run() {
>                   long lastTime = 0;
>                   synchronized (this) {
> @@ -158,11 +161,13 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>        * @param usageManager The UsageManager that is controlling the
>        *                destination's memory usage.
>        */
> +    @Override
>       public void setUsageManager(SystemUsage usageManager) {
>           this.usageManager = usageManager;
>           longTermPersistence.setUsageManager(usageManager);
>       }
>   
> +    @Override
>       public Set<ActiveMQDestination> getDestinations() {
>           Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
>           destinations.addAll(queues.keySet());
> @@ -178,6 +183,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>           }
>       }
>   
> +    @Override
>       public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
>           JournalMessageStore store = queues.get(destination);
>           if (store == null) {
> @@ -188,6 +194,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>           return store;
>       }
>   
> +    @Override
>       public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
>           JournalTopicMessageStore store = topics.get(destinationName);
>           if (store == null) {
> @@ -203,6 +210,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>        *
>        * @param destination Destination to forget
>        */
> +    @Override
>       public void removeQueueMessageStore(ActiveMQQueue destination) {
>           queues.remove(destination);
>       }
> @@ -212,30 +220,37 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>        *
>        * @param destination Destination to forget
>        */
> +    @Override
>       public void removeTopicMessageStore(ActiveMQTopic destination) {
>           topics.remove(destination);
>       }
>   
> +    @Override
>       public TransactionStore createTransactionStore() throws IOException {
>           return transactionStore;
>       }
>   
> +    @Override
>       public long getLastMessageBrokerSequenceId() throws IOException {
>           return longTermPersistence.getLastMessageBrokerSequenceId();
>       }
>   
> +    @Override
>       public void beginTransaction(ConnectionContext context) throws IOException {
>           longTermPersistence.beginTransaction(context);
>       }
>   
> +    @Override
>       public void commitTransaction(ConnectionContext context) throws IOException {
>           longTermPersistence.commitTransaction(context);
>       }
>   
> +    @Override
>       public void rollbackTransaction(ConnectionContext context) throws IOException {
>           longTermPersistence.rollbackTransaction(context);
>       }
>   
> +    @Override
>       public synchronized void start() throws Exception {
>           if (!started.compareAndSet(false, true)) {
>               return;
> @@ -246,12 +261,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>           }
>   
>           checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
> +            @Override
>               public boolean iterate() {
>                   return doCheckpoint();
>               }
>           }, "ActiveMQ Journal Checkpoint Worker");
>   
>           checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
> +            @Override
>               public Thread newThread(Runnable runable) {
>                   Thread t = new Thread(runable, "Journal checkpoint worker");
>                   t.setPriority(7);
> @@ -279,6 +296,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>   
>       }
>   
> +    @Override
>       public void stop() throws Exception {
>   
>           this.usageManager.getMemoryUsage().removeUsageListener(this);
> @@ -330,16 +348,17 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>       /**
>        * The Journal give us a call back so that we can move old data out of the
>        * journal. Taking a checkpoint does this for us.
> -     *
> +     *
>        * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
>        */
> +    @Override
>       public void overflowNotification(RecordLocation safeLocation) {
>           checkpoint(false, true);
>       }
>   
>       /**
>        * When we checkpoint we move all the journalled data to long term storage.
> -     *
> +     *
>        */
>       public void checkpoint(boolean sync, boolean fullCheckpoint) {
>           try {
> @@ -369,13 +388,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>           }
>       }
>   
> +    @Override
>       public void checkpoint(boolean sync) {
>           checkpoint(sync, sync);
>       }
>   
>       /**
>        * This does the actual checkpoint.
> -     *
> +     *
>        * @return
>        */
>       public boolean doCheckpoint() {
> @@ -398,7 +418,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>               // We do many partial checkpoints (fullCheckpoint==false) to move
>               // topic messages
>               // to long term store as soon as possible.
> -            //
> +            //
>               // We want to avoid doing that for queue messages since removes the
>               // come in the same
>               // checkpoint cycle will nullify the previous message add.
> @@ -411,6 +431,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>                       try {
>                           final JournalMessageStore ms = iterator.next();
>                           FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
> +                            @Override
>                               public RecordLocation call() throws Exception {
>                                   return ms.checkpoint();
>                               }
> @@ -428,6 +449,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>                   try {
>                       final JournalTopicMessageStore ms = iterator.next();
>                       FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
> +                        @Override
>                           public RecordLocation call() throws Exception {
>                               return ms.checkpoint();
>                           }
> @@ -505,7 +527,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>       /**
>        * Move all the messages that were in the journal into long term storage. We
>        * just replay and do a checkpoint.
> -     *
> +     *
>        * @throws IOException
>        * @throws IOException
>        * @throws InvalidRecordLocationException
> @@ -644,11 +666,11 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>       public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
>           if (started.get()) {
>               try {
> -        	    return journal.write(toPacket(wireFormat.marshal(command)), sync);
> +                return journal.write(toPacket(wireFormat.marshal(command)), sync);
>               } catch (IOException ioe) {
> -        	    LOG.error("Cannot write to the journal", ioe);
> -        	    brokerService.handleIOException(ioe);
> -        	    throw ioe;
> +                LOG.error("Cannot write to the journal", ioe);
> +                brokerService.handleIOException(ioe);
> +                throw ioe;
>               }
>           }
>           throw new IOException("closed");
> @@ -660,6 +682,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>           return writeCommand(trace, sync);
>       }
>   
> +    @Override
>       public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
>           newPercentUsage = (newPercentUsage / 10) * 10;
>           oldPercentUsage = (oldPercentUsage / 10) * 10;
> @@ -673,6 +696,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>           return transactionStore;
>       }
>   
> +    @Override
>       public void deleteAllMessages() throws IOException {
>           try {
>               JournalTrace trace = new JournalTrace();
> @@ -735,6 +759,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>           return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
>       }
>   
> +    @Override
>       public void setBrokerName(String brokerName) {
>           longTermPersistence.setBrokerName(brokerName);
>       }
> @@ -744,18 +769,22 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>           return "JournalPersistenceAdapter(" + longTermPersistence + ")";
>       }
>   
> +    @Override
>       public void setDirectory(File dir) {
>           this.directory=dir;
>       }
> -
> +
> +    @Override
>       public File getDirectory(){
>           return directory;
>       }
> -
> +
> +    @Override
>       public long size(){
>           return 0;
>       }
>   
> +    @Override
>       public void setBrokerService(BrokerService brokerService) {
>           this.brokerService = brokerService;
>           PersistenceAdapter pa = getLongTermPersistence();
> @@ -764,8 +793,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>           }
>       }
>   
> +    @Override
>       public long getLastProducerSequenceId(ProducerId id) {
>           return -1;
>       }
>   
> +    @Override
> +    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
> +        return longTermPersistence.createJobSchedulerStore();
> +    }
> +
>   }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
> ----------------------------------------------------------------------
> diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
> new file mode 100644
> index 0000000..edb2750
> --- /dev/null
> +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
> @@ -0,0 +1,57 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.activemq.store.kahadb;
> +
> +import org.apache.activemq.store.kahadb.disk.journal.Location;
> +import org.apache.activemq.store.kahadb.disk.page.Page;
> +
> +public abstract class AbstractKahaDBMetaData<T> implements KahaDBMetaData<T> {
> +
> +    private int state;
> +    private Location lastUpdateLocation;
> +    private Page<T> page;
> +
> +    @Override
> +    public Page<T> getPage() {
> +        return page;
> +    }
> +
> +    @Override
> +    public int getState() {
> +        return state;
> +    }
> +
> +    @Override
> +    public Location getLastUpdateLocation() {
> +        return lastUpdateLocation;
> +    }
> +
> +    @Override
> +    public void setPage(Page<T> page) {
> +        this.page = page;
> +    }
> +
> +    @Override
> +    public void setState(int value) {
> +        this.state = value;
> +    }
> +
> +    @Override
> +    public void setLastUpdateLocation(Location location) {
> +        this.lastUpdateLocation = location;
> +    }
> +}
>
> .
>


-- 
Tim Bish
Sr Software Engineer | RedHat Inc.
tim.bish@redhat.com | www.redhat.com
skype: tabish121 | twitter: @tabish121
blog: http://timbish.blogspot.com/