You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/25 05:02:25 UTC

[3/5] activemq git commit: Revert "https://issues.apache.org/jira/browse/AMQ-3758"

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java
deleted file mode 100644
index e22e4df..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.scheduler;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-
-/**
- * A VariableMarshaller instance that performs the read and write of a list of
- * JobLocation objects using the JobLocation's built in read and write methods.
- */
-class JobLocationsMarshaller extends VariableMarshaller<List<JobLocation>> {
-    static JobLocationsMarshaller INSTANCE = new JobLocationsMarshaller();
-
-    @Override
-    public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
-        List<JobLocation> result = new ArrayList<JobLocation>();
-        int size = dataIn.readInt();
-        for (int i = 0; i < size; i++) {
-            JobLocation jobLocation = new JobLocation();
-            jobLocation.readExternal(dataIn);
-            result.add(jobLocation);
-        }
-        return result;
-    }
-
-    @Override
-    public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
-        dataOut.writeInt(value.size());
-        for (JobLocation jobLocation : value) {
-            jobLocation.writeExternal(dataOut);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index bcb819c..455801a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -32,15 +32,11 @@ import org.apache.activemq.broker.scheduler.CronParser;
 import org.apache.activemq.broker.scheduler.Job;
 import org.apache.activemq.broker.scheduler.JobListener;
 import org.apache.activemq.broker.scheduler.JobScheduler;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
-import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
 import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
 import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.ServiceStopper;
@@ -48,13 +44,12 @@ import org.apache.activemq.util.ServiceSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler {
-
+class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler {
     private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);
-    private final JobSchedulerStoreImpl store;
+    final JobSchedulerStoreImpl store;
     private final AtomicBoolean running = new AtomicBoolean();
     private String name;
-    private BTreeIndex<Long, List<JobLocation>> index;
+    BTreeIndex<Long, List<JobLocation>> index;
     private Thread thread;
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
@@ -69,163 +64,233 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
         this.name = name;
     }
 
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.activemq.beanstalk.JobScheduler#getName()
+     */
     @Override
     public String getName() {
         return this.name;
     }
 
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.activemq.beanstalk.JobScheduler#addListener(org.apache.activemq.beanstalk.JobListener)
+     */
     @Override
     public void addListener(JobListener l) {
         this.jobListeners.add(l);
     }
 
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.activemq.beanstalk.JobScheduler#removeListener(org.apache.activemq.beanstalk.JobListener)
+     */
     @Override
     public void removeListener(JobListener l) {
         this.jobListeners.remove(l);
     }
 
     @Override
-    public void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException {
-        doSchedule(jobId, payload, "", 0, delay, 0);
+    public synchronized void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                schedule(tx, jobId, payload, "", 0, delay, 0);
+            }
+        });
     }
 
     @Override
-    public void schedule(final String jobId, final ByteSequence payload, final String cronEntry) throws Exception {
-        doSchedule(jobId, payload, cronEntry, 0, 0, 0);
+    public synchronized void schedule(final String jobId, final ByteSequence payload, final String cronEntry) throws Exception {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                schedule(tx, jobId, payload, cronEntry, 0, 0, 0);
+            }
+        });
     }
 
     @Override
-    public void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay, final long period, final int repeat) throws IOException {
-        doSchedule(jobId, payload, cronEntry, delay, period, repeat);
+    public synchronized void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay, final long period,
+        final int repeat) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                schedule(tx, jobId, payload, cronEntry, delay, period, repeat);
+            }
+        });
     }
 
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.activemq.beanstalk.JobScheduler#remove(long)
+     */
     @Override
-    public void remove(final long time) throws IOException {
-        doRemoveRange(time, time);
+    public synchronized void remove(final long time) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                remove(tx, time);
+            }
+        });
     }
 
-    @Override
-    public void remove(final String jobId) throws IOException {
-        doRemove(-1, jobId);
+    synchronized void removeFromIndex(final long time, final String jobId) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                removeFromIndex(tx, time, jobId);
+            }
+        });
     }
 
-    @Override
-    public void removeAllJobs() throws IOException {
-        doRemoveRange(0, Long.MAX_VALUE);
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.activemq.beanstalk.JobScheduler#remove(long, java.lang.String)
+     */
+    public synchronized void remove(final long time, final String jobId) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                remove(tx, time, jobId);
+            }
+        });
+    }
+
+    synchronized void remove(final long time, final List<JobLocation> jobIds) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                remove(tx, time, jobIds);
+            }
+        });
     }
 
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.activemq.beanstalk.JobScheduler#remove(java.lang.String)
+     */
     @Override
-    public void removeAllJobs(final long start, final long finish) throws IOException {
-        doRemoveRange(start, finish);
+    public synchronized void remove(final String jobId) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                remove(tx, jobId);
+            }
+        });
     }
 
     @Override
