You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/06 23:05:07 UTC

svn commit: r1406371 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/broker/jmx/ src/main/java/org/apache/activemq/broker/scheduler/ src/main/java/org/apache/activemq/store/ src/main/java/org/apache/activemq/store/kahadb/scheduler/ src/test/java/org/apache/activemq/broker/scheduler/

Author: chirino
Date: Tue Nov  6 22:05:06 2012
New Revision: 1406371

URL: http://svn.apache.org/viewvc?rev=1406371&view=rev
Log:
Keep the broker.scheduler package free of kahadb impl specifics.

Added:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
      - copied, changed from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
      - copied, changed from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
      - copied, changed from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
      - copied, changed from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
Removed:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Tue Nov  6 22:05:06 2012
@@ -1136,11 +1136,11 @@
                                 <exclude>org/apache/activemq/xbean/JDBCPersistenceXBeanConfigTest.*</exclude>
                                 <exclude>org/apache/activemq/xbean/XBeanXmlTest.*</exclude>
                                 <exclude>org/apache/bugs/AMQ1730Test.*</exclude>
-                                <exclude>org/apache/bugs/LoadBalanceTest.*</exclude>
-                                <exclude>org/apache/kahadb/index/BTreeIndexTest.*</exclude>
-                                <exclude>org/apache/kahadb/index/HashIndexTest.*</exclude>
-                                <exclude>org/apache/kahadb/index/ListIndexTest.*</exclude>
-                                <exclude>org/apache/kahadb/util/DataByteArrayInputStreamTest.*</exclude>
+                                <exclude>org/apache/activemq/store/kahadb/bugs/LoadBalanceTest.*</exclude>
+                                <exclude>org/apache/activemq/store/kahadb/disk/index/BTreeIndexTest.*</exclude>
+                                <exclude>org/apache/activemq/store/kahadb/disk/index/HashIndexTest.*</exclude>
+                                <exclude>org/apache/activemq/store/kahadb/disk/index/ListIndexTest.*</exclude>
+                                <exclude>org/apache/activemq/store/kahadb/disk/util/DataByteArrayInputStreamTest.*</exclude>
                             </excludes>
                         </configuration>
                     </plugin>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java Tue Nov  6 22:05:06 2012
@@ -16,17 +16,14 @@
  */
 package org.apache.activemq.broker.jmx;
 
-import java.io.IOException;
-import java.util.List;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.TabularData;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.scheduler.Job;
-import org.apache.activemq.broker.scheduler.JobImpl;
 import org.apache.activemq.broker.scheduler.JobScheduler;
