You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/16 23:10:04 UTC
[16/16] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-3758
https://issues.apache.org/jira/browse/AMQ-3758
Refactor the scheduler store into a more KahaDB style store that can
recover from various problems like missing journal files or corruption
as well as rebuild its index when needed. Move the scheduler store into
a more configurable style that allows for users to plug in their own
implementations. Store update from legacy versions is automatic.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fc244f48
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fc244f48
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fc244f48
Branch: refs/heads/activemq-5.10.x
Commit: fc244f48e48596c668a7d9dc3b84c26e60693823
Parents: db669e4
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jul 7 12:28:11 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Tue Dec 16 16:11:09 2014 -0500
----------------------------------------------------------------------
.../apache/activemq/broker/BrokerService.java | 25 +-
.../activemq/broker/jmx/JobSchedulerView.java | 56 +-
.../broker/jmx/JobSchedulerViewMBean.java | 113 +-
.../apache/activemq/broker/scheduler/Job.java | 23 +-
.../activemq/broker/scheduler/JobListener.java | 16 +-
.../activemq/broker/scheduler/JobScheduler.java | 33 +-
.../broker/scheduler/JobSchedulerFacade.java | 6 +
.../broker/scheduler/JobSchedulerStore.java | 43 +
.../activemq/broker/scheduler/JobSupport.java | 5 +-
.../activemq/store/PersistenceAdapter.java | 119 +-
.../store/memory/MemoryPersistenceAdapter.java | 36 +-
.../java/org/apache/activemq/util/IOHelper.java | 68 +-
.../store/jdbc/JDBCPersistenceAdapter.java | 7 +
.../journal/JournalPersistenceAdapter.java | 71 +-
.../store/kahadb/AbstractKahaDBMetaData.java | 57 +
.../store/kahadb/AbstractKahaDBStore.java | 745 ++++++++++++
.../activemq/store/kahadb/KahaDBMetaData.java | 135 +++
.../store/kahadb/KahaDBPersistenceAdapter.java | 15 +-
.../activemq/store/kahadb/KahaDBStore.java | 55 +-
.../kahadb/MultiKahaDBPersistenceAdapter.java | 56 +-
.../kahadb/MultiKahaDBTransactionStore.java | 18 +-
.../activemq/store/kahadb/TempKahaDBStore.java | 138 ++-
.../apache/activemq/store/kahadb/Visitor.java | 20 +
.../store/kahadb/scheduler/JobImpl.java | 21 +-
.../store/kahadb/scheduler/JobLocation.java | 77 +-
.../scheduler/JobLocationsMarshaller.java | 53 +
.../kahadb/scheduler/JobSchedulerImpl.java | 837 ++++++++------
.../scheduler/JobSchedulerKahaDBMetaData.java | 246 ++++
.../kahadb/scheduler/JobSchedulerStoreImpl.java | 1076 +++++++++++++-----
.../scheduler/UnknownStoreVersionException.java | 24 +
.../kahadb/scheduler/legacy/LegacyJobImpl.java | 72 ++
.../scheduler/legacy/LegacyJobLocation.java | 296 +++++
.../legacy/LegacyJobSchedulerImpl.java | 222 ++++
.../legacy/LegacyJobSchedulerStoreImpl.java | 378 ++++++
.../scheduler/legacy/LegacyStoreReplayer.java | 155 +++
.../src/main/proto/journal-data.proto | 61 +
.../apache/activemq/leveldb/LevelDBStore.scala | 5 +
.../leveldb/replicated/ProxyLevelDBStore.scala | 5 +
.../JobSchedulerBrokerShutdownTest.java | 1 +
.../JobSchedulerJmxManagementTests.java | 155 +++
.../scheduler/JobSchedulerManagementTest.java | 84 +-
.../JobSchedulerStoreCheckpointTest.java | 125 ++
.../broker/scheduler/JobSchedulerStoreTest.java | 46 +-
.../broker/scheduler/JobSchedulerTest.java | 36 +
.../scheduler/JobSchedulerTestSupport.java | 112 ++
.../KahaDBSchedulerIndexRebuildTest.java | 179 +++
.../KahaDBSchedulerMissingJournalLogsTest.java | 204 ++++
.../scheduler/SchedulerDBVersionTest.java | 164 +++
.../src/test/resources/log4j.properties | 1 +
.../activemq/store/schedulerDB/legacy/db-1.log | Bin 0 -> 524288 bytes
.../store/schedulerDB/legacy/scheduleDB.data | Bin 0 -> 20480 bytes
.../store/schedulerDB/legacy/scheduleDB.redo | Bin 0 -> 16408 bytes
52 files changed, 5584 insertions(+), 911 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 00d4abd..5becec2 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -1861,6 +1861,23 @@ public class BrokerService implements Service {
try {
PersistenceAdapter pa = getPersistenceAdapter();
+ if (pa != null) {
+ this.jobSchedulerStore = pa.createJobSchedulerStore();
+ jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
+ configureService(jobSchedulerStore);
+ jobSchedulerStore.start();
+ return this.jobSchedulerStore;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (UnsupportedOperationException ex) {
+ // It's ok if the store doesn't implement a scheduler.
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ try {
+ PersistenceAdapter pa = getPersistenceAdapter();
if (pa != null && pa instanceof JobSchedulerStore) {
this.jobSchedulerStore = (JobSchedulerStore) pa;
configureService(jobSchedulerStore);
@@ -1870,9 +1887,13 @@ public class BrokerService implements Service {
throw new RuntimeException(e);
}
+ // Load the KahaDB store as a last resort, this only works if KahaDB is
+ // included at runtime, otherwise this will fail. User should disable
+ // scheduler support if this fails.
try {
- String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
- jobSchedulerStore = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
+ String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
+ PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
+ jobSchedulerStore = adaptor.createJobSchedulerStore();
jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
configureService(jobSchedulerStore);
jobSchedulerStore.start();
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
index 9e5a1fb..2118a96 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
@@ -16,23 +16,39 @@
*/
package org.apache.activemq.broker.jmx;
+import java.util.List;
+
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSupport;
-import javax.management.openmbean.*;
-import java.io.IOException;
-import java.util.List;
-
+/**
+ * MBean object that can be used to manage a single instance of a JobScheduler. The object
+ * provides methods for querying for jobs and removing some or all of the jobs that are
+ * scheduled in the managed store.
+ */
public class JobSchedulerView implements JobSchedulerViewMBean {
private final JobScheduler jobScheduler;
+ /**
+ * Creates a new instance of the JobScheduler management MBean.
+ *
+ * @param jobScheduler
+ * The scheduler instance to manage.
+ */
public JobSchedulerView(JobScheduler jobScheduler) {
this.jobScheduler = jobScheduler;
}
+ @Override
public TabularData getAllJobs() throws Exception {
OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
CompositeType ct = factory.getCompositeType();
@@ -45,6 +61,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
return rc;
}
+ @Override
public TabularData getAllJobs(String startTime, String finishTime) throws Exception {
OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
CompositeType ct = factory.getCompositeType();
@@ -59,6 +76,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
return rc;
}
+ @Override
public TabularData getNextScheduleJobs() throws Exception {
OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
CompositeType ct = factory.getCompositeType();
@@ -71,31 +89,51 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
return rc;
}
+ @Override
public String getNextScheduleTime() throws Exception {
long time = this.jobScheduler.getNextScheduleTime();
return JobSupport.getDateTime(time);
}
+ @Override
public void removeAllJobs() throws Exception {
this.jobScheduler.removeAllJobs();
-
}
+ @Override
public void removeAllJobs(String startTime, String finishTime) throws Exception {
long start = JobSupport.getDataTime(startTime);
long finish = JobSupport.getDataTime(finishTime);
this.jobScheduler.removeAllJobs(start, finish);
+ }
+ @Override
+ public void removeAllJobsAtScheduledTime(String time) throws Exception {
+ long removeAtTime = JobSupport.getDataTime(time);
+ this.jobScheduler.remove(removeAtTime);
}
+ @Override
+ public void removeJobAtScheduledTime(String time) throws Exception {
+ removeAllJobsAtScheduledTime(time);
+ }
+
+ @Override
public void removeJob(String jobId) throws Exception {
this.jobScheduler.remove(jobId);
-
}
- public void removeJobAtScheduledTime(String time) throws IOException {
- // TODO Auto-generated method stub
+ @Override
+ public int getExecutionCount(String jobId) throws Exception {
+ int result = 0;
- }
+ List<Job> jobs = this.jobScheduler.getAllJobs();
+ for (Job job : jobs) {
+ if (job.getJobId().equals(jobId)) {
+ result = job.getExecutionCount();
+ }
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
index f5745ea..76a7926 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
@@ -18,76 +18,125 @@ package org.apache.activemq.broker.jmx;
import javax.management.openmbean.TabularData;
-
-
public interface JobSchedulerViewMBean {
+
/**
- * remove all jobs scheduled to run at this time
+ * Remove all jobs scheduled to run at this time. If there are no jobs scheduled
+ * at the given time this methods returns without making any modifications to the
+ * scheduler store.
+ *
* @param time
- * @throws Exception
+ * the string formated time that should be used to remove jobs.
+ *
+ * @throws Exception if an error occurs while performing the remove.
+ *
+ * @deprecated use removeAllJobsAtScheduledTime instead as it is more explicit about what
+ * the method is actually doing.
*/
+ @Deprecated
@MBeanInfo("remove jobs with matching execution time")
public abstract void removeJobAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception;
/**
- * remove a job with the matching jobId
+ * Remove all jobs scheduled to run at this time. If there are no jobs scheduled
+ * at the given time this methods returns without making any modifications to the
+ * scheduler store.
+ *
+ * @param time
+ * the string formated time that should be used to remove jobs.
+ *
+ * @throws Exception if an error occurs while performing the remove.
+ */
+ @MBeanInfo("remove jobs with matching execution time")
+ public abstract void removeAllJobsAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception;
+
+ /**
+ * Remove a job with the matching jobId. If the method does not find a matching job
+ * then it returns without throwing an error or making any modifications to the job
+ * scheduler store.
+ *
* @param jobId
- * @throws Exception
+ * the Job Id to remove from the scheduler store.
+ *
+ * @throws Exception if an error occurs while attempting to remove the Job.
*/
@MBeanInfo("remove jobs with matching jobId")
public abstract void removeJob(@MBeanInfo("jobId")String jobId) throws Exception;
-
+
/**
- * remove all the Jobs from the scheduler
- * @throws Exception
+ * Remove all the Jobs from the scheduler,
+ *
+ * @throws Exception if an error occurs while purging the store.
*/
@MBeanInfo("remove all scheduled jobs")
public abstract void removeAllJobs() throws Exception;
-
+
/**
- * remove all the Jobs from the scheduler that are due between the start and finish times
- * @param start time
- * @param finish time
- * @throws Exception
+ * Remove all the Jobs from the scheduler that are due between the start and finish times.
+ *
+ * @param start
+ * the starting time to remove jobs from.
+ * @param finish
+ * the finish time for the remove operation.
+ *
+ * @throws Exception if an error occurs while attempting to remove the jobs.
*/
@MBeanInfo("remove all scheduled jobs between time ranges ")
public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish) throws Exception;
-
-
/**
- * Get the next time jobs will be fired
- * @return the time in milliseconds
- * @throws Exception
+ * Get the next time jobs will be fired from this scheduler store.
+ *
+ * @return the time in milliseconds of the next job to execute.
+ *
+ * @throws Exception if an error occurs while accessing the store.
*/
@MBeanInfo("get the next time a job is due to be scheduled ")
public abstract String getNextScheduleTime() throws Exception;
-
+
+ /**
+ * Gets the number of times a scheduled Job has been executed.
+ *
+ * @return the total number of time a scheduled job has executed.
+ *
+ * @throws Exception if an error occurs while querying for the Job.
+ */
+ @MBeanInfo("get the next time a job is due to be scheduled ")
+ public abstract int getExecutionCount(@MBeanInfo("jobId")String jobId) throws Exception;
+
/**
- * Get all the jobs scheduled to run next
+ * Get all the jobs scheduled to run next.
+ *
* @return a list of jobs that will be scheduled next
- * @throws Exception
+ *
+ * @throws Exception if an error occurs while reading the scheduler store.
*/
@MBeanInfo("get the next job(s) to be scheduled. Not HTML friendly ")
public abstract TabularData getNextScheduleJobs() throws Exception;
-
- /**
- * Get all the outstanding Jobs
- * @return a table of all jobs
- * @throws Exception
+ /**
+ * Get all the outstanding Jobs that are scheduled in this scheduler store.
+ *
+ * @return a table of all jobs in this scheduler store.
+ *
+ * @throws Exception if an error occurs while reading the store.
*/
@MBeanInfo("get the scheduled Jobs in the Store. Not HTML friendly ")
public abstract TabularData getAllJobs() throws Exception;
-
+
/**
- * Get all outstanding jobs due to run between start and finish
+ * Get all outstanding jobs due to run between start and finish time range.
+ *
* @param start
+ * the starting time range to query the store for jobs.
* @param finish
- * @return a table of jobs in the range
- * @throws Exception
-
+ * the ending time of this query for scheduled jobs.
+ *
+ * @return a table of jobs in the range given.
+ *
+ * @throws Exception if an error occurs while querying the scheduler store.
*/
@MBeanInfo("get the scheduled Jobs in the Store within the time range. Not HTML friendly ")
public abstract TabularData getAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)throws Exception;
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
index 7b28a5b..047fe23 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
@@ -16,7 +16,12 @@
*/
package org.apache.activemq.broker.scheduler;
-
+/**
+ * Interface for a scheduled Job object.
+ *
+ * Each Job is identified by a unique Job Id which can be used to reference the Job
+ * in the Job Scheduler store for updates or removal.
+ */
public interface Job {
/**
@@ -38,11 +43,12 @@ public interface Job {
* @return the Delay
*/
public abstract long getDelay();
+
/**
* @return the period
*/
public abstract long getPeriod();
-
+
/**
* @return the cron entry
*/
@@ -52,17 +58,24 @@ public interface Job {
* @return the payload
*/
public abstract byte[] getPayload();
-
+
/**
* Get the start time as a Date time string
* @return the date time
*/
public String getStartTime();
-
+
/**
- * Get the time the job is next due to execute
+ * Get the time the job is next due to execute
* @return the date time
*/
public String getNextExecutionTime();
+ /**
+ * Gets the total number of times this job has executed.
+ *
+ * @returns the number of times this job has been executed.
+ */
+ public int getExecutionCount();
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
index c53d9c6..a453595 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
@@ -18,13 +18,21 @@ package org.apache.activemq.broker.scheduler;
import org.apache.activemq.util.ByteSequence;
+/**
+ * Job event listener interface. Provides event points for Job related events
+ * such as job ready events.
+ */
public interface JobListener {
-
+
/**
- * A Job that has been scheduled is now ready
- * @param id
+ * A Job that has been scheduled is now ready to be fired. The Job is passed
+ * in its raw byte form and must be un-marshaled before being delivered.
+ *
+ * @param jobId
+ * The unique Job Id of the Job that is ready to fire.
* @param job
+ * The job that is now ready, delivered in byte form.
*/
- public void scheduledJob(String id,ByteSequence job);
+ public void scheduledJob(String id, ByteSequence job);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
index 2e96eae..e951861 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
@@ -46,20 +46,25 @@ public interface JobScheduler {
void stopDispatching() throws Exception;
/**
- * Add a Job listener
+ * Add a Job listener which will receive events related to scheduled jobs.
+ *
+ * @param listener
+ * The job listener to add.
*
- * @param l
* @throws Exception
*/
- void addListener(JobListener l) throws Exception;
+ void addListener(JobListener listener) throws Exception;
/**
- * remove a JobListener
+ * remove a JobListener that was previously registered. If the given listener is not in
+ * the registry this method has no effect.
+ *
+ * @param listener
+ * The listener that should be removed from the listener registry.
*
- * @param l
* @throws Exception
*/
- void removeListener(JobListener l) throws Exception;
+ void removeListener(JobListener listener) throws Exception;
/**
* Add a job to be scheduled
@@ -70,7 +75,8 @@ public interface JobScheduler {
* the message to be sent when the job is scheduled
* @param delay
* the time in milliseconds before the job will be run
- * @throws Exception
+ *
+ * @throws Exception if an error occurs while scheduling the Job.
*/
void schedule(String jobId, ByteSequence payload, long delay) throws Exception;
@@ -82,8 +88,9 @@ public interface JobScheduler {
* @param payload
* the message to be sent when the job is scheduled
* @param cronEntry
- * - cron entry
- * @throws Exception
+ * The cron entry to use to schedule this job.
+ *
+ * @throws Exception if an error occurs while scheduling the Job.
*/
void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception;
@@ -95,7 +102,7 @@ public interface JobScheduler {
* @param payload
* the message to be sent when the job is scheduled
* @param cronEntry
- * - cron entry
+ * cron entry
* @param delay
* time in ms to wait before scheduling
* @param period
@@ -110,6 +117,8 @@ public interface JobScheduler {
* remove all jobs scheduled to run at this time
*
* @param time
+ * The UTC time to use to remove a batch of scheduled Jobs.
+ *
* @throws Exception
*/
void remove(long time) throws Exception;
@@ -118,7 +127,9 @@ public interface JobScheduler {
* remove a job with the matching jobId
*
* @param jobId
- * @throws Exception
+ * The unique Job Id to search for and remove from the scheduled set of jobs.
+ *
+ * @throws Exception if an error occurs while removing the Job.
*/
void remove(String jobId) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
index d46d04a..24a216a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
@@ -21,6 +21,12 @@ import java.util.List;
import org.apache.activemq.util.ByteSequence;
+/**
+ * A wrapper for instances of the JobScheduler interface that ensures that methods
+ * provides safe and sane return values and can deal with null values being passed
+ * in etc. Provides a measure of safety when using unknown implementations of the
+ * JobSchedulerStore which might not always do the right thing.
+ */
public class JobSchedulerFacade implements JobScheduler {
private final SchedulerBroker broker;
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
index 3cbc367..c6863c7 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
@@ -26,13 +26,56 @@ import org.apache.activemq.Service;
*/
public interface JobSchedulerStore extends Service {
+ /**
+ * Gets the location where the Job Scheduler will write the persistent data used
+ * to preserve and recover scheduled Jobs.
+ *
+ * If the scheduler implementation does not utilize a file system based store this
+ * method returns null.
+ *
+ * @return the directory where persistent store data is written.
+ */
File getDirectory();
+ /**
+ * Sets the directory where persistent store data will be written. This method
+ * must be called before the scheduler store is started to have any effect.
+ *
+ * @param directory
+ * The directory where the job scheduler store is to be located.
+ */
void setDirectory(File directory);
+ /**
+ * The size of the current store on disk if the store utilizes a disk based store
+ * mechanism.
+ *
+ * @return the current store size on disk.
+ */
long size();
+ /**
+ * Returns the JobScheduler instance identified by the given name.
+ *
+ * @param name
+ * the name of the JobScheduler instance to lookup.
+ *
+ * @return the named JobScheduler or null if none exists with the given name.
+ *
+ * @throws Exception if an error occurs while loading the named scheduler.
+ */
JobScheduler getJobScheduler(String name) throws Exception;
+ /**
+ * Removes the named JobScheduler if it exists, purging all scheduled messages
+ * assigned to it.
+ *
+ * @param name
+ * the name of the scheduler instance to remove.
+ *
+ * @return true if there was a scheduler with the given name to remove.
+ *
+ * @throws Exception if an error occurs while removing the scheduler.
+ */
boolean removeJobScheduler(String name) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
index 6b78d77..fc5b8dd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
@@ -20,7 +20,11 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
+/**
+ * A class to provide common Job Scheduler related methods.
+ */
public class JobSupport {
+
public static String getDateTime(long value) {
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = new Date(value);
@@ -32,5 +36,4 @@ public class JobSupport {
Date date = dfm.parse(value);
return date.getTime();
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
index 31efd32..01a9634 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
@@ -22,6 +22,7 @@ import java.util.Set;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -31,74 +32,99 @@ import org.apache.activemq.usage.SystemUsage;
/**
* Adapter to the actual persistence mechanism used with ActiveMQ
*
- *
+ *
*/
public interface PersistenceAdapter extends Service {
/**
- * Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
- * objects that the persistence store is aware exist.
+ * Returns a set of all the
+ * {@link org.apache.activemq.command.ActiveMQDestination} objects that the
+ * persistence store is aware exist.
*
* @return active destinations
*/
Set<ActiveMQDestination> getDestinations();
/**
- * Factory method to create a new queue message store with the given destination name
+ * Factory method to create a new queue message store with the given
+ * destination name
+ *
* @param destination
* @return the message store
- * @throws IOException
+ * @throws IOException
*/
MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
/**
- * Factory method to create a new topic message store with the given destination name
- * @param destination
+ * Factory method to create a new topic message store with the given
+ * destination name
+ *
+ * @param destination
* @return the topic message store
- * @throws IOException
+ * @throws IOException
*/
TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
/**
+ * Creates and returns a new Job Scheduler store instance.
+ *
+ * @return a new JobSchedulerStore instance if this Persistence adapter provides its own.
+ *
+ * @throws IOException If an error occurs while creating the new JobSchedulerStore.
+ * @throws UnsupportedOperationException If this adapter does not provide its own
+ * scheduler store implementation.
+ */
+ JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException;
+
+ /**
* Cleanup method to remove any state associated with the given destination.
* This method does not stop the message store (it might not be cached).
- * @param destination Destination to forget
+ *
+ * @param destination
+ * Destination to forget
*/
void removeQueueMessageStore(ActiveMQQueue destination);
/**
* Cleanup method to remove any state associated with the given destination
* This method does not stop the message store (it might not be cached).
- * @param destination Destination to forget
+ *
+ * @param destination
+ * Destination to forget
*/
void removeTopicMessageStore(ActiveMQTopic destination);
/**
- * Factory method to create a new persistent prepared transaction store for XA recovery
+ * Factory method to create a new persistent prepared transaction store for
+ * XA recovery
+ *
* @return transaction store
- * @throws IOException
+ * @throws IOException
*/
TransactionStore createTransactionStore() throws IOException;
/**
- * This method starts a transaction on the persistent storage - which is nothing to
- * do with JMS or XA transactions - its purely a mechanism to perform multiple writes
- * to a persistent store in 1 transaction as a performance optimization.
+ * This method starts a transaction on the persistent storage - which is
+ * nothing to do with JMS or XA transactions - its purely a mechanism to
+ * perform multiple writes to a persistent store in 1 transaction as a
+ * performance optimization.
* <p/>
- * Typically one transaction will require one disk synchronization point and so for
- * real high performance its usually faster to perform many writes within the same
- * transaction to minimize latency caused by disk synchronization. This is especially
- * true when using tools like Berkeley Db or embedded JDBC servers.
- * @param context
- * @throws IOException
+ * Typically one transaction will require one disk synchronization point and
+ * so for real high performance its usually faster to perform many writes
+ * within the same transaction to minimize latency caused by disk
+ * synchronization. This is especially true when using tools like Berkeley
+ * Db or embedded JDBC servers.
+ *
+ * @param context
+ * @throws IOException
*/
void beginTransaction(ConnectionContext context) throws IOException;
-
/**
* Commit a persistence transaction
- * @param context
- * @throws IOException
+ *
+ * @param context
+ * @throws IOException
*
* @see PersistenceAdapter#beginTransaction(ConnectionContext context)
*/
@@ -106,40 +132,45 @@ public interface PersistenceAdapter extends Service {
/**
* Rollback a persistence transaction
- * @param context
- * @throws IOException
+ *
+ * @param context
+ * @throws IOException
*
* @see PersistenceAdapter#beginTransaction(ConnectionContext context)
*/
void rollbackTransaction(ConnectionContext context) throws IOException;
-
+
/**
- *
+ *
* @return last broker sequence
* @throws IOException
*/
long getLastMessageBrokerSequenceId() throws IOException;
-
+
/**
* Delete's all the messages in the persistent store.
- *
+ *
* @throws IOException
*/
void deleteAllMessages() throws IOException;
-
+
/**
- * @param usageManager The UsageManager that is controlling the broker's memory usage.
+ * @param usageManager
+ * The UsageManager that is controlling the broker's memory
+ * usage.
*/
void setUsageManager(SystemUsage usageManager);
-
+
/**
* Set the name of the broker using the adapter
+ *
* @param brokerName
*/
void setBrokerName(String brokerName);
-
+
/**
* Set the directory where any data files should be created
+ *
* @param dir
*/
void setDirectory(File dir);
@@ -148,26 +179,30 @@ public interface PersistenceAdapter extends Service {
* @return the directory used by the persistence adaptor
*/
File getDirectory();
-
+
/**
* checkpoint any
- * @param sync
- * @throws IOException
+ *
+ * @param sync
+ * @throws IOException
*
*/
void checkpoint(boolean sync) throws IOException;
-
+
/**
* A hint to return the size of the store on disk
+ *
* @return disk space used in bytes of 0 if not implemented
*/
long size();
/**
- * return the last stored producer sequenceId for this producer Id
- * used to suppress duplicate sends on failover reconnect at the transport
- * when a reconnect occurs
- * @param id the producerId to find a sequenceId for
+ * return the last stored producer sequenceId for this producer Id used to
+ * suppress duplicate sends on failover reconnect at the transport when a
+ * reconnect occurs
+ *
+ * @param id
+ * the producerId to find a sequenceId for
* @return the last stored sequence id or -1 if no suppression needed
*/
long getLastProducerSequenceId(ProducerId id) throws IOException;
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
index 0fd6bfc..73ea104 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory;
/**
* @org.apache.xbean.XBean
- *
+ *
*/
public class MemoryPersistenceAdapter implements PersistenceAdapter {
private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
@@ -49,6 +50,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
private boolean useExternalMessageReferences;
+ @Override
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size());
for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) {
@@ -64,6 +66,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
return new MemoryPersistenceAdapter();
}
+ @Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
MessageStore rc = queues.get(destination);
if (rc == null) {
@@ -76,6 +79,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
return rc;
}
+ @Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
TopicMessageStore rc = topics.get(destination);
if (rc == null) {
@@ -93,6 +97,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
*
* @param destination Destination to forget
*/
+ @Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
queues.remove(destination);
}
@@ -102,10 +107,12 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
*
* @param destination Destination to forget
*/
+ @Override
public void removeTopicMessageStore(ActiveMQTopic destination) {
topics.remove(destination);
}
+ @Override
public TransactionStore createTransactionStore() throws IOException {
if (transactionStore == null) {
transactionStore = new MemoryTransactionStore(this);
@@ -113,25 +120,32 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
return transactionStore;
}
+ @Override
public void beginTransaction(ConnectionContext context) {
}
+ @Override
public void commitTransaction(ConnectionContext context) {
}
+ @Override
public void rollbackTransaction(ConnectionContext context) {
}
+ @Override
public void start() throws Exception {
}
+ @Override
public void stop() throws Exception {
}
+ @Override
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
+ @Override
public void deleteAllMessages() throws IOException {
for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) {
MemoryMessageStore store = asMemoryMessageStore(iter.next());
@@ -177,38 +191,52 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
* @param usageManager The UsageManager that is controlling the broker's
* memory usage.
*/
+ @Override
public void setUsageManager(SystemUsage usageManager) {
}
+ @Override
public String toString() {
return "MemoryPersistenceAdapter";
}
+ @Override
public void setBrokerName(String brokerName) {
}
+ @Override
public void setDirectory(File dir) {
}
-
+
+ @Override
public File getDirectory(){
return null;
}
+ @Override
public void checkpoint(boolean sync) throws IOException {
}
-
+
+ @Override
public long size(){
return 0;
}
-
+
public void setCreateTransactionStore(boolean create) throws IOException {
if (create) {
createTransactionStore();
}
}
+ @Override
public long getLastProducerSequenceId(ProducerId id) {
// memory map does duplicate suppression
return -1;
}
+
+ @Override
+ public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+ // We could eventuall implement an in memory scheduler.
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
index a623de9..2a70194 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
@@ -61,8 +61,9 @@ public final class IOHelper {
}
/**
- * Converts any string into a string that is safe to use as a file name.
- * The result will only include ascii characters and numbers, and the "-","_", and "." characters.
+ * Converts any string into a string that is safe to use as a file name. The
+ * result will only include ascii characters and numbers, and the "-","_",
+ * and "." characters.
*
* @param name
* @return
@@ -76,15 +77,16 @@ public final class IOHelper {
}
/**
- * Converts any string into a string that is safe to use as a file name.
- * The result will only include ascii characters and numbers, and the "-","_", and "." characters.
+ * Converts any string into a string that is safe to use as a file name. The
+ * result will only include ascii characters and numbers, and the "-","_",
+ * and "." characters.
*
* @param name
* @param dirSeparators
* @param maxFileLength
* @return
*/
- public static String toFileSystemSafeName(String name,boolean dirSeparators,int maxFileLength) {
+ public static String toFileSystemSafeName(String name, boolean dirSeparators, int maxFileLength) {
int size = name.length();
StringBuffer rc = new StringBuffer(size * 2);
for (int i = 0; i < size; i++) {
@@ -92,8 +94,7 @@ public final class IOHelper {
boolean valid = c >= 'a' && c <= 'z';
valid = valid || (c >= 'A' && c <= 'Z');
valid = valid || (c >= '0' && c <= '9');
- valid = valid || (c == '_') || (c == '-') || (c == '.') || (c=='#')
- ||(dirSeparators && ( (c == '/') || (c == '\\')));
+ valid = valid || (c == '_') || (c == '-') || (c == '.') || (c == '#') || (dirSeparators && ((c == '/') || (c == '\\')));
if (valid) {
rc.append(c);
@@ -105,7 +106,7 @@ public final class IOHelper {
}
String result = rc.toString();
if (result.length() > maxFileLength) {
- result = result.substring(result.length()-maxFileLength,result.length());
+ result = result.substring(result.length() - maxFileLength, result.length());
}
return result;
}
@@ -168,8 +169,7 @@ public final class IOHelper {
} else {
for (int i = 0; i < files.length; i++) {
File file = files[i];
- if (file.getName().equals(".")
- || file.getName().equals("..")) {
+ if (file.getName().equals(".") || file.getName().equals("..")) {
continue;
}
if (file.isDirectory()) {
@@ -190,6 +190,27 @@ public final class IOHelper {
}
}
+ public static void moveFiles(File srcDirectory, File targetDirectory, FilenameFilter filter) throws IOException {
+ if (!srcDirectory.isDirectory()) {
+ throw new IOException("source is not a directory");
+ }
+
+ if (targetDirectory.exists() && !targetDirectory.isDirectory()) {
+ throw new IOException("target exists and is not a directory");
+ } else {
+ mkdirs(targetDirectory);
+ }
+
+ List<File> filesToMove = new ArrayList<File>();
+ getFiles(srcDirectory, filesToMove, filter);
+
+ for (File file : filesToMove) {
+ if (!file.isDirectory()) {
+ moveFile(file, targetDirectory);
+ }
+ }
+ }
+
public static void copyFile(File src, File dest) throws IOException {
copyFile(src, dest, null);
}
@@ -222,32 +243,32 @@ public final class IOHelper {
File parent = src.getParentFile();
String fromPath = from.getAbsolutePath();
if (parent.getAbsolutePath().equals(fromPath)) {
- //one level down
+ // one level down
result = to;
- }else {
+ } else {
String parentPath = parent.getAbsolutePath();
String path = parentPath.substring(fromPath.length());
- result = new File(to.getAbsolutePath()+File.separator+path);
+ result = new File(to.getAbsolutePath() + File.separator + path);
}
return result;
}
- static List<File> getFiles(File dir,FilenameFilter filter){
+ static List<File> getFiles(File dir, FilenameFilter filter) {
List<File> result = new ArrayList<File>();
- getFiles(dir,result,filter);
+ getFiles(dir, result, filter);
return result;
}
- static void getFiles(File dir,List<File> list,FilenameFilter filter) {
+ static void getFiles(File dir, List<File> list, FilenameFilter filter) {
if (!list.contains(dir)) {
list.add(dir);
- String[] fileNames=dir.list(filter);
- for (int i =0; i < fileNames.length;i++) {
- File f = new File(dir,fileNames[i]);
+ String[] fileNames = dir.list(filter);
+ for (int i = 0; i < fileNames.length; i++) {
+ File f = new File(dir, fileNames[i]);
if (f.isFile()) {
list.add(f);
- }else {
- getFiles(dir,list,filter);
+ } else {
+ getFiles(dir, list, filter);
}
}
}
@@ -286,12 +307,13 @@ public final class IOHelper {
public static void mkdirs(File dir) throws IOException {
if (dir.exists()) {
if (!dir.isDirectory()) {
- throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name");
+ throw new IOException("Failed to create directory '" + dir +
+ "', regular file already existed with that name");
}
} else {
if (!dir.mkdirs()) {
- throw new IOException("Failed to create directory '" + dir+"'");
+ throw new IOException("Failed to create directory '" + dir + "'");
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
index 7ff4ae0..a3a8250 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
@@ -34,6 +34,7 @@ import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.Locker;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -422,6 +423,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
this.lockDataSource = dataSource;
}
+ @Override
public BrokerService getBrokerService() {
return brokerService;
}
@@ -846,4 +848,9 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
}
return result;
}
+
+ @Override
+ public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
index 565fc9f..cc5282f 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.activeio.journal.InvalidRecordLocationException;
import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.JournalEventListener;
@@ -40,6 +41,7 @@ import org.apache.activeio.packet.Packet;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -78,14 +80,14 @@ import org.slf4j.LoggerFactory;
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
* other long term persistent storage.
- *
+ *
* @org.apache.xbean.XBean
- *
+ *
*/
public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
private BrokerService brokerService;
-
+
protected Scheduler scheduler;
private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
@@ -118,9 +120,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
private TaskRunnerFactory taskRunnerFactory;
private File directory;
- public JournalPersistenceAdapter() {
+ public JournalPersistenceAdapter() {
}
-
+
public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
setJournal(journal);
setTaskRunnerFactory(taskRunnerFactory);
@@ -135,13 +137,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
this.journal = journal;
journal.setJournalEventListener(this);
}
-
+
public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
this.longTermPersistence = longTermPersistence;
}
-
+
final Runnable createPeriodicCheckpointTask() {
return new Runnable() {
+ @Override
public void run() {
long lastTime = 0;
synchronized (this) {
@@ -158,11 +161,13 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
* @param usageManager The UsageManager that is controlling the
* destination's memory usage.
*/
+ @Override
public void setUsageManager(SystemUsage usageManager) {
this.usageManager = usageManager;
longTermPersistence.setUsageManager(usageManager);
}
+ @Override
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
destinations.addAll(queues.keySet());
@@ -178,6 +183,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
}
}
+ @Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
JournalMessageStore store = queues.get(destination);
if (store == null) {
@@ -188,6 +194,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
return store;
}
+ @Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
JournalTopicMessageStore store = topics.get(destinationName);
if (store == null) {
@@ -203,6 +210,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
*
* @param destination Destination to forget
*/
+ @Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
queues.remove(destination);
}
@@ -212,30 +220,37 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
*
* @param destination Destination to forget
*/
+ @Override
public void removeTopicMessageStore(ActiveMQTopic destination) {
topics.remove(destination);
}
+ @Override
public TransactionStore createTransactionStore() throws IOException {
return transactionStore;
}
+ @Override
public long getLastMessageBrokerSequenceId() throws IOException {
return longTermPersistence.getLastMessageBrokerSequenceId();
}
+ @Override
public void beginTransaction(ConnectionContext context) throws IOException {
longTermPersistence.beginTransaction(context);
}
+ @Override
public void commitTransaction(ConnectionContext context) throws IOException {
longTermPersistence.commitTransaction(context);
}
+ @Override
public void rollbackTransaction(ConnectionContext context) throws IOException {
longTermPersistence.rollbackTransaction(context);
}
+ @Override
public synchronized void start() throws Exception {
if (!started.compareAndSet(false, true)) {
return;
@@ -246,12 +261,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
}
checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
+ @Override
public boolean iterate() {
return doCheckpoint();
}
}, "ActiveMQ Journal Checkpoint Worker");
checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+ @Override
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable, "Journal checkpoint worker");
t.setPriority(7);
@@ -279,6 +296,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
}
+ @Override
public void stop() throws Exception {
this.usageManager.getMemoryUsage().removeUsageListener(this);
@@ -330,16 +348,17 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
/**
* The Journal give us a call back so that we can move old data out of the
* journal. Taking a checkpoint does this for us.
- *
+ *
* @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
*/
+ @Override
public void overflowNotification(RecordLocation safeLocation) {
checkpoint(false, true);
}
/**
* When we checkpoint we move all the journalled data to long term storage.
- *
+ *
*/
public void checkpoint(boolean sync, boolean fullCheckpoint) {
try {
@@ -369,13 +388,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
}
}
+ @Override
public void checkpoint(boolean sync) {
checkpoint(sync, sync);
}
/**
* This does the actual checkpoint.
- *
+ *
* @return
*/
public boolean doCheckpoint() {
@@ -398,7 +418,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
// We do many partial checkpoints (fullCheckpoint==false) to move
// topic messages
// to long term store as soon as possible.
- //
+ //
// We want to avoid doing that for queue messages since removes the
// come in the same
// checkpoint cycle will nullify the previous message add.
@@ -411,6 +431,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
try {
final JournalMessageStore ms = iterator.next();
FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
+ @Override
public RecordLocation call() throws Exception {
return ms.checkpoint();
}
@@ -428,6 +449,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
try {
final JournalTopicMessageStore ms = iterator.next();
FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
+ @Override
public RecordLocation call() throws Exception {
return ms.checkpoint();
}
@@ -505,7 +527,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
/**
* Move all the messages that were in the journal into long term storage. We
* just replay and do a checkpoint.
- *
+ *
* @throws IOException
* @throws IOException
* @throws InvalidRecordLocationException
@@ -644,11 +666,11 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
if (started.get()) {
try {
- return journal.write(toPacket(wireFormat.marshal(command)), sync);
+ return journal.write(toPacket(wireFormat.marshal(command)), sync);
} catch (IOException ioe) {
- LOG.error("Cannot write to the journal", ioe);
- brokerService.handleIOException(ioe);
- throw ioe;
+ LOG.error("Cannot write to the journal", ioe);
+ brokerService.handleIOException(ioe);
+ throw ioe;
}
}
throw new IOException("closed");
@@ -660,6 +682,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
return writeCommand(trace, sync);
}
+ @Override
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
newPercentUsage = (newPercentUsage / 10) * 10;
oldPercentUsage = (oldPercentUsage / 10) * 10;
@@ -673,6 +696,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
return transactionStore;
}
+ @Override
public void deleteAllMessages() throws IOException {
try {
JournalTrace trace = new JournalTrace();
@@ -735,6 +759,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
}
+ @Override
public void setBrokerName(String brokerName) {
longTermPersistence.setBrokerName(brokerName);
}
@@ -744,18 +769,22 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
return "JournalPersistenceAdapter(" + longTermPersistence + ")";
}
+ @Override
public void setDirectory(File dir) {
this.directory=dir;
}
-
+
+ @Override
public File getDirectory(){
return directory;
}
-
+
+ @Override
public long size(){
return 0;
}
+ @Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
PersistenceAdapter pa = getLongTermPersistence();
@@ -764,8 +793,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
}
}
+ @Override
public long getLastProducerSequenceId(ProducerId id) {
return -1;
}
+ @Override
+ public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+ return longTermPersistence.createJobSchedulerStore();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
new file mode 100644
index 0000000..edb2750
--- /dev/null
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Page;
+
+public abstract class AbstractKahaDBMetaData<T> implements KahaDBMetaData<T> {
+
+ private int state;
+ private Location lastUpdateLocation;
+ private Page<T> page;
+
+ @Override
+ public Page<T> getPage() {
+ return page;
+ }
+
+ @Override
+ public int getState() {
+ return state;
+ }
+
+ @Override
+ public Location getLastUpdateLocation() {
+ return lastUpdateLocation;
+ }
+
+ @Override
+ public void setPage(Page<T> page) {
+ this.page = page;
+ }
+
+ @Override
+ public void setState(int value) {
+ this.state = value;
+ }
+
+ @Override
+ public void setLastUpdateLocation(Location location) {
+ this.lastUpdateLocation = location;
+ }
+}
Re: [16/16] activemq git commit: https://issues.apache.org/jira/browse/AMQ-3758
Posted by Timothy Bish <ta...@gmail.com>.
-1
This is a huge change that completely alters the Job scheduler store to
use a totally new store format and while the update does allow for the
update from an older to a newer store this not something I would expect
to see in a point release.
On 12/16/2014 05:10 PM, hadrian@apache.org wrote:
> https://issues.apache.org/jira/browse/AMQ-3758
>
> Refactor the scheduler store into a more KahaDB style store that can
> recover from various problems like missing journal files or corruption
> as well as rebuild its index when needed. Move the scheduler store into
> a more configurable style that allows for users to plug in their own
> implementations. Store update from legacy versions is automatic.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
> Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fc244f48
> Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fc244f48
> Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fc244f48
>
> Branch: refs/heads/activemq-5.10.x
> Commit: fc244f48e48596c668a7d9dc3b84c26e60693823
> Parents: db669e4
> Author: Timothy Bish <ta...@gmail.com>
> Authored: Mon Jul 7 12:28:11 2014 -0400
> Committer: Hadrian Zbarcea <ha...@apache.org>
> Committed: Tue Dec 16 16:11:09 2014 -0500
>
> ----------------------------------------------------------------------
> .../apache/activemq/broker/BrokerService.java | 25 +-
> .../activemq/broker/jmx/JobSchedulerView.java | 56 +-
> .../broker/jmx/JobSchedulerViewMBean.java | 113 +-
> .../apache/activemq/broker/scheduler/Job.java | 23 +-
> .../activemq/broker/scheduler/JobListener.java | 16 +-
> .../activemq/broker/scheduler/JobScheduler.java | 33 +-
> .../broker/scheduler/JobSchedulerFacade.java | 6 +
> .../broker/scheduler/JobSchedulerStore.java | 43 +
> .../activemq/broker/scheduler/JobSupport.java | 5 +-
> .../activemq/store/PersistenceAdapter.java | 119 +-
> .../store/memory/MemoryPersistenceAdapter.java | 36 +-
> .../java/org/apache/activemq/util/IOHelper.java | 68 +-
> .../store/jdbc/JDBCPersistenceAdapter.java | 7 +
> .../journal/JournalPersistenceAdapter.java | 71 +-
> .../store/kahadb/AbstractKahaDBMetaData.java | 57 +
> .../store/kahadb/AbstractKahaDBStore.java | 745 ++++++++++++
> .../activemq/store/kahadb/KahaDBMetaData.java | 135 +++
> .../store/kahadb/KahaDBPersistenceAdapter.java | 15 +-
> .../activemq/store/kahadb/KahaDBStore.java | 55 +-
> .../kahadb/MultiKahaDBPersistenceAdapter.java | 56 +-
> .../kahadb/MultiKahaDBTransactionStore.java | 18 +-
> .../activemq/store/kahadb/TempKahaDBStore.java | 138 ++-
> .../apache/activemq/store/kahadb/Visitor.java | 20 +
> .../store/kahadb/scheduler/JobImpl.java | 21 +-
> .../store/kahadb/scheduler/JobLocation.java | 77 +-
> .../scheduler/JobLocationsMarshaller.java | 53 +
> .../kahadb/scheduler/JobSchedulerImpl.java | 837 ++++++++------
> .../scheduler/JobSchedulerKahaDBMetaData.java | 246 ++++
> .../kahadb/scheduler/JobSchedulerStoreImpl.java | 1076 +++++++++++++-----
> .../scheduler/UnknownStoreVersionException.java | 24 +
> .../kahadb/scheduler/legacy/LegacyJobImpl.java | 72 ++
> .../scheduler/legacy/LegacyJobLocation.java | 296 +++++
> .../legacy/LegacyJobSchedulerImpl.java | 222 ++++
> .../legacy/LegacyJobSchedulerStoreImpl.java | 378 ++++++
> .../scheduler/legacy/LegacyStoreReplayer.java | 155 +++
> .../src/main/proto/journal-data.proto | 61 +
> .../apache/activemq/leveldb/LevelDBStore.scala | 5 +
> .../leveldb/replicated/ProxyLevelDBStore.scala | 5 +
> .../JobSchedulerBrokerShutdownTest.java | 1 +
> .../JobSchedulerJmxManagementTests.java | 155 +++
> .../scheduler/JobSchedulerManagementTest.java | 84 +-
> .../JobSchedulerStoreCheckpointTest.java | 125 ++
> .../broker/scheduler/JobSchedulerStoreTest.java | 46 +-
> .../broker/scheduler/JobSchedulerTest.java | 36 +
> .../scheduler/JobSchedulerTestSupport.java | 112 ++
> .../KahaDBSchedulerIndexRebuildTest.java | 179 +++
> .../KahaDBSchedulerMissingJournalLogsTest.java | 204 ++++
> .../scheduler/SchedulerDBVersionTest.java | 164 +++
> .../src/test/resources/log4j.properties | 1 +
> .../activemq/store/schedulerDB/legacy/db-1.log | Bin 0 -> 524288 bytes
> .../store/schedulerDB/legacy/scheduleDB.data | Bin 0 -> 20480 bytes
> .../store/schedulerDB/legacy/scheduleDB.redo | Bin 0 -> 16408 bytes
> 52 files changed, 5584 insertions(+), 911 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> index 00d4abd..5becec2 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> @@ -1861,6 +1861,23 @@ public class BrokerService implements Service {
>
> try {
> PersistenceAdapter pa = getPersistenceAdapter();
> + if (pa != null) {
> + this.jobSchedulerStore = pa.createJobSchedulerStore();
> + jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
> + configureService(jobSchedulerStore);
> + jobSchedulerStore.start();
> + return this.jobSchedulerStore;
> + }
> + } catch (IOException e) {
> + throw new RuntimeException(e);
> + } catch (UnsupportedOperationException ex) {
> + // It's ok if the store doesn't implement a scheduler.
> + } catch (Exception e) {
> + throw new RuntimeException(e);
> + }
> +
> + try {
> + PersistenceAdapter pa = getPersistenceAdapter();
> if (pa != null && pa instanceof JobSchedulerStore) {
> this.jobSchedulerStore = (JobSchedulerStore) pa;
> configureService(jobSchedulerStore);
> @@ -1870,9 +1887,13 @@ public class BrokerService implements Service {
> throw new RuntimeException(e);
> }
>
> + // Load the KahaDB store as a last resort, this only works if KahaDB is
> + // included at runtime, otherwise this will fail. User should disable
> + // scheduler support if this fails.
> try {
> - String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
> - jobSchedulerStore = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
> + String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
> + PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
> + jobSchedulerStore = adaptor.createJobSchedulerStore();
> jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
> configureService(jobSchedulerStore);
> jobSchedulerStore.start();
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
> index 9e5a1fb..2118a96 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
> @@ -16,23 +16,39 @@
> */
> package org.apache.activemq.broker.jmx;
>
> +import java.util.List;
> +
> +import javax.management.openmbean.CompositeDataSupport;
> +import javax.management.openmbean.CompositeType;
> +import javax.management.openmbean.TabularData;
> +import javax.management.openmbean.TabularDataSupport;
> +import javax.management.openmbean.TabularType;
> +
> import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
> import org.apache.activemq.broker.scheduler.Job;
> import org.apache.activemq.broker.scheduler.JobScheduler;
> import org.apache.activemq.broker.scheduler.JobSupport;
>
> -import javax.management.openmbean.*;
> -import java.io.IOException;
> -import java.util.List;
> -
> +/**
> + * MBean object that can be used to manage a single instance of a JobScheduler. The object
> + * provides methods for querying for jobs and removing some or all of the jobs that are
> + * scheduled in the managed store.
> + */
> public class JobSchedulerView implements JobSchedulerViewMBean {
>
> private final JobScheduler jobScheduler;
>
> + /**
> + * Creates a new instance of the JobScheduler management MBean.
> + *
> + * @param jobScheduler
> + * The scheduler instance to manage.
> + */
> public JobSchedulerView(JobScheduler jobScheduler) {
> this.jobScheduler = jobScheduler;
> }
>
> + @Override
> public TabularData getAllJobs() throws Exception {
> OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
> CompositeType ct = factory.getCompositeType();
> @@ -45,6 +61,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
> return rc;
> }
>
> + @Override
> public TabularData getAllJobs(String startTime, String finishTime) throws Exception {
> OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
> CompositeType ct = factory.getCompositeType();
> @@ -59,6 +76,7 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
> return rc;
> }
>
> + @Override
> public TabularData getNextScheduleJobs() throws Exception {
> OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
> CompositeType ct = factory.getCompositeType();
> @@ -71,31 +89,51 @@ public class JobSchedulerView implements JobSchedulerViewMBean {
> return rc;
> }
>
> + @Override
> public String getNextScheduleTime() throws Exception {
> long time = this.jobScheduler.getNextScheduleTime();
> return JobSupport.getDateTime(time);
> }
>
> + @Override
> public void removeAllJobs() throws Exception {
> this.jobScheduler.removeAllJobs();
> -
> }
>
> + @Override
> public void removeAllJobs(String startTime, String finishTime) throws Exception {
> long start = JobSupport.getDataTime(startTime);
> long finish = JobSupport.getDataTime(finishTime);
> this.jobScheduler.removeAllJobs(start, finish);
> + }
>
> + @Override
> + public void removeAllJobsAtScheduledTime(String time) throws Exception {
> + long removeAtTime = JobSupport.getDataTime(time);
> + this.jobScheduler.remove(removeAtTime);
> }
>
> + @Override
> + public void removeJobAtScheduledTime(String time) throws Exception {
> + removeAllJobsAtScheduledTime(time);
> + }
> +
> + @Override
> public void removeJob(String jobId) throws Exception {
> this.jobScheduler.remove(jobId);
> -
> }
>
> - public void removeJobAtScheduledTime(String time) throws IOException {
> - // TODO Auto-generated method stub
> + @Override
> + public int getExecutionCount(String jobId) throws Exception {
> + int result = 0;
>
> - }
> + List<Job> jobs = this.jobScheduler.getAllJobs();
> + for (Job job : jobs) {
> + if (job.getJobId().equals(jobId)) {
> + result = job.getExecutionCount();
> + }
> + }
>
> + return result;
> + }
> }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
> index f5745ea..76a7926 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
> @@ -18,76 +18,125 @@ package org.apache.activemq.broker.jmx;
>
> import javax.management.openmbean.TabularData;
>
> -
> -
> public interface JobSchedulerViewMBean {
> +
> /**
> - * remove all jobs scheduled to run at this time
> + * Remove all jobs scheduled to run at this time. If there are no jobs scheduled
> + * at the given time this methods returns without making any modifications to the
> + * scheduler store.
> + *
> * @param time
> - * @throws Exception
> + * the string formated time that should be used to remove jobs.
> + *
> + * @throws Exception if an error occurs while performing the remove.
> + *
> + * @deprecated use removeAllJobsAtScheduledTime instead as it is more explicit about what
> + * the method is actually doing.
> */
> + @Deprecated
> @MBeanInfo("remove jobs with matching execution time")
> public abstract void removeJobAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception;
>
> /**
> - * remove a job with the matching jobId
> + * Remove all jobs scheduled to run at this time. If there are no jobs scheduled
> + * at the given time this methods returns without making any modifications to the
> + * scheduler store.
> + *
> + * @param time
> + * the string formated time that should be used to remove jobs.
> + *
> + * @throws Exception if an error occurs while performing the remove.
> + */
> + @MBeanInfo("remove jobs with matching execution time")
> + public abstract void removeAllJobsAtScheduledTime(@MBeanInfo("time: yyyy-MM-dd hh:mm:ss")String time) throws Exception;
> +
> + /**
> + * Remove a job with the matching jobId. If the method does not find a matching job
> + * then it returns without throwing an error or making any modifications to the job
> + * scheduler store.
> + *
> * @param jobId
> - * @throws Exception
> + * the Job Id to remove from the scheduler store.
> + *
> + * @throws Exception if an error occurs while attempting to remove the Job.
> */
> @MBeanInfo("remove jobs with matching jobId")
> public abstract void removeJob(@MBeanInfo("jobId")String jobId) throws Exception;
> -
> +
> /**
> - * remove all the Jobs from the scheduler
> - * @throws Exception
> + * Remove all the Jobs from the scheduler,
> + *
> + * @throws Exception if an error occurs while purging the store.
> */
> @MBeanInfo("remove all scheduled jobs")
> public abstract void removeAllJobs() throws Exception;
> -
> +
> /**
> - * remove all the Jobs from the scheduler that are due between the start and finish times
> - * @param start time
> - * @param finish time
> - * @throws Exception
> + * Remove all the Jobs from the scheduler that are due between the start and finish times.
> + *
> + * @param start
> + * the starting time to remove jobs from.
> + * @param finish
> + * the finish time for the remove operation.
> + *
> + * @throws Exception if an error occurs while attempting to remove the jobs.
> */
> @MBeanInfo("remove all scheduled jobs between time ranges ")
> public abstract void removeAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish) throws Exception;
> -
>
> -
> /**
> - * Get the next time jobs will be fired
> - * @return the time in milliseconds
> - * @throws Exception
> + * Get the next time jobs will be fired from this scheduler store.
> + *
> + * @return the time in milliseconds of the next job to execute.
> + *
> + * @throws Exception if an error occurs while accessing the store.
> */
> @MBeanInfo("get the next time a job is due to be scheduled ")
> public abstract String getNextScheduleTime() throws Exception;
> -
> +
> + /**
> + * Gets the number of times a scheduled Job has been executed.
> + *
> + * @return the total number of time a scheduled job has executed.
> + *
> + * @throws Exception if an error occurs while querying for the Job.
> + */
> + @MBeanInfo("get the next time a job is due to be scheduled ")
> + public abstract int getExecutionCount(@MBeanInfo("jobId")String jobId) throws Exception;
> +
> /**
> - * Get all the jobs scheduled to run next
> + * Get all the jobs scheduled to run next.
> + *
> * @return a list of jobs that will be scheduled next
> - * @throws Exception
> + *
> + * @throws Exception if an error occurs while reading the scheduler store.
> */
> @MBeanInfo("get the next job(s) to be scheduled. Not HTML friendly ")
> public abstract TabularData getNextScheduleJobs() throws Exception;
> -
> - /**
> - * Get all the outstanding Jobs
> - * @return a table of all jobs
> - * @throws Exception
>
> + /**
> + * Get all the outstanding Jobs that are scheduled in this scheduler store.
> + *
> + * @return a table of all jobs in this scheduler store.
> + *
> + * @throws Exception if an error occurs while reading the store.
> */
> @MBeanInfo("get the scheduled Jobs in the Store. Not HTML friendly ")
> public abstract TabularData getAllJobs() throws Exception;
> -
> +
> /**
> - * Get all outstanding jobs due to run between start and finish
> + * Get all outstanding jobs due to run between start and finish time range.
> + *
> * @param start
> + * the starting time range to query the store for jobs.
> * @param finish
> - * @return a table of jobs in the range
> - * @throws Exception
> -
> + * the ending time of this query for scheduled jobs.
> + *
> + * @return a table of jobs in the range given.
> + *
> + * @throws Exception if an error occurs while querying the scheduler store.
> */
> @MBeanInfo("get the scheduled Jobs in the Store within the time range. Not HTML friendly ")
> public abstract TabularData getAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)throws Exception;
> +
> }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
> index 7b28a5b..047fe23 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/Job.java
> @@ -16,7 +16,12 @@
> */
> package org.apache.activemq.broker.scheduler;
>
> -
> +/**
> + * Interface for a scheduled Job object.
> + *
> + * Each Job is identified by a unique Job Id which can be used to reference the Job
> + * in the Job Scheduler store for updates or removal.
> + */
> public interface Job {
>
> /**
> @@ -38,11 +43,12 @@ public interface Job {
> * @return the Delay
> */
> public abstract long getDelay();
> +
> /**
> * @return the period
> */
> public abstract long getPeriod();
> -
> +
> /**
> * @return the cron entry
> */
> @@ -52,17 +58,24 @@ public interface Job {
> * @return the payload
> */
> public abstract byte[] getPayload();
> -
> +
> /**
> * Get the start time as a Date time string
> * @return the date time
> */
> public String getStartTime();
> -
> +
> /**
> - * Get the time the job is next due to execute
> + * Get the time the job is next due to execute
> * @return the date time
> */
> public String getNextExecutionTime();
>
> + /**
> + * Gets the total number of times this job has executed.
> + *
> + * @returns the number of times this job has been executed.
> + */
> + public int getExecutionCount();
> +
> }
> \ No newline at end of file
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
> index c53d9c6..a453595 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobListener.java
> @@ -18,13 +18,21 @@ package org.apache.activemq.broker.scheduler;
>
> import org.apache.activemq.util.ByteSequence;
>
> +/**
> + * Job event listener interface. Provides event points for Job related events
> + * such as job ready events.
> + */
> public interface JobListener {
> -
> +
> /**
> - * A Job that has been scheduled is now ready
> - * @param id
> + * A Job that has been scheduled is now ready to be fired. The Job is passed
> + * in its raw byte form and must be un-marshaled before being delivered.
> + *
> + * @param jobId
> + * The unique Job Id of the Job that is ready to fire.
> * @param job
> + * The job that is now ready, delivered in byte form.
> */
> - public void scheduledJob(String id,ByteSequence job);
> + public void scheduledJob(String id, ByteSequence job);
>
> }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
> index 2e96eae..e951861 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobScheduler.java
> @@ -46,20 +46,25 @@ public interface JobScheduler {
> void stopDispatching() throws Exception;
>
> /**
> - * Add a Job listener
> + * Add a Job listener which will receive events related to scheduled jobs.
> + *
> + * @param listener
> + * The job listener to add.
> *
> - * @param l
> * @throws Exception
> */
> - void addListener(JobListener l) throws Exception;
> + void addListener(JobListener listener) throws Exception;
>
> /**
> - * remove a JobListener
> + * remove a JobListener that was previously registered. If the given listener is not in
> + * the registry this method has no effect.
> + *
> + * @param listener
> + * The listener that should be removed from the listener registry.
> *
> - * @param l
> * @throws Exception
> */
> - void removeListener(JobListener l) throws Exception;
> + void removeListener(JobListener listener) throws Exception;
>
> /**
> * Add a job to be scheduled
> @@ -70,7 +75,8 @@ public interface JobScheduler {
> * the message to be sent when the job is scheduled
> * @param delay
> * the time in milliseconds before the job will be run
> - * @throws Exception
> + *
> + * @throws Exception if an error occurs while scheduling the Job.
> */
> void schedule(String jobId, ByteSequence payload, long delay) throws Exception;
>
> @@ -82,8 +88,9 @@ public interface JobScheduler {
> * @param payload
> * the message to be sent when the job is scheduled
> * @param cronEntry
> - * - cron entry
> - * @throws Exception
> + * The cron entry to use to schedule this job.
> + *
> + * @throws Exception if an error occurs while scheduling the Job.
> */
> void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception;
>
> @@ -95,7 +102,7 @@ public interface JobScheduler {
> * @param payload
> * the message to be sent when the job is scheduled
> * @param cronEntry
> - * - cron entry
> + * cron entry
> * @param delay
> * time in ms to wait before scheduling
> * @param period
> @@ -110,6 +117,8 @@ public interface JobScheduler {
> * remove all jobs scheduled to run at this time
> *
> * @param time
> + * The UTC time to use to remove a batch of scheduled Jobs.
> + *
> * @throws Exception
> */
> void remove(long time) throws Exception;
> @@ -118,7 +127,9 @@ public interface JobScheduler {
> * remove a job with the matching jobId
> *
> * @param jobId
> - * @throws Exception
> + * The unique Job Id to search for and remove from the scheduled set of jobs.
> + *
> + * @throws Exception if an error occurs while removing the Job.
> */
> void remove(String jobId) throws Exception;
>
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
> index d46d04a..24a216a 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerFacade.java
> @@ -21,6 +21,12 @@ import java.util.List;
>
> import org.apache.activemq.util.ByteSequence;
>
> +/**
> + * A wrapper for instances of the JobScheduler interface that ensures that methods
> + * provides safe and sane return values and can deal with null values being passed
> + * in etc. Provides a measure of safety when using unknown implementations of the
> + * JobSchedulerStore which might not always do the right thing.
> + */
> public class JobSchedulerFacade implements JobScheduler {
>
> private final SchedulerBroker broker;
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
> index 3cbc367..c6863c7 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
> @@ -26,13 +26,56 @@ import org.apache.activemq.Service;
> */
> public interface JobSchedulerStore extends Service {
>
> + /**
> + * Gets the location where the Job Scheduler will write the persistent data used
> + * to preserve and recover scheduled Jobs.
> + *
> + * If the scheduler implementation does not utilize a file system based store this
> + * method returns null.
> + *
> + * @return the directory where persistent store data is written.
> + */
> File getDirectory();
>
> + /**
> + * Sets the directory where persistent store data will be written. This method
> + * must be called before the scheduler store is started to have any effect.
> + *
> + * @param directory
> + * The directory where the job scheduler store is to be located.
> + */
> void setDirectory(File directory);
>
> + /**
> + * The size of the current store on disk if the store utilizes a disk based store
> + * mechanism.
> + *
> + * @return the current store size on disk.
> + */
> long size();
>
> + /**
> + * Returns the JobScheduler instance identified by the given name.
> + *
> + * @param name
> + * the name of the JobScheduler instance to lookup.
> + *
> + * @return the named JobScheduler or null if none exists with the given name.
> + *
> + * @throws Exception if an error occurs while loading the named scheduler.
> + */
> JobScheduler getJobScheduler(String name) throws Exception;
>
> + /**
> + * Removes the named JobScheduler if it exists, purging all scheduled messages
> + * assigned to it.
> + *
> + * @param name
> + * the name of the scheduler instance to remove.
> + *
> + * @return true if there was a scheduler with the given name to remove.
> + *
> + * @throws Exception if an error occurs while removing the scheduler.
> + */
> boolean removeJobScheduler(String name) throws Exception;
> }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
> index 6b78d77..fc5b8dd 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
> @@ -20,7 +20,11 @@ import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
>
> +/**
> + * A class to provide common Job Scheduler related methods.
> + */
> public class JobSupport {
> +
> public static String getDateTime(long value) {
> DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
> Date date = new Date(value);
> @@ -32,5 +36,4 @@ public class JobSupport {
> Date date = dfm.parse(value);
> return date.getTime();
> }
> -
> }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
> index 31efd32..01a9634 100755
> --- a/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/store/PersistenceAdapter.java
> @@ -22,6 +22,7 @@ import java.util.Set;
>
> import org.apache.activemq.Service;
> import org.apache.activemq.broker.ConnectionContext;
> +import org.apache.activemq.broker.scheduler.JobSchedulerStore;
> import org.apache.activemq.command.ActiveMQDestination;
> import org.apache.activemq.command.ActiveMQQueue;
> import org.apache.activemq.command.ActiveMQTopic;
> @@ -31,74 +32,99 @@ import org.apache.activemq.usage.SystemUsage;
> /**
> * Adapter to the actual persistence mechanism used with ActiveMQ
> *
> - *
> + *
> */
> public interface PersistenceAdapter extends Service {
>
> /**
> - * Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
> - * objects that the persistence store is aware exist.
> + * Returns a set of all the
> + * {@link org.apache.activemq.command.ActiveMQDestination} objects that the
> + * persistence store is aware exist.
> *
> * @return active destinations
> */
> Set<ActiveMQDestination> getDestinations();
>
> /**
> - * Factory method to create a new queue message store with the given destination name
> + * Factory method to create a new queue message store with the given
> + * destination name
> + *
> * @param destination
> * @return the message store
> - * @throws IOException
> + * @throws IOException
> */
> MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
>
> /**
> - * Factory method to create a new topic message store with the given destination name
> - * @param destination
> + * Factory method to create a new topic message store with the given
> + * destination name
> + *
> + * @param destination
> * @return the topic message store
> - * @throws IOException
> + * @throws IOException
> */
> TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
>
> /**
> + * Creates and returns a new Job Scheduler store instance.
> + *
> + * @return a new JobSchedulerStore instance if this Persistence adapter provides its own.
> + *
> + * @throws IOException If an error occurs while creating the new JobSchedulerStore.
> + * @throws UnsupportedOperationException If this adapter does not provide its own
> + * scheduler store implementation.
> + */
> + JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException;
> +
> + /**
> * Cleanup method to remove any state associated with the given destination.
> * This method does not stop the message store (it might not be cached).
> - * @param destination Destination to forget
> + *
> + * @param destination
> + * Destination to forget
> */
> void removeQueueMessageStore(ActiveMQQueue destination);
>
> /**
> * Cleanup method to remove any state associated with the given destination
> * This method does not stop the message store (it might not be cached).
> - * @param destination Destination to forget
> + *
> + * @param destination
> + * Destination to forget
> */
> void removeTopicMessageStore(ActiveMQTopic destination);
>
> /**
> - * Factory method to create a new persistent prepared transaction store for XA recovery
> + * Factory method to create a new persistent prepared transaction store for
> + * XA recovery
> + *
> * @return transaction store
> - * @throws IOException
> + * @throws IOException
> */
> TransactionStore createTransactionStore() throws IOException;
>
> /**
> - * This method starts a transaction on the persistent storage - which is nothing to
> - * do with JMS or XA transactions - its purely a mechanism to perform multiple writes
> - * to a persistent store in 1 transaction as a performance optimization.
> + * This method starts a transaction on the persistent storage - which is
> + * nothing to do with JMS or XA transactions - its purely a mechanism to
> + * perform multiple writes to a persistent store in 1 transaction as a
> + * performance optimization.
> * <p/>
> - * Typically one transaction will require one disk synchronization point and so for
> - * real high performance its usually faster to perform many writes within the same
> - * transaction to minimize latency caused by disk synchronization. This is especially
> - * true when using tools like Berkeley Db or embedded JDBC servers.
> - * @param context
> - * @throws IOException
> + * Typically one transaction will require one disk synchronization point and
> + * so for real high performance its usually faster to perform many writes
> + * within the same transaction to minimize latency caused by disk
> + * synchronization. This is especially true when using tools like Berkeley
> + * Db or embedded JDBC servers.
> + *
> + * @param context
> + * @throws IOException
> */
> void beginTransaction(ConnectionContext context) throws IOException;
>
> -
> /**
> * Commit a persistence transaction
> - * @param context
> - * @throws IOException
> + *
> + * @param context
> + * @throws IOException
> *
> * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
> */
> @@ -106,40 +132,45 @@ public interface PersistenceAdapter extends Service {
>
> /**
> * Rollback a persistence transaction
> - * @param context
> - * @throws IOException
> + *
> + * @param context
> + * @throws IOException
> *
> * @see PersistenceAdapter#beginTransaction(ConnectionContext context)
> */
> void rollbackTransaction(ConnectionContext context) throws IOException;
> -
> +
> /**
> - *
> + *
> * @return last broker sequence
> * @throws IOException
> */
> long getLastMessageBrokerSequenceId() throws IOException;
> -
> +
> /**
> * Delete's all the messages in the persistent store.
> - *
> + *
> * @throws IOException
> */
> void deleteAllMessages() throws IOException;
> -
> +
> /**
> - * @param usageManager The UsageManager that is controlling the broker's memory usage.
> + * @param usageManager
> + * The UsageManager that is controlling the broker's memory
> + * usage.
> */
> void setUsageManager(SystemUsage usageManager);
> -
> +
> /**
> * Set the name of the broker using the adapter
> + *
> * @param brokerName
> */
> void setBrokerName(String brokerName);
> -
> +
> /**
> * Set the directory where any data files should be created
> + *
> * @param dir
> */
> void setDirectory(File dir);
> @@ -148,26 +179,30 @@ public interface PersistenceAdapter extends Service {
> * @return the directory used by the persistence adaptor
> */
> File getDirectory();
> -
> +
> /**
> * checkpoint any
> - * @param sync
> - * @throws IOException
> + *
> + * @param sync
> + * @throws IOException
> *
> */
> void checkpoint(boolean sync) throws IOException;
> -
> +
> /**
> * A hint to return the size of the store on disk
> + *
> * @return disk space used in bytes of 0 if not implemented
> */
> long size();
>
> /**
> - * return the last stored producer sequenceId for this producer Id
> - * used to suppress duplicate sends on failover reconnect at the transport
> - * when a reconnect occurs
> - * @param id the producerId to find a sequenceId for
> + * return the last stored producer sequenceId for this producer Id used to
> + * suppress duplicate sends on failover reconnect at the transport when a
> + * reconnect occurs
> + *
> + * @param id
> + * the producerId to find a sequenceId for
> * @return the last stored sequence id or -1 if no suppression needed
> */
> long getLastProducerSequenceId(ProducerId id) throws IOException;
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
> index 0fd6bfc..73ea104 100755
> --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
> @@ -24,6 +24,7 @@ import java.util.Set;
> import java.util.concurrent.ConcurrentHashMap;
>
> import org.apache.activemq.broker.ConnectionContext;
> +import org.apache.activemq.broker.scheduler.JobSchedulerStore;
> import org.apache.activemq.command.ActiveMQDestination;
> import org.apache.activemq.command.ActiveMQQueue;
> import org.apache.activemq.command.ActiveMQTopic;
> @@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory;
>
> /**
> * @org.apache.xbean.XBean
> - *
> + *
> */
> public class MemoryPersistenceAdapter implements PersistenceAdapter {
> private static final Logger LOG = LoggerFactory.getLogger(MemoryPersistenceAdapter.class);
> @@ -49,6 +50,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
> ConcurrentHashMap<ActiveMQDestination, MessageStore> queues = new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
> private boolean useExternalMessageReferences;
>
> + @Override
> public Set<ActiveMQDestination> getDestinations() {
> Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(queues.size() + topics.size());
> for (Iterator<ActiveMQDestination> iter = queues.keySet().iterator(); iter.hasNext();) {
> @@ -64,6 +66,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
> return new MemoryPersistenceAdapter();
> }
>
> + @Override
> public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
> MessageStore rc = queues.get(destination);
> if (rc == null) {
> @@ -76,6 +79,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
> return rc;
> }
>
> + @Override
> public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
> TopicMessageStore rc = topics.get(destination);
> if (rc == null) {
> @@ -93,6 +97,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
> *
> * @param destination Destination to forget
> */
> + @Override
> public void removeQueueMessageStore(ActiveMQQueue destination) {
> queues.remove(destination);
> }
> @@ -102,10 +107,12 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
> *
> * @param destination Destination to forget
> */
> + @Override
> public void removeTopicMessageStore(ActiveMQTopic destination) {
> topics.remove(destination);
> }
>
> + @Override
> public TransactionStore createTransactionStore() throws IOException {
> if (transactionStore == null) {
> transactionStore = new MemoryTransactionStore(this);
> @@ -113,25 +120,32 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
> return transactionStore;
> }
>
> + @Override
> public void beginTransaction(ConnectionContext context) {
> }
>
> + @Override
> public void commitTransaction(ConnectionContext context) {
> }
>
> + @Override
> public void rollbackTransaction(ConnectionContext context) {
> }
>
> + @Override
> public void start() throws Exception {
> }
>
> + @Override
> public void stop() throws Exception {
> }
>
> + @Override
> public long getLastMessageBrokerSequenceId() throws IOException {
> return 0;
> }
>
> + @Override
> public void deleteAllMessages() throws IOException {
> for (Iterator<TopicMessageStore> iter = topics.values().iterator(); iter.hasNext();) {
> MemoryMessageStore store = asMemoryMessageStore(iter.next());
> @@ -177,38 +191,52 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
> * @param usageManager The UsageManager that is controlling the broker's
> * memory usage.
> */
> + @Override
> public void setUsageManager(SystemUsage usageManager) {
> }
>
> + @Override
> public String toString() {
> return "MemoryPersistenceAdapter";
> }
>
> + @Override
> public void setBrokerName(String brokerName) {
> }
>
> + @Override
> public void setDirectory(File dir) {
> }
> -
> +
> + @Override
> public File getDirectory(){
> return null;
> }
>
> + @Override
> public void checkpoint(boolean sync) throws IOException {
> }
> -
> +
> + @Override
> public long size(){
> return 0;
> }
> -
> +
> public void setCreateTransactionStore(boolean create) throws IOException {
> if (create) {
> createTransactionStore();
> }
> }
>
> + @Override
> public long getLastProducerSequenceId(ProducerId id) {
> // memory map does duplicate suppression
> return -1;
> }
> +
> + @Override
> + public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
> + // We could eventuall implement an in memory scheduler.
> + throw new UnsupportedOperationException();
> + }
> }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
> ----------------------------------------------------------------------
> diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
> index a623de9..2a70194 100644
> --- a/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
> +++ b/activemq-broker/src/main/java/org/apache/activemq/util/IOHelper.java
> @@ -61,8 +61,9 @@ public final class IOHelper {
> }
>
> /**
> - * Converts any string into a string that is safe to use as a file name.
> - * The result will only include ascii characters and numbers, and the "-","_", and "." characters.
> + * Converts any string into a string that is safe to use as a file name. The
> + * result will only include ascii characters and numbers, and the "-","_",
> + * and "." characters.
> *
> * @param name
> * @return
> @@ -76,15 +77,16 @@ public final class IOHelper {
> }
>
> /**
> - * Converts any string into a string that is safe to use as a file name.
> - * The result will only include ascii characters and numbers, and the "-","_", and "." characters.
> + * Converts any string into a string that is safe to use as a file name. The
> + * result will only include ascii characters and numbers, and the "-","_",
> + * and "." characters.
> *
> * @param name
> * @param dirSeparators
> * @param maxFileLength
> * @return
> */
> - public static String toFileSystemSafeName(String name,boolean dirSeparators,int maxFileLength) {
> + public static String toFileSystemSafeName(String name, boolean dirSeparators, int maxFileLength) {
> int size = name.length();
> StringBuffer rc = new StringBuffer(size * 2);
> for (int i = 0; i < size; i++) {
> @@ -92,8 +94,7 @@ public final class IOHelper {
> boolean valid = c >= 'a' && c <= 'z';
> valid = valid || (c >= 'A' && c <= 'Z');
> valid = valid || (c >= '0' && c <= '9');
> - valid = valid || (c == '_') || (c == '-') || (c == '.') || (c=='#')
> - ||(dirSeparators && ( (c == '/') || (c == '\\')));
> + valid = valid || (c == '_') || (c == '-') || (c == '.') || (c == '#') || (dirSeparators && ((c == '/') || (c == '\\')));
>
> if (valid) {
> rc.append(c);
> @@ -105,7 +106,7 @@ public final class IOHelper {
> }
> String result = rc.toString();
> if (result.length() > maxFileLength) {
> - result = result.substring(result.length()-maxFileLength,result.length());
> + result = result.substring(result.length() - maxFileLength, result.length());
> }
> return result;
> }
> @@ -168,8 +169,7 @@ public final class IOHelper {
> } else {
> for (int i = 0; i < files.length; i++) {
> File file = files[i];
> - if (file.getName().equals(".")
> - || file.getName().equals("..")) {
> + if (file.getName().equals(".") || file.getName().equals("..")) {
> continue;
> }
> if (file.isDirectory()) {
> @@ -190,6 +190,27 @@ public final class IOHelper {
> }
> }
>
> + public static void moveFiles(File srcDirectory, File targetDirectory, FilenameFilter filter) throws IOException {
> + if (!srcDirectory.isDirectory()) {
> + throw new IOException("source is not a directory");
> + }
> +
> + if (targetDirectory.exists() && !targetDirectory.isDirectory()) {
> + throw new IOException("target exists and is not a directory");
> + } else {
> + mkdirs(targetDirectory);
> + }
> +
> + List<File> filesToMove = new ArrayList<File>();
> + getFiles(srcDirectory, filesToMove, filter);
> +
> + for (File file : filesToMove) {
> + if (!file.isDirectory()) {
> + moveFile(file, targetDirectory);
> + }
> + }
> + }
> +
> public static void copyFile(File src, File dest) throws IOException {
> copyFile(src, dest, null);
> }
> @@ -222,32 +243,32 @@ public final class IOHelper {
> File parent = src.getParentFile();
> String fromPath = from.getAbsolutePath();
> if (parent.getAbsolutePath().equals(fromPath)) {
> - //one level down
> + // one level down
> result = to;
> - }else {
> + } else {
> String parentPath = parent.getAbsolutePath();
> String path = parentPath.substring(fromPath.length());
> - result = new File(to.getAbsolutePath()+File.separator+path);
> + result = new File(to.getAbsolutePath() + File.separator + path);
> }
> return result;
> }
>
> - static List<File> getFiles(File dir,FilenameFilter filter){
> + static List<File> getFiles(File dir, FilenameFilter filter) {
> List<File> result = new ArrayList<File>();
> - getFiles(dir,result,filter);
> + getFiles(dir, result, filter);
> return result;
> }
>
> - static void getFiles(File dir,List<File> list,FilenameFilter filter) {
> + static void getFiles(File dir, List<File> list, FilenameFilter filter) {
> if (!list.contains(dir)) {
> list.add(dir);
> - String[] fileNames=dir.list(filter);
> - for (int i =0; i < fileNames.length;i++) {
> - File f = new File(dir,fileNames[i]);
> + String[] fileNames = dir.list(filter);
> + for (int i = 0; i < fileNames.length; i++) {
> + File f = new File(dir, fileNames[i]);
> if (f.isFile()) {
> list.add(f);
> - }else {
> - getFiles(dir,list,filter);
> + } else {
> + getFiles(dir, list, filter);
> }
> }
> }
> @@ -286,12 +307,13 @@ public final class IOHelper {
> public static void mkdirs(File dir) throws IOException {
> if (dir.exists()) {
> if (!dir.isDirectory()) {
> - throw new IOException("Failed to create directory '" + dir +"', regular file already existed with that name");
> + throw new IOException("Failed to create directory '" + dir +
> + "', regular file already existed with that name");
> }
>
> } else {
> if (!dir.mkdirs()) {
> - throw new IOException("Failed to create directory '" + dir+"'");
> + throw new IOException("Failed to create directory '" + dir + "'");
> }
> }
> }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
> ----------------------------------------------------------------------
> diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
> index 7ff4ae0..a3a8250 100755
> --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
> +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
> @@ -34,6 +34,7 @@ import org.apache.activemq.ActiveMQMessageAudit;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.broker.ConnectionContext;
> import org.apache.activemq.broker.Locker;
> +import org.apache.activemq.broker.scheduler.JobSchedulerStore;
> import org.apache.activemq.command.ActiveMQDestination;
> import org.apache.activemq.command.ActiveMQQueue;
> import org.apache.activemq.command.ActiveMQTopic;
> @@ -422,6 +423,7 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
> this.lockDataSource = dataSource;
> }
>
> + @Override
> public BrokerService getBrokerService() {
> return brokerService;
> }
> @@ -846,4 +848,9 @@ public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements
> }
> return result;
> }
> +
> + @Override
> + public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
> + throw new UnsupportedOperationException();
> + }
> }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
> ----------------------------------------------------------------------
> diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
> index 565fc9f..cc5282f 100755
> --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
> +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java
> @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadFactory;
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicBoolean;
> +
> import org.apache.activeio.journal.InvalidRecordLocationException;
> import org.apache.activeio.journal.Journal;
> import org.apache.activeio.journal.JournalEventListener;
> @@ -40,6 +41,7 @@ import org.apache.activeio.packet.Packet;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.broker.BrokerServiceAware;
> import org.apache.activemq.broker.ConnectionContext;
> +import org.apache.activemq.broker.scheduler.JobSchedulerStore;
> import org.apache.activemq.command.ActiveMQDestination;
> import org.apache.activemq.command.ActiveMQQueue;
> import org.apache.activemq.command.ActiveMQTopic;
> @@ -78,14 +80,14 @@ import org.slf4j.LoggerFactory;
> * An implementation of {@link PersistenceAdapter} designed for use with a
> * {@link Journal} and then check pointing asynchronously on a timeout with some
> * other long term persistent storage.
> - *
> + *
> * @org.apache.xbean.XBean
> - *
> + *
> */
> public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener, UsageListener, BrokerServiceAware {
>
> private BrokerService brokerService;
> -
> +
> protected Scheduler scheduler;
> private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapter.class);
>
> @@ -118,9 +120,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> private TaskRunnerFactory taskRunnerFactory;
> private File directory;
>
> - public JournalPersistenceAdapter() {
> + public JournalPersistenceAdapter() {
> }
> -
> +
> public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
> setJournal(journal);
> setTaskRunnerFactory(taskRunnerFactory);
> @@ -135,13 +137,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> this.journal = journal;
> journal.setJournalEventListener(this);
> }
> -
> +
> public void setPersistenceAdapter(PersistenceAdapter longTermPersistence) {
> this.longTermPersistence = longTermPersistence;
> }
> -
> +
> final Runnable createPeriodicCheckpointTask() {
> return new Runnable() {
> + @Override
> public void run() {
> long lastTime = 0;
> synchronized (this) {
> @@ -158,11 +161,13 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> * @param usageManager The UsageManager that is controlling the
> * destination's memory usage.
> */
> + @Override
> public void setUsageManager(SystemUsage usageManager) {
> this.usageManager = usageManager;
> longTermPersistence.setUsageManager(usageManager);
> }
>
> + @Override
> public Set<ActiveMQDestination> getDestinations() {
> Set<ActiveMQDestination> destinations = new HashSet<ActiveMQDestination>(longTermPersistence.getDestinations());
> destinations.addAll(queues.keySet());
> @@ -178,6 +183,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> }
> }
>
> + @Override
> public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
> JournalMessageStore store = queues.get(destination);
> if (store == null) {
> @@ -188,6 +194,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> return store;
> }
>
> + @Override
> public TopicMessageStore createTopicMessageStore(ActiveMQTopic destinationName) throws IOException {
> JournalTopicMessageStore store = topics.get(destinationName);
> if (store == null) {
> @@ -203,6 +210,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> *
> * @param destination Destination to forget
> */
> + @Override
> public void removeQueueMessageStore(ActiveMQQueue destination) {
> queues.remove(destination);
> }
> @@ -212,30 +220,37 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> *
> * @param destination Destination to forget
> */
> + @Override
> public void removeTopicMessageStore(ActiveMQTopic destination) {
> topics.remove(destination);
> }
>
> + @Override
> public TransactionStore createTransactionStore() throws IOException {
> return transactionStore;
> }
>
> + @Override
> public long getLastMessageBrokerSequenceId() throws IOException {
> return longTermPersistence.getLastMessageBrokerSequenceId();
> }
>
> + @Override
> public void beginTransaction(ConnectionContext context) throws IOException {
> longTermPersistence.beginTransaction(context);
> }
>
> + @Override
> public void commitTransaction(ConnectionContext context) throws IOException {
> longTermPersistence.commitTransaction(context);
> }
>
> + @Override
> public void rollbackTransaction(ConnectionContext context) throws IOException {
> longTermPersistence.rollbackTransaction(context);
> }
>
> + @Override
> public synchronized void start() throws Exception {
> if (!started.compareAndSet(false, true)) {
> return;
> @@ -246,12 +261,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> }
>
> checkpointTask = taskRunnerFactory.createTaskRunner(new Task() {
> + @Override
> public boolean iterate() {
> return doCheckpoint();
> }
> }, "ActiveMQ Journal Checkpoint Worker");
>
> checkpointExecutor = new ThreadPoolExecutor(maxCheckpointWorkers, maxCheckpointWorkers, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
> + @Override
> public Thread newThread(Runnable runable) {
> Thread t = new Thread(runable, "Journal checkpoint worker");
> t.setPriority(7);
> @@ -279,6 +296,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
>
> }
>
> + @Override
> public void stop() throws Exception {
>
> this.usageManager.getMemoryUsage().removeUsageListener(this);
> @@ -330,16 +348,17 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> /**
> * The Journal give us a call back so that we can move old data out of the
> * journal. Taking a checkpoint does this for us.
> - *
> + *
> * @see org.apache.activemq.journal.JournalEventListener#overflowNotification(org.apache.activemq.journal.RecordLocation)
> */
> + @Override
> public void overflowNotification(RecordLocation safeLocation) {
> checkpoint(false, true);
> }
>
> /**
> * When we checkpoint we move all the journalled data to long term storage.
> - *
> + *
> */
> public void checkpoint(boolean sync, boolean fullCheckpoint) {
> try {
> @@ -369,13 +388,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> }
> }
>
> + @Override
> public void checkpoint(boolean sync) {
> checkpoint(sync, sync);
> }
>
> /**
> * This does the actual checkpoint.
> - *
> + *
> * @return
> */
> public boolean doCheckpoint() {
> @@ -398,7 +418,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> // We do many partial checkpoints (fullCheckpoint==false) to move
> // topic messages
> // to long term store as soon as possible.
> - //
> + //
> // We want to avoid doing that for queue messages since removes the
> // come in the same
> // checkpoint cycle will nullify the previous message add.
> @@ -411,6 +431,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> try {
> final JournalMessageStore ms = iterator.next();
> FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
> + @Override
> public RecordLocation call() throws Exception {
> return ms.checkpoint();
> }
> @@ -428,6 +449,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> try {
> final JournalTopicMessageStore ms = iterator.next();
> FutureTask<RecordLocation> task = new FutureTask<RecordLocation>(new Callable<RecordLocation>() {
> + @Override
> public RecordLocation call() throws Exception {
> return ms.checkpoint();
> }
> @@ -505,7 +527,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> /**
> * Move all the messages that were in the journal into long term storage. We
> * just replay and do a checkpoint.
> - *
> + *
> * @throws IOException
> * @throws IOException
> * @throws InvalidRecordLocationException
> @@ -644,11 +666,11 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> public RecordLocation writeCommand(DataStructure command, boolean sync) throws IOException {
> if (started.get()) {
> try {
> - return journal.write(toPacket(wireFormat.marshal(command)), sync);
> + return journal.write(toPacket(wireFormat.marshal(command)), sync);
> } catch (IOException ioe) {
> - LOG.error("Cannot write to the journal", ioe);
> - brokerService.handleIOException(ioe);
> - throw ioe;
> + LOG.error("Cannot write to the journal", ioe);
> + brokerService.handleIOException(ioe);
> + throw ioe;
> }
> }
> throw new IOException("closed");
> @@ -660,6 +682,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> return writeCommand(trace, sync);
> }
>
> + @Override
> public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
> newPercentUsage = (newPercentUsage / 10) * 10;
> oldPercentUsage = (oldPercentUsage / 10) * 10;
> @@ -673,6 +696,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> return transactionStore;
> }
>
> + @Override
> public void deleteAllMessages() throws IOException {
> try {
> JournalTrace trace = new JournalTrace();
> @@ -735,6 +759,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> return new ByteSequence(sequence.getData(), sequence.getOffset(), sequence.getLength());
> }
>
> + @Override
> public void setBrokerName(String brokerName) {
> longTermPersistence.setBrokerName(brokerName);
> }
> @@ -744,18 +769,22 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> return "JournalPersistenceAdapter(" + longTermPersistence + ")";
> }
>
> + @Override
> public void setDirectory(File dir) {
> this.directory=dir;
> }
> -
> +
> + @Override
> public File getDirectory(){
> return directory;
> }
> -
> +
> + @Override
> public long size(){
> return 0;
> }
>
> + @Override
> public void setBrokerService(BrokerService brokerService) {
> this.brokerService = brokerService;
> PersistenceAdapter pa = getLongTermPersistence();
> @@ -764,8 +793,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
> }
> }
>
> + @Override
> public long getLastProducerSequenceId(ProducerId id) {
> return -1;
> }
>
> + @Override
> + public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
> + return longTermPersistence.createJobSchedulerStore();
> + }
> +
> }
>
> http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
> ----------------------------------------------------------------------
> diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
> new file mode 100644
> index 0000000..edb2750
> --- /dev/null
> +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBMetaData.java
> @@ -0,0 +1,57 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.activemq.store.kahadb;
> +
> +import org.apache.activemq.store.kahadb.disk.journal.Location;
> +import org.apache.activemq.store.kahadb.disk.page.Page;
> +
> +public abstract class AbstractKahaDBMetaData<T> implements KahaDBMetaData<T> {
> +
> + private int state;
> + private Location lastUpdateLocation;
> + private Page<T> page;
> +
> + @Override
> + public Page<T> getPage() {
> + return page;
> + }
> +
> + @Override
> + public int getState() {
> + return state;
> + }
> +
> + @Override
> + public Location getLastUpdateLocation() {
> + return lastUpdateLocation;
> + }
> +
> + @Override
> + public void setPage(Page<T> page) {
> + this.page = page;
> + }
> +
> + @Override
> + public void setState(int value) {
> + this.state = value;
> + }
> +
> + @Override
> + public void setLastUpdateLocation(Location location) {
> + this.lastUpdateLocation = location;
> + }
> +}
>
> .
>
--
Tim Bish
Sr Software Engineer | RedHat Inc.
tim.bish@redhat.com | www.redhat.com
skype: tabish121 | twitter: @tabish121
blog: http://timbish.blogspot.com/