-    public long getNextScheduleTime() throws IOException {
-        this.store.readLockIndex();
-        try {
-            Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
-            return first != null ? first.getKey() : -1l;
-        } finally {
-            this.store.readUnlockIndex();
-        }
+    public synchronized long getNextScheduleTime() throws IOException {
+        Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
+        return first != null ? first.getKey() : -1l;
     }
 
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.activemq.beanstalk.JobScheduler#getNextScheduleJobs()
+     */
     @Override
-    public List<Job> getNextScheduleJobs() throws IOException {
+    public synchronized List<Job> getNextScheduleJobs() throws IOException {
         final List<Job> result = new ArrayList<Job>();
-        this.store.readLockIndex();
-        try {
-            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                @Override
-                public void execute(Transaction tx) throws IOException {
-                    Map.Entry<Long, List<JobLocation>> first = index.getFirst(tx);
-                    if (first != null) {
-                        for (JobLocation jl : first.getValue()) {
-                            ByteSequence bs = getPayload(jl.getLocation());
-                            Job job = new JobImpl(jl, bs);
-                            result.add(job);
-                        }
+
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                Map.Entry<Long, List<JobLocation>> first = index.getFirst(store.getPageFile().tx());
+                if (first != null) {
+                    for (JobLocation jl : first.getValue()) {
+                        ByteSequence bs = getPayload(jl.getLocation());
+                        Job job = new JobImpl(jl, bs);
+                        result.add(job);
                     }
                 }
-            });
-        } finally {
-            this.store.readUnlockIndex();
-        }
-        return result;
-    }
-
-    private Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException {
-        this.store.readLockIndex();
-        try {
-            if (!this.store.isStopped() && !this.store.isStopping()) {
-                Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
-                return first;
             }
-        } finally {
-            this.store.readUnlockIndex();
-        }
-        return null;
+        });
+        return result;
     }
 
     @Override
-    public List<Job> getAllJobs() throws IOException {
+    public synchronized List<Job> getAllJobs() throws IOException {
         final List<Job> result = new ArrayList<Job>();
-        this.store.readLockIndex();
-        try {
-            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                @Override
-                public void execute(Transaction tx) throws IOException {
-                    Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx());
-                    while (iter.hasNext()) {
-                        Map.Entry<Long, List<JobLocation>> next = iter.next();
-                        if (next != null) {
-                            for (JobLocation jl : next.getValue()) {
-                                ByteSequence bs = getPayload(jl.getLocation());
-                                Job job = new JobImpl(jl, bs);
-                                result.add(job);
-                            }
-                        } else {
-                            break;
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx());
+                while (iter.hasNext()) {
+                    Map.Entry<Long, List<JobLocation>> next = iter.next();
+                    if (next != null) {
+                        for (JobLocation jl : next.getValue()) {
+                            ByteSequence bs = getPayload(jl.getLocation());
+                            Job job = new JobImpl(jl, bs);
+                            result.add(job);
                         }
+                    } else {
+                        break;
                     }
                 }
-            });
-        } finally {
-            this.store.readUnlockIndex();
-        }
+            }
+        });
         return result;
     }
 
     @Override
-    public List<Job> getAllJobs(final long start, final long finish) throws IOException {
+    public synchronized List<Job> getAllJobs(final long start, final long finish) throws IOException {
         final List<Job> result = new ArrayList<Job>();
-        this.store.readLockIndex();
-        try {
-            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                @Override
-                public void execute(Transaction tx) throws IOException {
-                    Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(tx, start);
-                    while (iter.hasNext()) {
-                        Map.Entry<Long, List<JobLocation>> next = iter.next();
-                        if (next != null && next.getKey().longValue() <= finish) {
-                            for (JobLocation jl : next.getValue()) {
-                                ByteSequence bs = getPayload(jl.getLocation());
-                                Job job = new JobImpl(jl, bs);
-                                result.add(job);
-                            }
-                        } else {
-                            break;
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx(), start);
+                while (iter.hasNext()) {
+                    Map.Entry<Long, List<JobLocation>> next = iter.next();
+                    if (next != null && next.getKey().longValue() <= finish) {
+                        for (JobLocation jl : next.getValue()) {
+                            ByteSequence bs = getPayload(jl.getLocation());
+                            Job job = new JobImpl(jl, bs);
+                            result.add(job);
                         }
+                    } else {
+                        break;
                     }
                 }
-            });
-        } finally {
-            this.store.readUnlockIndex();
-        }
+            }
+        });
         return result;
     }
 