+import org.apache.activemq.broker.scheduler.JobSupport;
+
+import javax.management.openmbean.*;
+import java.io.IOException;
+import java.util.List;
 
 public class JobSchedulerView implements JobSchedulerViewMBean {
 
@@ -53,8 +50,8 @@ public class JobSchedulerView implements
         CompositeType ct = factory.getCompositeType();
         TabularType tt = new TabularType("Scheduled Jobs", "Scheduled Jobs", ct, new String[] { "jobId" });
         TabularDataSupport rc = new TabularDataSupport(tt);
-        long start = JobImpl.getDataTime(startTime);
-        long finish = JobImpl.getDataTime(finishTime);
+        long start = JobSupport.getDataTime(startTime);
+        long finish = JobSupport.getDataTime(finishTime);
         List<Job> jobs = this.jobScheduler.getAllJobs(start, finish);
         for (Job job : jobs) {
             rc.put(new CompositeDataSupport(ct, factory.getFields(job)));
@@ -76,7 +73,7 @@ public class JobSchedulerView implements
 
     public String getNextScheduleTime() throws Exception {
         long time = this.jobScheduler.getNextScheduleTime();
-        return JobImpl.getDateTime(time);
+        return JobSupport.getDateTime(time);
     }
 
     public void removeAllJobs() throws Exception {
@@ -85,8 +82,8 @@ public class JobSchedulerView implements
     }
 
     public void removeAllJobs(String startTime, String finishTime) throws Exception {
-        long start = JobImpl.getDataTime(startTime);
-        long finish = JobImpl.getDataTime(finishTime);
+        long start = JobSupport.getDataTime(startTime);
+        long finish = JobSupport.getDataTime(finishTime);
         this.jobScheduler.removeAllJobs(start, finish);
 
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java Tue Nov  6 22:05:06 2012
@@ -1,392 +1,20 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.activemq.broker.scheduler;
 
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
-import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
-import org.apache.activemq.store.kahadb.disk.journal.Journal;
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.Page;
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
-import org.apache.activemq.util.LockFile;
-import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.activemq.Service;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-public class JobSchedulerStore extends ServiceSupport {
-    static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
-    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
-
-    public static final int CLOSED_STATE = 1;
-    public static final int OPEN_STATE = 2;
-
-    private File directory;
-    PageFile pageFile;
-    private Journal journal;
-    private LockFile lockFile;
-    private boolean failIfDatabaseIsLocked;
-    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
-    private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
-    private boolean enableIndexWriteAsync = false;
-    // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
-    MetaData metaData = new MetaData(this);
-    final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
-    Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
-
-    protected class MetaData {
-        protected MetaData(JobSchedulerStore store) {
-            this.store = store;
-        }
-        private final JobSchedulerStore store;
-        Page<MetaData> page;
-        BTreeIndex<Integer, Integer> journalRC;
-        BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
-
-        void createIndexes(Transaction tx) throws IOException {
-            this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
-            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
-        }
-
-        void load(Transaction tx) throws IOException {
-            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
-            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
-            this.storedSchedulers.load(tx);
-            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
-            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
-            this.journalRC.load(tx);
-        }
-
-        void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
-            for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
-                Entry<String, JobSchedulerImpl> entry = i.next();
-                entry.getValue().load(tx);
-                schedulers.put(entry.getKey(), entry.getValue());
-            }
-        }
-
-        public void read(DataInput is) throws IOException {
-            this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
-            this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
-            this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
-            this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
-            this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
-            this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
-        }
-
-        public void write(DataOutput os) throws IOException {
-            os.writeLong(this.storedSchedulers.getPageId());
-            os.writeLong(this.journalRC.getPageId());
-
-        }
-    }
-
-    class MetaDataMarshaller extends VariableMarshaller<MetaData> {
-        private final JobSchedulerStore store;
-
-        MetaDataMarshaller(JobSchedulerStore store) {
-            this.store = store;
-        }
-        public MetaData readPayload(DataInput dataIn) throws IOException {
-            MetaData rc = new MetaData(this.store);
-            rc.read(dataIn);
-            return rc;
-        }
-
-        public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
-            object.write(dataOut);
-        }
-    }
-
-    class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
-        public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
-            List<JobLocation> result = new ArrayList<JobLocation>();
-            int size = dataIn.readInt();
-            for (int i = 0; i < size; i++) {
-                JobLocation jobLocation = new JobLocation();
-                jobLocation.readExternal(dataIn);
-                result.add(jobLocation);
-            }
-            return result;
-        }
-
-        public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
-            dataOut.writeInt(value.size());
-            for (JobLocation jobLocation : value) {
-                jobLocation.writeExternal(dataOut);
-            }
-        }
-    }
-
-    class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
-        private final JobSchedulerStore store;
-        JobSchedulerMarshaller(JobSchedulerStore store) {
-            this.store = store;
-        }
-        public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
-            JobSchedulerImpl result = new JobSchedulerImpl(this.store);
-            result.read(dataIn);
-            return result;
-        }
-
-        public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
-            js.write(dataOut);
-        }
-    }
-
-    public File getDirectory() {
-        return directory;
-    }
-
-    public void setDirectory(File directory) {
-        this.directory = directory;
-    }
-    
-    public long size() {
-        if ( !isStarted() ) {
-            return 0;
-        }
-        try {
-            return journal.getDiskSize() + pageFile.getDiskSize();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public JobScheduler getJobScheduler(final String name) throws Exception {
-        JobSchedulerImpl result = this.schedulers.get(name);
-        if (result == null) {
-            final JobSchedulerImpl js = new JobSchedulerImpl(this);
-            js.setName(name);
-            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    js.createIndexes(tx);
-                    js.load(tx);
-                    metaData.storedSchedulers.put(tx, name, js);
-                }
-            });
-            result = js;
-            this.schedulers.put(name, js);
-            if (isStarted()) {
-                result.start();
-            }
-            this.pageFile.flush();
-        }
-        return result;
-    }
 
-    synchronized public boolean removeJobScheduler(final String name) throws Exception {
-        boolean result = false;
-        final JobSchedulerImpl js = this.schedulers.remove(name);
-        result = js != null;
-        if (result) {
-            js.stop();
-            getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    metaData.storedSchedulers.remove(tx, name);
-                    js.destroy(tx);
-                }
-            });
-        }
-        return result;
-    }
-
-    @Override
-    protected synchronized void doStart() throws Exception {
-        if (this.directory == null) {
-            this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
-        }
-        IOHelper.mkdirs(this.directory);
-        lock();
-        this.journal = new Journal();
-        this.journal.setDirectory(directory);
-        this.journal.setMaxFileLength(getJournalMaxFileLength());
-        this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
-        this.journal.start();
-        this.pageFile = new PageFile(directory, "scheduleDB");
-        this.pageFile.setWriteBatchSize(1);
-        this.pageFile.load();
-
-        this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                if (pageFile.getPageCount() == 0) {
-                    Page<MetaData> page = tx.allocate();
-                    assert page.getPageId() == 0;
-                    page.set(metaData);
-                    metaData.page = page;
-                    metaData.createIndexes(tx);
-                    tx.store(metaData.page, metaDataMarshaller, true);
-
-                } else {
-                    Page<MetaData> page = tx.load(0, metaDataMarshaller);
-                    metaData = page.get();
-                    metaData.page = page;
-                }
-                metaData.load(tx);
-                metaData.loadScheduler(tx, schedulers);
-                for (JobSchedulerImpl js :schedulers.values()) {
-                    try {
-                        js.start();
-                    } catch (Exception e) {
-                        JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e);
-                    }
-               }
-            }
-        });
-
-        this.pageFile.flush();
-        LOG.info(this + " started");
-    }
-    
-    @Override
-    protected synchronized void doStop(ServiceStopper stopper) throws Exception {
-        for (JobSchedulerImpl js : this.schedulers.values()) {
-            js.stop();
-        }
-        if (this.pageFile != null) {
-            this.pageFile.unload();
-        }
-        if (this.journal != null) {
-            journal.close();
-        }
-        if (this.lockFile != null) {
-            this.lockFile.unlock();
-        }
-        this.lockFile = null;
-        LOG.info(this + " stopped");
-
-    }
-
-    synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
-        int logId = location.getDataFileId();
-        Integer val = this.metaData.journalRC.get(tx, logId);
-        int refCount = val != null ? val.intValue() + 1 : 1;
-        this.metaData.journalRC.put(tx, logId, refCount);
-
-    }
-
-    synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
-        int logId = location.getDataFileId();
-        int refCount = this.metaData.journalRC.get(tx, logId);
-        refCount--;
-        if (refCount <= 0) {
-            this.metaData.journalRC.remove(tx, logId);
-            Set<Integer> set = new HashSet<Integer>();
-            set.add(logId);
-            this.journal.removeDataFiles(set);
-        } else {
-            this.metaData.journalRC.put(tx, logId, refCount);
-        }
-
-    }
-
-    synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
-        ByteSequence result = null;
-        result = this.journal.read(location);
-        return result;
-    }
-
-    synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
-        return this.journal.write(payload, sync);
-    }
-
-    private void lock() throws IOException {
-        if (lockFile == null) {
-            File lockFileName = new File(directory, "lock");
-            lockFile = new LockFile(lockFileName, true);
-            if (failIfDatabaseIsLocked) {
-                lockFile.lock();
-            } else {
-                while (true) {
-                    try {
-                        lockFile.lock();
-                        break;
-                    } catch (IOException e) {
-                        LOG.info("Database " + lockFileName + " is locked... waiting "
-                                + (DATABASE_LOCKED_WAIT_DELAY / 1000)
-                                + " seconds for the database to be unlocked. Reason: " + e);
-                        try {
-                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
-                        } catch (InterruptedException e1) {
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    PageFile getPageFile() {
-        this.pageFile.isLoaded();
-        return this.pageFile;
-    }
-
-    public boolean isFailIfDatabaseIsLocked() {
-        return failIfDatabaseIsLocked;
-    }
-
-    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
-        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
-    }
-
-    public int getJournalMaxFileLength() {
-        return journalMaxFileLength;
-    }
-
-    public void setJournalMaxFileLength(int journalMaxFileLength) {
-        this.journalMaxFileLength = journalMaxFileLength;
-    }
-
-    public int getJournalMaxWriteBatchSize() {
-        return journalMaxWriteBatchSize;
-    }
-
-    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
-        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
-    }
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface JobSchedulerStore extends Service {
+    File getDirectory();
 
-    public boolean isEnableIndexWriteAsync() {
-        return enableIndexWriteAsync;
-    }
+    void setDirectory(File directory);
 
-    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
-        this.enableIndexWriteAsync = enableIndexWriteAsync;
-    }
+    long size();
 
-    @Override
-    public String toString() {
-        return "JobSchedulerStore:" + this.directory;
-    }
+    JobScheduler getJobScheduler(String name) throws Exception;
 
+    boolean removeJobScheduler(String name) throws Exception;
 }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java?rev=1406371&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSupport.java Tue Nov  6 22:05:06 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class JobSupport {
+    public static String getDateTime(long value) {
+        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        Date date = new Date(value);
+        return dateFormat.format(date);
+    }
+
+    public static long getDataTime(String value) throws Exception {
+         DateFormat dfm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+         Date date = dfm.parse(value);
+         return date.getTime();
+     }
+
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java Tue Nov  6 22:05:06 2012
@@ -268,7 +268,8 @@ public class SchedulerBroker extends Bro
     private JobSchedulerStore getStore() throws Exception {
         if (started.get()) {
             if (this.store == null) {
-                this.store = new JobSchedulerStore();
+                String clazz = "org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl";
+                this.store = (JobSchedulerStore) getClass().getClassLoader().loadClass(clazz).newInstance();
                 this.store.setDirectory(directory);
                 this.store.start();
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/PListStore.java Tue Nov  6 22:05:06 2012
@@ -1,7 +1,6 @@
 package org.apache.activemq.store;
 
 import org.apache.activemq.Service;
-import org.apache.activemq.store.kahadb.plist.PListImpl;
 
 import java.io.File;
 
@@ -13,7 +12,7 @@ public interface PListStore extends Serv
 
     void setDirectory(File directory);
 
-    PListImpl getPList(String name) throws Exception;
+    PList getPList(String name) throws Exception;
 
     boolean removePList(String name) throws Exception;
 

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java (from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java&r1=1406370&r2=1406371&rev=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java Tue Nov  6 22:05:06 2012
@@ -14,11 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.scheduler;
+package org.apache.activemq.store.kahadb.scheduler;
 
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+
+import org.apache.activemq.broker.scheduler.Job;
+import org.apache.activemq.broker.scheduler.JobSupport;
 import org.apache.activemq.util.ByteSequence;
 
 
@@ -63,27 +66,12 @@ public class JobImpl implements Job {
     
 
     public String getNextExecutionTime() {
-        return JobImpl.getDateTime(this.jobLocation.getNextTime());
+        return JobSupport.getDateTime(this.jobLocation.getNextTime());
     }
 
     public String getStartTime() {
-        return JobImpl.getDateTime(getStart());
-    }
-    
-   public static long getDataTime(String value) throws Exception {
-        DateFormat dfm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-     
-        Date date = dfm.parse(value);
-        return date.getTime();
-    }
-    
-    public static String getDateTime(long value) {
-        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        Date date = new Date(value);
-        return dateFormat.format(date);
+        return JobSupport.getDateTime(getStart());
     }
 
-    
-    
 
 }

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java (from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java&r1=1406370&r2=1406371&rev=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobLocation.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java Tue Nov  6 22:05:06 2012
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.scheduler;
+package org.apache.activemq.store.kahadb.scheduler;
 
 import java.io.DataInput;
 import java.io.DataOutput;

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java (from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java&r1=1406370&r2=1406371&rev=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java Tue Nov  6 22:05:06 2012
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.scheduler;
+package org.apache.activemq.store.kahadb.scheduler;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -28,6 +28,10 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.jms.MessageFormatException;
 
+import org.apache.activemq.broker.scheduler.CronParser;
+import org.apache.activemq.broker.scheduler.Job;
+import org.apache.activemq.broker.scheduler.JobListener;
+import org.apache.activemq.broker.scheduler.JobScheduler;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
@@ -42,7 +46,7 @@ import org.apache.activemq.store.kahadb.
 
 class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler {
     private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);
-    final JobSchedulerStore store;
+    final JobSchedulerStoreImpl store;
     private final AtomicBoolean running = new AtomicBoolean();
     private String name;
     BTreeIndex<Long, List<JobLocation>> index;
@@ -51,7 +55,7 @@ class JobSchedulerImpl extends ServiceSu
     private static final IdGenerator ID_GENERATOR = new IdGenerator();
     private final ScheduleTime scheduleTime = new ScheduleTime();
 
-    JobSchedulerImpl(JobSchedulerStore store) {
+    JobSchedulerImpl(JobSchedulerStoreImpl store) {
 
         this.store = store;
     }

Copied: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java (from r1406370, activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java?p2=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java&p1=activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java&r1=1406370&r2=1406371&rev=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java Tue Nov  6 22:05:06 2012
@@ -14,8 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.broker.scheduler;
+package org.apache.activemq.store.kahadb.scheduler;
 
+import org.apache.activemq.broker.scheduler.JobScheduler;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
@@ -46,8 +48,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-public class JobSchedulerStore extends ServiceSupport {
-    static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStore.class);
+public class JobSchedulerStoreImpl extends ServiceSupport implements JobSchedulerStore {
+    static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
     private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 
     public static final int CLOSED_STATE = 1;
@@ -67,10 +69,10 @@ public class JobSchedulerStore extends S
     Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
 
     protected class MetaData {
-        protected MetaData(JobSchedulerStore store) {
+        protected MetaData(JobSchedulerStoreImpl store) {
             this.store = store;
         }
-        private final JobSchedulerStore store;
+        private final JobSchedulerStoreImpl store;
         Page<MetaData> page;
         BTreeIndex<Integer, Integer> journalRC;
         BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
@@ -114,9 +116,9 @@ public class JobSchedulerStore extends S
     }
 
     class MetaDataMarshaller extends VariableMarshaller<MetaData> {
-        private final JobSchedulerStore store;
+        private final JobSchedulerStoreImpl store;
 
-        MetaDataMarshaller(JobSchedulerStore store) {
+        MetaDataMarshaller(JobSchedulerStoreImpl store) {
             this.store = store;
         }
         public MetaData readPayload(DataInput dataIn) throws IOException {
@@ -151,8 +153,8 @@ public class JobSchedulerStore extends S
     }
 
     class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
-        private final JobSchedulerStore store;
-        JobSchedulerMarshaller(JobSchedulerStore store) {
+        private final JobSchedulerStoreImpl store;
+        JobSchedulerMarshaller(JobSchedulerStoreImpl store) {
             this.store = store;
         }
         public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
@@ -166,14 +168,17 @@ public class JobSchedulerStore extends S
         }
     }
 
+    @Override
     public File getDirectory() {
         return directory;
     }
 
+    @Override
     public void setDirectory(File directory) {
         this.directory = directory;
     }
     
+    @Override
     public long size() {
         if ( !isStarted() ) {
             return 0;
@@ -185,6 +190,7 @@ public class JobSchedulerStore extends S
         }
     }
 
+    @Override
     public JobScheduler getJobScheduler(final String name) throws Exception {
         JobSchedulerImpl result = this.schedulers.get(name);
         if (result == null) {
@@ -207,6 +213,7 @@ public class JobSchedulerStore extends S
         return result;
     }
 
+    @Override
     synchronized public boolean removeJobScheduler(final String name) throws Exception {
         boolean result = false;
         final JobSchedulerImpl js = this.schedulers.remove(name);
@@ -260,7 +267,7 @@ public class JobSchedulerStore extends S
                     try {
                         js.start();
                     } catch (Exception e) {
-                        JobSchedulerStore.LOG.error("Failed to load " + js.getName(),e);
+                        JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(),e);
                     }
                }
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerStoreTest.java Tue Nov  6 22:05:06 2012
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.scheduler;
 
 import junit.framework.TestCase;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ByteSequence;
 
@@ -27,7 +28,7 @@ import java.util.List;
 public class JobSchedulerStoreTest extends TestCase {
 
 	public void testRestart() throws Exception {
-		JobSchedulerStore store = new JobSchedulerStore();
+		JobSchedulerStore store = new JobSchedulerStoreImpl();
 		File directory = new File("target/test/ScheduledDB");
 		  IOHelper.mkdirs(directory);
 	      IOHelper.deleteChildren(directory);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java?rev=1406371&r1=1406370&r2=1406371&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTest.java Tue Nov  6 22:05:06 2012
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.ByteSequence;
 import org.junit.After;
@@ -240,7 +241,7 @@ public class JobSchedulerTest {
     }
 
     protected void startStore(File directory) throws Exception {
-        store = new JobSchedulerStore();
+        store = new JobSchedulerStoreImpl();
         store.setDirectory(directory);
         store.start();
         scheduler = store.getJobScheduler("test");