-    private void doSchedule(final String jobId, final ByteSequence payload, final String cronEntry, long delay, long period, int repeat) throws IOException {
+    @Override
+    public synchronized void removeAllJobs() throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                destroy(tx);
+            }
+        });
+    }
+
+    @Override
+    public synchronized void removeAllJobs(final long start, final long finish) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                destroy(tx, start, finish);
+            }
+        });
+    }
+
+    ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
+        return this.store.getPayload(location);
+    }
+
+    void schedule(Transaction tx, String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws IOException {
         long startTime = System.currentTimeMillis();
         // round startTime - so we can schedule more jobs
         // at the same time
@@ -243,86 +308,38 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
             // start time not set by CRON - so it it to the current time
             time = startTime;
         }
-
         if (delay > 0) {
             time += delay;
         } else {
             time += period;
         }
 
-        KahaAddScheduledJobCommand newJob = new KahaAddScheduledJobCommand();
-        newJob.setScheduler(name);
-        newJob.setJobId(jobId);
-        newJob.setStartTime(startTime);
-        newJob.setCronEntry(cronEntry);
-        newJob.setDelay(delay);
-        newJob.setPeriod(period);
-        newJob.setRepeat(repeat);
-        newJob.setNextExecutionTime(time);
-        newJob.setPayload(new Buffer(payload.getData(), payload.getOffset(), payload.getLength()));
-
-        this.store.store(newJob);
-    }
-
-    private void doReschedule(final String jobId, long executionTime, long nextExecutionTime, int rescheduledCount) throws IOException {
-        KahaRescheduleJobCommand update = new KahaRescheduleJobCommand();
-        update.setScheduler(name);
-        update.setJobId(jobId);
-        update.setExecutionTime(executionTime);
-        update.setNextExecutionTime(nextExecutionTime);
-        update.setRescheduledCount(rescheduledCount);
-        this.store.store(update);
-    }
-
-    private void doRemove(final long executionTime, final List<JobLocation> jobs) throws IOException {
-        for (JobLocation job : jobs) {
-            doRemove(executionTime, job.getJobId());
-        }
-    }
-
-    private void doRemove(long executionTime, final String jobId) throws IOException {
-        KahaRemoveScheduledJobCommand remove = new KahaRemoveScheduledJobCommand();
-        remove.setScheduler(name);
-        remove.setJobId(jobId);
-        remove.setNextExecutionTime(executionTime);
-        this.store.store(remove);
-    }
-
-    private void doRemoveRange(long start, long end) throws IOException {
-        KahaRemoveScheduledJobsCommand destroy = new KahaRemoveScheduledJobsCommand();
-        destroy.setScheduler(name);
-        destroy.setStartTime(start);
-        destroy.setEndTime(end);
-        this.store.store(destroy);
-    }
-
-    /**
-     * Adds a new Scheduled job to the index.  Must be called under index lock.
-     *
-     * This method must ensure that a duplicate add is not processed into the scheduler.  On index
-     * recover some adds may be replayed and we don't allow more than one instance of a JobId to
-     * exist at any given scheduled time, so filter these out to ensure idempotence.
-     *
-     * @param tx
-     *      Transaction in which the update is performed.
-     * @param command
-     *      The new scheduled job command to process.
-     * @param location
-     *      The location where the add command is stored in the journal.
-     *
-     * @throws IOException if an error occurs updating the index.
-     */
-    protected void process(final Transaction tx, final KahaAddScheduledJobCommand command, Location location) throws IOException {
+        Location location = this.store.write(payload, false);
         JobLocation jobLocation = new JobLocation(location);
-        jobLocation.setJobId(command.getJobId());
-        jobLocation.setStartTime(command.getStartTime());
-        jobLocation.setCronEntry(command.getCronEntry());
-        jobLocation.setDelay(command.getDelay());
-        jobLocation.setPeriod(command.getPeriod());
-        jobLocation.setRepeat(command.getRepeat());
-
-        long nextExecutionTime = command.getNextExecutionTime();
+        this.store.incrementJournalCount(tx, location);
+        jobLocation.setJobId(jobId);
+        jobLocation.setStartTime(startTime);
+        jobLocation.setCronEntry(cronEntry);
+        jobLocation.setDelay(delay);
+        jobLocation.setPeriod(period);
+        jobLocation.setRepeat(repeat);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Scheduling " + jobLocation);
+        }
+        storeJob(tx, jobLocation, time);
+        this.scheduleTime.newJob();
+    }
+
+    synchronized void storeJob(final JobLocation jobLocation, final long nextExecutionTime) throws IOException {
+        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+            @Override
+            public void execute(Transaction tx) throws IOException {
+                storeJob(tx, jobLocation, nextExecutionTime);
+            }
+        });
+    }
 
+    void storeJob(final Transaction tx, final JobLocation jobLocation, final long nextExecutionTime) throws IOException {
         List<JobLocation> values = null;
         jobLocation.setNextTime(nextExecutionTime);
         if (this.index.containsKey(tx, nextExecutionTime)) {
@@ -331,239 +348,106 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
         if (values == null) {
             values = new ArrayList<JobLocation>();
         }
+        values.add(jobLocation);
+        this.index.put(tx, nextExecutionTime, values);
+    }
 
-        // There can never be more than one instance of the same JobId scheduled at any
-        // given time, when it happens its probably the result of index recovery and this
-        // method must be idempotent so check for it first.
-        if (!values.contains(jobLocation)) {
-            values.add(jobLocation);
-
-            // Reference the log file where the add command is stored to prevent GC.
-            this.store.incrementJournalCount(tx, location);
-            this.index.put(tx, nextExecutionTime, values);
-            this.scheduleTime.newJob();
-        } else {
-            this.index.put(tx, nextExecutionTime, values);
-            LOG.trace("Job {} already in scheduler at this time {}",
-                      jobLocation.getJobId(), jobLocation.getNextTime());
+    void remove(Transaction tx, long time, String jobId) throws IOException {
+        JobLocation result = removeFromIndex(tx, time, jobId);
+        if (result != null) {
+            this.store.decrementJournalCount(tx, result.getLocation());
         }
     }
 
-    /**
-     * Reschedules a Job after it has be fired.
-     *
-     * For jobs that are repeating this method updates the job in the index by adding it to the
-     * jobs list for the new execution time.  If the job is not a cron type job then this method
-     * will reduce the repeat counter if the job has a fixed number of repeats set.  The Job will
-     * be removed from the jobs list it just executed on.
-     *
-     * This method must also update the value of the last update location in the JobLocation
-     * instance so that the checkpoint worker doesn't drop the log file in which that command lives.
-     *
-     * This method must ensure that an reschedule command that references a job that doesn't exist
-     * does not cause an error since it's possible that on recover the original add might be gone
-     * and so the job should not reappear in the scheduler.
-     *
-     * @param tx
-     *      The TX under which the index is updated.
-     * @param command
-     *      The reschedule command to process.
-     * @param location
-     *      The location in the index where the reschedule command was stored.
-     *
-     * @throws IOException if an error occurs during the reschedule.
-     */
-    protected void process(final Transaction tx, final KahaRescheduleJobCommand command, Location location) throws IOException {
+    JobLocation removeFromIndex(Transaction tx, long time, String jobId) throws IOException {
         JobLocation result = null;
-        final List<JobLocation> current = this.index.remove(tx, command.getExecutionTime());
-        if (current != null) {
-            for (int i = 0; i < current.size(); i++) {
-                JobLocation jl = current.get(i);
-                if (jl.getJobId().equals(command.getJobId())) {
-                    current.remove(i);
-                    if (!current.isEmpty()) {
-                        this.index.put(tx, command.getExecutionTime(), current);
+        List<JobLocation> values = this.index.remove(tx, time);
+        if (values != null) {
+            for (int i = 0; i < values.size(); i++) {
+                JobLocation jl = values.get(i);
+                if (jl.getJobId().equals(jobId)) {
+                    values.remove(i);
+                    if (!values.isEmpty()) {
+                        this.index.put(tx, time, values);
                     }
                     result = jl;
                     break;
                 }
             }
-        } else {
-            LOG.debug("Process reschedule command for job {} non-existent executime time {}.",
-                      command.getJobId(), command.getExecutionTime());
         }
+        return result;
+    }
 
+    private void remove(Transaction tx, long time, List<JobLocation> jobIds) throws IOException {
+        List<JobLocation> result = removeFromIndex(tx, time, jobIds);
         if (result != null) {
-            Location previousUpdate = result.getLastUpdate();
-
-            List<JobLocation> target = null;
-            result.setNextTime(command.getNextExecutionTime());
-            result.setLastUpdate(location);
-            result.setRescheduledCount(command.getRescheduledCount());
-            if (!result.isCron() && result.getRepeat() > 0) {
-                result.setRepeat(result.getRepeat() - 1);
+            for (JobLocation jl : result) {
+                this.store.decrementJournalCount(tx, jl.getLocation());
             }
-            if (this.index.containsKey(tx, command.getNextExecutionTime())) {
-                target = this.index.remove(tx, command.getNextExecutionTime());
-            }
-            if (target == null) {
-                target = new ArrayList<JobLocation>();
-            }
-            target.add(result);
-
-            // Track the location of the last reschedule command and release the log file
-            // reference for the previous one if there was one.
-            this.store.incrementJournalCount(tx, location);
-            if (previousUpdate != null) {
-                this.store.decrementJournalCount(tx, previousUpdate);
-            }
-
-            this.index.put(tx, command.getNextExecutionTime(), target);
-            this.scheduleTime.newJob();
-        } else {
-            LOG.debug("Process reschedule command for non-scheduled job {} at executime time {}.",
-                      command.getJobId(), command.getExecutionTime());
         }
     }
 
-    /**
-     * Removes a scheduled job from the scheduler.
-     *
-     * The remove operation can be of two forms.  The first is that there is a job Id but no set time
-     * (-1) in which case the jobs index is searched until the target job Id is located.  The alternate
-     * form is that a job Id and execution time are both set in which case the given time is checked
-     * for a job matching that Id.  In either case once an execution time is identified the job is
-     * removed and the index updated.
-     *
-     * This method should ensure that if the matching job is not found that no error results as it
-     * is possible that on a recover the initial add command could be lost so the job may not be
-     * rescheduled.
-     *
-     * @param tx
-     *      The transaction under which the index is updated.
-     * @param command
-     *      The remove command to process.
-     * @param location
-     *      The location of the remove command in the Journal.
-     *
-     * @throws IOException if an error occurs while updating the scheduler index.
-     */
-    void process(final Transaction tx, final KahaRemoveScheduledJobCommand command, Location location) throws IOException {
-
-        // Case 1: JobId and no time value means find the job and remove it.
-        // Case 2: JobId and a time value means find exactly this scheduled job.
-
-        Long executionTime = command.getNextExecutionTime();
-
-        List<JobLocation> values = null;
-
-        if (executionTime == -1) {
-            for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
-                Map.Entry<Long, List<JobLocation>> entry = i.next();
-                List<JobLocation> candidates = entry.getValue();
-                if (candidates != null) {
-                    for (JobLocation jl : candidates) {
-                        if (jl.getJobId().equals(command.getJobId())) {
-                            LOG.trace("Entry {} contains the remove target: {}", entry.getKey(), command.getJobId());
-                            executionTime = entry.getKey();
-                            values = this.index.remove(tx, executionTime);
-                            break;
-                        }
-                    }
-                }
-            }
-        } else {
-            values = this.index.remove(tx, executionTime);
-        }
-
-        JobLocation removed = null;
-
-        // Remove the job and update the index if there are any other jobs scheduled at this time.
+    private List<JobLocation> removeFromIndex(Transaction tx, long time, List<JobLocation> Jobs) throws IOException {
+        List<JobLocation> result = null;
+        List<JobLocation> values = this.index.remove(tx, time);
         if (values != null) {
-            for (JobLocation job : values) {
-                if (job.getJobId().equals(command.getJobId())) {
-                    removed = job;
-                    values.remove(removed);
-                    break;
+            result = new ArrayList<JobLocation>(values.size());
+
+            for (JobLocation job : Jobs) {
+                if (values.remove(job)) {
+                    result.add(job);
                 }
             }
 
             if (!values.isEmpty()) {
-                this.index.put(tx, executionTime, values);
+                this.index.put(tx, time, values);
             }
         }
+        return result;
+    }
 
-        if (removed != null) {
-            LOG.trace("{} removed from scheduler {}", removed, this);
-
-            // Remove the references for add and reschedule commands for this job
-            // so that those logs can be GC'd when free.
-            this.store.decrementJournalCount(tx, removed.getLocation());
-            if (removed.getLastUpdate() != null) {
-                this.store.decrementJournalCount(tx, removed.getLastUpdate());
+    void remove(Transaction tx, long time) throws IOException {
+        List<JobLocation> values = this.index.remove(tx, time);
+        if (values != null) {
+            for (JobLocation jl : values) {
+                this.store.decrementJournalCount(tx, jl.getLocation());
             }
-
-            // now that the job is removed from the index we can store the remove info and
-            // then dereference the log files that hold the initial add command and the most
-            // recent update command.
-            this.store.referenceRemovedLocation(tx, location, removed);
         }
     }
 
-    /**
-     * Removes all scheduled jobs within a given time range.
-     *
-     * The method can be used to clear the entire scheduler index by specifying a range that
-     * encompasses all time [0...Long.MAX_VALUE] or a single execution time can be removed by
-     * setting start and end time to the same value.
-     *
-     * @param tx
-     *      The transaction under which the index is updated.
-     * @param command
-     *      The remove command to process.
-     * @param location
-     *      The location of the remove command in the Journal.
-     *
-     * @throws IOException if an error occurs while updating the scheduler index.
-     */
-    protected void process(final Transaction tx, final KahaRemoveScheduledJobsCommand command, Location location) throws IOException {
-        removeInRange(tx, command.getStartTime(), command.getEndTime(), location);
+    void remove(Transaction tx, String id) throws IOException {
+        for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
+            Map.Entry<Long, List<JobLocation>> entry = i.next();
+            List<JobLocation> values = entry.getValue();
+            if (values != null) {
+                for (JobLocation jl : values) {
+                    if (jl.getJobId().equals(id)) {
+                        remove(tx, entry.getKey(), id);
+                        return;
+                    }
+                }
+            }
+        }
     }
 
-    /**
-     * Removes all jobs from the schedulers index.  Must be called with the index locked.
-     *
-     * @param tx
-     *      The transaction under which the index entries for this scheduler are removed.
-     *
-     * @throws IOException if an error occurs removing the jobs from the scheduler index.
-     */
-    protected void removeAll(Transaction tx) throws IOException {
-        this.removeInRange(tx, 0, Long.MAX_VALUE, null);
+    synchronized void destroy(Transaction tx) throws IOException {
+        List<Long> keys = new ArrayList<Long>();
+        for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
+            Map.Entry<Long, List<JobLocation>> entry = i.next();
+            keys.add(entry.getKey());
+        }
+
+        for (Long l : keys) {
+            List<JobLocation> values = this.index.remove(tx, l);
+            if (values != null) {
+                for (JobLocation jl : values) {
+                    this.store.decrementJournalCount(tx, jl.getLocation());
+                }
+            }
+        }
     }
 
-    /**
-     * Removes all scheduled jobs within the target range.
-     *
-     * This method can be used to remove all the stored jobs by passing a range of [0...Long.MAX_VALUE]
-     * or it can be used to remove all jobs at a given scheduled time by passing the same time value
-     * for both start and end.  If the optional location parameter is set then this method will update
-     * the store's remove location tracker with the location value and the Jobs that are being removed.
-     *
-     * This method must be called with the store index locked for writes.
-     *
-     * @param tx
-     *      The transaction under which the index is to be updated.
-     * @param start
-     *      The start time for the remove operation.
-     * @param finish
-     *      The end time for the remove operation.
-     * @param location (optional)
-     *      The location of the remove command that triggered this remove.
-     *
-     * @throws IOException if an error occurs during the remove operation.
-     */
-    protected void removeInRange(Transaction tx, long start, long finish, Location location) throws IOException {
+    synchronized void destroy(Transaction tx, long start, long finish) throws IOException {
         List<Long> keys = new ArrayList<Long>();
         for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx, start); i.hasNext();) {
             Map.Entry<Long, List<JobLocation>> entry = i.next();
@@ -574,97 +458,32 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
             }
         }
 
-        for (Long executionTime : keys) {
-            List<JobLocation> values = this.index.remove(tx, executionTime);
-            if (location != null) {
-                for (JobLocation job : values) {
-                    LOG.trace("Removing {} scheduled at: {}", job, executionTime);
-
-                    // Remove the references for add and reschedule commands for this job
-                    // so that those logs can be GC'd when free.
-                    this.store.decrementJournalCount(tx, job.getLocation());
-                    if (job.getLastUpdate() != null) {
-                        this.store.decrementJournalCount(tx, job.getLastUpdate());
-                    }
-
-                    // now that the job is removed from the index we can store the remove info and
-                    // then dereference the log files that hold the initial add command and the most
-                    // recent update command.
-                    this.store.referenceRemovedLocation(tx, location, job);
+        for (Long l : keys) {
+            List<JobLocation> values = this.index.remove(tx, l);
+            if (values != null) {
+                for (JobLocation jl : values) {
+                    this.store.decrementJournalCount(tx, jl.getLocation());
                 }
             }
         }
     }
 
-    /**
-     * Removes a Job from the index using it's Id value and the time it is currently set to
-     * be executed.  This method will only remove the Job if it is found at the given execution
-     * time.
-     *
-     * This method must be called under index lock.
-     *
-     * @param tx
-     *        the transaction under which this method is being executed.
-     * @param jobId
-     *        the target Job Id to remove.
-     * @param executionTime
-     *        the scheduled time that for the Job Id that is being removed.
-     *
-     * @returns true if the Job was removed or false if not found at the given time.
-     *
-     * @throws IOException if an error occurs while removing the Job.
-     */
-    protected boolean removeJobAtTime(Transaction tx, String jobId, long executionTime) throws IOException {
-        boolean result = false;
-
-        List<JobLocation> jobs = this.index.remove(tx, executionTime);
-        Iterator<JobLocation> jobsIter = jobs.iterator();
-        while (jobsIter.hasNext()) {
-            JobLocation job = jobsIter.next();
-            if (job.getJobId().equals(jobId)) {
-                jobsIter.remove();
-                // Remove the references for add and reschedule commands for this job
-                // so that those logs can be GC'd when free.
-                this.store.decrementJournalCount(tx, job.getLocation());
-                if (job.getLastUpdate() != null) {
-                    this.store.decrementJournalCount(tx, job.getLastUpdate());
-                }
-                result = true;
-                break;
-            }
+    private synchronized Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException {
+        if (!this.store.isStopped() && !this.store.isStopping()) {
+            Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
+            return first;
         }
-
-        // Return the list to the index modified or unmodified.
-        this.index.put(tx, executionTime, jobs);
-
-        return result;
+        return null;
     }
 
-    /**
-     * Walks the Scheduled Job Tree and collects the add location and last update location
-     * for all scheduled jobs.
-     *
-     * This method must be called with the index locked.
-     *
-     * @param tx
-     *        the transaction under which this operation was invoked.
-     *
-     * @return a list of all referenced Location values for this JobSchedulerImpl
-     *
-     * @throws IOException if an error occurs walking the scheduler tree.
-     */
-    protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
-        List<JobLocation> references = new ArrayList<JobLocation>();
-
-        for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
-            Map.Entry<Long, List<JobLocation>> entry = i.next();
-            List<JobLocation> scheduled = entry.getValue();
-            for (JobLocation job : scheduled) {
-                references.add(job);
-            }
+    void fireJob(JobLocation job) throws IllegalStateException, IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Firing " + job);
+        }
+        ByteSequence bs = this.store.getPayload(job.getLocation());
+        for (JobListener l : jobListeners) {
+            l.scheduledJob(job.getJobId(), bs);
         }
-
-        return references;
     }
 
     @Override
@@ -673,14 +492,14 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
             mainLoop();
         } catch (Throwable e) {
             if (this.running.get() && isStarted()) {
-                LOG.error("{} Caught exception in mainloop", this, e);
+                LOG.error(this + " Caught exception in mainloop", e);
             }
         } finally {
             if (running.get()) {
                 try {
                     stop();
                 } catch (Exception e) {
-                    LOG.error("Failed to stop {}", this);
+                    LOG.error("Failed to stop " + this);
                 }
             }
         }
@@ -688,55 +507,56 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
 
     @Override
     public String toString() {
-        return "JobScheduler: " + this.name;
+        return "JobScheduler:" + this.name;
     }
 
     protected void mainLoop() {
         while (this.running.get()) {
             this.scheduleTime.clearNewJob();
             try {
+                // peek the next job
                 long currentTime = System.currentTimeMillis();
 
-                // Read the list of scheduled events and fire the jobs, reschedule repeating jobs as
-                // needed before firing the job event.
+                // Read the list of scheduled events and fire the jobs.  Once done with the batch
+                // remove all that were fired, and reschedule as needed.
                 Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
                 if (first != null) {
                     List<JobLocation> list = new ArrayList<JobLocation>(first.getValue());
-                    List<JobLocation> toRemove = new ArrayList<JobLocation>(list.size());
+                    List<JobLocation> fired = new ArrayList<JobLocation>(list.size());
                     final long executionTime = first.getKey();
                     long nextExecutionTime = 0;
                     if (executionTime <= currentTime) {
                         for (final JobLocation job : list) {
-
-                            if (!running.get()) {
-                                break;
-                            }
-
                             int repeat = job.getRepeat();
                             nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat);
                             long waitTime = nextExecutionTime - currentTime;
                             this.scheduleTime.setWaitTime(waitTime);
-                            if (!job.isCron()) {
+                            if (job.isCron() == false) {
                                 fireJob(job);
                                 if (repeat != 0) {
-                                    // Reschedule for the next time, the scheduler will take care of
-                                    // updating the repeat counter on the update.
-                                    doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1);
+                                    repeat--;
+                                    job.setRepeat(repeat);
+                                    // remove this job from the index so it doesn't get destroyed
+                                    removeFromIndex(executionTime, job.getJobId());
+                                    // and re-store it
+                                    storeJob(job, nextExecutionTime);
                                 } else {
-                                    toRemove.add(job);
+                                    fired.add(job);
                                 }
                             } else {
+                                // cron job will have a repeat time.
                                 if (repeat == 0) {
-                                    // This is a non-repeating Cron entry so we can fire and forget it.
+                                    // we haven't got a separate scheduler to execute at
+                                    // this time - just a cron job - so fire it
                                     fireJob(job);
                                 }
 
                                 if (nextExecutionTime > currentTime) {
-                                    // Reschedule the cron job as a new event, if the cron entry signals
-                                    // a repeat then it will be stored separately and fired as a normal
-                                    // event with decrementing repeat.
-                                    doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1);
-
+                                    // we will run again ...
+                                    // remove this job from the index - so it doesn't get destroyed
+                                    removeFromIndex(executionTime, job.getJobId());
+                                    // and re-store it
+                                    storeJob(job, nextExecutionTime);
                                     if (repeat != 0) {
                                         // we have a separate schedule to run at this time
                                         // so the cron job is used to set of a separate schedule
@@ -749,14 +569,14 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
                                         this.scheduleTime.setWaitTime(waitTime);
                                     }
                                 } else {
-                                    toRemove.add(job);
+                                    fired.add(job);
                                 }
                             }
                         }
 
                         // now remove all jobs that have not been rescheduled from this execution
                         // time, if there are no more entries in that time it will be removed.
-                        doRemove(executionTime, toRemove);
+                        remove(executionTime, fired);
 
                         // If there is a job that should fire before the currently set wait time
                         // we need to reset wait time otherwise we'll miss it.
@@ -768,30 +588,25 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
                             }
                         }
                     } else {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Not yet time to execute the job, waiting " + (executionTime - currentTime) + " ms");
+                        }
                         this.scheduleTime.setWaitTime(executionTime - currentTime);
                     }
                 }
 
                 this.scheduleTime.pause();
             } catch (Exception ioe) {
-                LOG.error("{} Failed to schedule job", this.name, ioe);
+                LOG.error(this.name + " Failed to schedule job", ioe);
                 try {
                     this.store.stop();
                 } catch (Exception e) {
-                    LOG.error("{} Failed to shutdown JobSchedulerStore", this.name, e);
+                    LOG.error(this.name + " Failed to shutdown JobSchedulerStore", e);
                 }
             }
         }
     }
 
-    void fireJob(JobLocation job) throws IllegalStateException, IOException {
-        LOG.debug("Firing: {}", job);
-        ByteSequence bs = this.store.getPayload(job.getLocation());
-        for (JobListener l : jobListeners) {
-            l.scheduledJob(job.getJobId(), bs);
-        }
-    }
-
     @Override
     public void startDispatching() throws Exception {
         if (!this.running.get()) {
@@ -812,7 +627,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
             Thread t = this.thread;
             this.thread = null;
             if (t != null) {
-                t.join(3000);
+                t.join(1000);
             }
         }
     }
@@ -828,10 +643,6 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
         stopDispatching();
     }
 
-    private ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
-        return this.store.getPayload(location);
-    }
-
     long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException {
         long result = currentTime;
         String cron = job.getCronEntry();
@@ -849,7 +660,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
 
     void load(Transaction tx) throws IOException {
         this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
-        this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
+        this.index.setValueMarshaller(ValueMarshaller.INSTANCE);
         this.index.load(tx);
     }
 
@@ -857,7 +668,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
         this.name = in.readUTF();
         this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), in.readLong());
         this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
-        this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
+        this.index.setValueMarshaller(ValueMarshaller.INSTANCE);
     }
 
     public void write(DataOutput out) throws IOException {
@@ -865,6 +676,30 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
         out.writeLong(this.index.getPageId());
     }
 
+    static class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
+        static ValueMarshaller INSTANCE = new ValueMarshaller();
+
+        @Override
+        public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
+            List<JobLocation> result = new ArrayList<JobLocation>();
+            int size = dataIn.readInt();
+            for (int i = 0; i < size; i++) {
+                JobLocation jobLocation = new JobLocation();
+                jobLocation.readExternal(dataIn);
+                result.add(jobLocation);
+            }
+            return result;
+        }
+
+        @Override
+        public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
+            dataOut.writeInt(value.size());
+            for (JobLocation jobLocation : value) {
+                jobLocation.writeExternal(dataOut);
+            }
+        }
+    }
+
     static class ScheduleTime {
         private final int DEFAULT_WAIT = 500;
         private final int DEFAULT_NEW_JOB_WAIT = 100;

http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java
deleted file mode 100644
index c92dc7b..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.store.kahadb.scheduler;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.apache.activemq.store.kahadb.AbstractKahaDBMetaData;
-import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The KahaDB MetaData used to house the Index data for the KahaDB implementation
- * of a JobSchedulerStore.
- */
-public class JobSchedulerKahaDBMetaData extends AbstractKahaDBMetaData<JobSchedulerKahaDBMetaData> {
-
-    static final Logger LOG = LoggerFactory.getLogger(JobSchedulerKahaDBMetaData.class);
-
-    private final JobSchedulerStoreImpl store;
-
-    private UUID token = JobSchedulerStoreImpl.SCHEDULER_STORE_TOKEN;
-    private int version = JobSchedulerStoreImpl.CURRENT_VERSION;
-
-    private BTreeIndex<Integer, List<Integer>> removeLocationTracker;
-    private BTreeIndex<Integer, Integer> journalRC;
-    private BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
-
-    /**
-     * Creates a new instance of this meta data object with the assigned
-     * parent JobSchedulerStore instance.
-     *
-     * @param store
-     *        the store instance that owns this meta data.
-     */
-    public JobSchedulerKahaDBMetaData(JobSchedulerStoreImpl store) {
-        this.store = store;
-    }
-
-    /**
-     * @return the current value of the Scheduler store identification token.
-     */
-    public UUID getToken() {
-        return this.token;
-    }
-
-    /**
-     * @return the current value of the version tag for this meta data instance.
-     */
-    public int getVersion() {
-        return this.version;
-    }
-
-    /**
-     * Gets the index that contains the location tracking information for Jobs
-     * that have been removed from the index but whose add operation has yet
-     * to be removed from the Journal.
-     *
-     * The Journal log file where a remove command is written cannot be released
-     * until the log file with the original add command has also been released,
-     * otherwise on a log replay the scheduled job could reappear in the scheduler
-     * since its corresponding remove might no longer be present.
-     *
-     * @return the remove command location tracker index.
-     */
-    public BTreeIndex<Integer, List<Integer>> getRemoveLocationTracker() {
-        return this.removeLocationTracker;
-    }
-
-    /**
-     * Gets the index used to track the number of reference to a Journal log file.
-     *
-     * A log file in the Journal can only be considered for removal after all the
-     * references to it have been released.
-     *
-     * @return the journal log file reference counter index.
-     */
-    public BTreeIndex<Integer, Integer> getJournalRC() {
-        return this.journalRC;
-    }
-
-    /**
-     * Gets the index of JobScheduler instances that have been created and stored
-     * in the JobSchedulerStore instance.
-     *
-     * @return the index of stored JobScheduler instances.
-     */
-    public BTreeIndex<String, JobSchedulerImpl> getJobSchedulers() {
-        return this.storedSchedulers;
-    }
-
-    @Override
-    public void initialize(Transaction tx) throws IOException {
-        this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(store.getPageFile(), tx.allocate().getPageId());
-        this.journalRC = new BTreeIndex<Integer, Integer>(store.getPageFile(), tx.allocate().getPageId());
-        this.removeLocationTracker = new BTreeIndex<Integer, List<Integer>>(store.getPageFile(), tx.allocate().getPageId());
-    }
-
-    @Override
-    public void load(Transaction tx) throws IOException {
-        this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
-        this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
-        this.storedSchedulers.load(tx);
-        this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
-        this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
-        this.journalRC.load(tx);
-        this.removeLocationTracker.setKeyMarshaller(IntegerMarshaller.INSTANCE);
-        this.removeLocationTracker.setValueMarshaller(new IntegerListMarshaller());
-        this.removeLocationTracker.load(tx);
-    }
-
-    /**
-     * Loads all the stored JobScheduler instances into the provided map.
-     *
-     * @param tx
-     *        the Transaction under which the load operation should be executed.
-     * @param schedulers
-     *        a Map<String, JobSchedulerImpl> into which the loaded schedulers are stored.
-     *
-     * @throws IOException if an error occurs while performing the load operation.
-     */
-    public void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
-        for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
-            Entry<String, JobSchedulerImpl> entry = i.next();
-            entry.getValue().load(tx);
-            schedulers.put(entry.getKey(), entry.getValue());
-        }
-    }
-
-    @Override
-    public void read(DataInput in) throws IOException {
-        try {
-            long msb = in.readLong();
-            long lsb = in.readLong();
-            this.token = new UUID(msb, lsb);
-        } catch (Exception e) {
-            throw new UnknownStoreVersionException(e);
-        }
-
-        if (!token.equals(JobSchedulerStoreImpl.SCHEDULER_STORE_TOKEN)) {
-            throw new UnknownStoreVersionException(token.toString());
-        }
-        this.version = in.readInt();
-        if (in.readBoolean()) {
-            setLastUpdateLocation(LocationMarshaller.INSTANCE.readPayload(in));
-        } else {
-            setLastUpdateLocation(null);
-        }
-        this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(store.getPageFile(), in.readLong());
-        this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
-        this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
-        this.journalRC = new BTreeIndex<Integer, Integer>(store.getPageFile(), in.readLong());
-        this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
-        this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
-        this.removeLocationTracker = new BTreeIndex<Integer, List<Integer>>(store.getPageFile(), in.readLong());
-        this.removeLocationTracker.setKeyMarshaller(IntegerMarshaller.INSTANCE);
-        this.removeLocationTracker.setValueMarshaller(new IntegerListMarshaller());
-
-        LOG.info("Scheduler Store version {} loaded", this.version);
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeLong(this.token.getMostSignificantBits());
-        out.writeLong(this.token.getLeastSignificantBits());
-        out.writeInt(this.version);
-        if (getLastUpdateLocation() != null) {
-            out.writeBoolean(true);
-            LocationMarshaller.INSTANCE.writePayload(getLastUpdateLocation(), out);
-        } else {
-            out.writeBoolean(false);
-        }
-        out.writeLong(this.storedSchedulers.getPageId());
-        out.writeLong(this.journalRC.getPageId());
-        out.writeLong(this.removeLocationTracker.getPageId());
-    }
-
-    private class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
-        private final JobSchedulerStoreImpl store;
-
-        JobSchedulerMarshaller(JobSchedulerStoreImpl store) {
-            this.store = store;
-        }
-
-        @Override
-        public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
-            JobSchedulerImpl result = new JobSchedulerImpl(this.store);
-            result.read(dataIn);
-            return result;
-        }
-
-        @Override
-        public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
-            js.write(dataOut);
-        }
-    }
-
-    private class IntegerListMarshaller extends VariableMarshaller<List<Integer>> {
-
-        @Override
-        public List<Integer> readPayload(DataInput dataIn) throws IOException {
-            List<Integer> result = new ArrayList<Integer>();
-            int size = dataIn.readInt();
-            for (int i = 0; i < size; i++) {
-                result.add(IntegerMarshaller.INSTANCE.readPayload(dataIn));
-            }
-            return result;
-        }
-
-        @Override
-        public void writePayload(List<Integer> value, DataOutput dataOut) throws IOException {
-            dataOut.writeInt(value.size());
-            for (Integer integer : value) {
-                IntegerMarshaller.INSTANCE.writePayload(integer, dataOut);
-            }
-        }
-    }
-}