You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/25 05:02:25 UTC
[3/5] activemq git commit: Revert
"https://issues.apache.org/jira/browse/AMQ-3758"
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java
deleted file mode 100644
index e22e4df..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocationsMarshaller.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.scheduler;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-
-/**
- * A VariableMarshaller instance that performs the read and write of a list of
- * JobLocation objects using the JobLocation's built in read and write methods.
- */
-class JobLocationsMarshaller extends VariableMarshaller<List<JobLocation>> {
- static JobLocationsMarshaller INSTANCE = new JobLocationsMarshaller();
-
- @Override
- public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
- List<JobLocation> result = new ArrayList<JobLocation>();
- int size = dataIn.readInt();
- for (int i = 0; i < size; i++) {
- JobLocation jobLocation = new JobLocation();
- jobLocation.readExternal(dataIn);
- result.add(jobLocation);
- }
- return result;
- }
-
- @Override
- public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
- dataOut.writeInt(value.size());
- for (JobLocation jobLocation : value) {
- jobLocation.writeExternal(dataOut);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index bcb819c..455801a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -32,15 +32,11 @@ import org.apache.activemq.broker.scheduler.CronParser;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobListener;
import org.apache.activemq.broker.scheduler.JobScheduler;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
-import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
@@ -48,13 +44,12 @@ import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler {
-
+class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler {
private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);
- private final JobSchedulerStoreImpl store;
+ final JobSchedulerStoreImpl store;
private final AtomicBoolean running = new AtomicBoolean();
private String name;
- private BTreeIndex<Long, List<JobLocation>> index;
+ BTreeIndex<Long, List<JobLocation>> index;
private Thread thread;
private final AtomicBoolean started = new AtomicBoolean(false);
private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
@@ -69,163 +64,233 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
this.name = name;
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.beanstalk.JobScheduler#getName()
+ */
@Override
public String getName() {
return this.name;
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.beanstalk.JobScheduler#addListener(org.apache.activemq .beanstalk.JobListener)
+ */
@Override
public void addListener(JobListener l) {
this.jobListeners.add(l);
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.beanstalk.JobScheduler#removeListener(org.apache. activemq.beanstalk.JobListener)
+ */
@Override
public void removeListener(JobListener l) {
this.jobListeners.remove(l);
}
@Override
- public void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException {
- doSchedule(jobId, payload, "", 0, delay, 0);
+ public synchronized void schedule(final String jobId, final ByteSequence payload, final long delay) throws IOException {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ schedule(tx, jobId, payload, "", 0, delay, 0);
+ }
+ });
}
@Override
- public void schedule(final String jobId, final ByteSequence payload, final String cronEntry) throws Exception {
- doSchedule(jobId, payload, cronEntry, 0, 0, 0);
+ public synchronized void schedule(final String jobId, final ByteSequence payload, final String cronEntry) throws Exception {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ schedule(tx, jobId, payload, cronEntry, 0, 0, 0);
+ }
+ });
}
@Override
- public void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay, final long period, final int repeat) throws IOException {
- doSchedule(jobId, payload, cronEntry, delay, period, repeat);
+ public synchronized void schedule(final String jobId, final ByteSequence payload, final String cronEntry, final long delay, final long period,
+ final int repeat) throws IOException {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ schedule(tx, jobId, payload, cronEntry, delay, period, repeat);
+ }
+ });
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.beanstalk.JobScheduler#remove(long)
+ */
@Override
- public void remove(final long time) throws IOException {
- doRemoveRange(time, time);
+ public synchronized void remove(final long time) throws IOException {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ remove(tx, time);
+ }
+ });
}
- @Override
- public void remove(final String jobId) throws IOException {
- doRemove(-1, jobId);
+ synchronized void removeFromIndex(final long time, final String jobId) throws IOException {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ removeFromIndex(tx, time, jobId);
+ }
+ });
}
- @Override
- public void removeAllJobs() throws IOException {
- doRemoveRange(0, Long.MAX_VALUE);
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.beanstalk.JobScheduler#remove(long, java.lang.String)
+ */
+ public synchronized void remove(final long time, final String jobId) throws IOException {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ remove(tx, time, jobId);
+ }
+ });
+ }
+
+ synchronized void remove(final long time, final List<JobLocation> jobIds) throws IOException {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ remove(tx, time, jobIds);
+ }
+ });
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.beanstalk.JobScheduler#remove(java.lang.String)
+ */
@Override
- public void removeAllJobs(final long start, final long finish) throws IOException {
- doRemoveRange(start, finish);
+ public synchronized void remove(final String jobId) throws IOException {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ remove(tx, jobId);
+ }
+ });
}
@Override
- public long getNextScheduleTime() throws IOException {
- this.store.readLockIndex();
- try {
- Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
- return first != null ? first.getKey() : -1l;
- } finally {
- this.store.readUnlockIndex();
- }
+ public synchronized long getNextScheduleTime() throws IOException {
+ Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
+ return first != null ? first.getKey() : -1l;
}
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.activemq.beanstalk.JobScheduler#getNextScheduleJobs()
+ */
@Override
- public List<Job> getNextScheduleJobs() throws IOException {
+ public synchronized List<Job> getNextScheduleJobs() throws IOException {
final List<Job> result = new ArrayList<Job>();
- this.store.readLockIndex();
- try {
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- Map.Entry<Long, List<JobLocation>> first = index.getFirst(tx);
- if (first != null) {
- for (JobLocation jl : first.getValue()) {
- ByteSequence bs = getPayload(jl.getLocation());
- Job job = new JobImpl(jl, bs);
- result.add(job);
- }
+
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ Map.Entry<Long, List<JobLocation>> first = index.getFirst(store.getPageFile().tx());
+ if (first != null) {
+ for (JobLocation jl : first.getValue()) {
+ ByteSequence bs = getPayload(jl.getLocation());
+ Job job = new JobImpl(jl, bs);
+ result.add(job);
}
}
- });
- } finally {
- this.store.readUnlockIndex();
- }
- return result;
- }
-
- private Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException {
- this.store.readLockIndex();
- try {
- if (!this.store.isStopped() && !this.store.isStopping()) {
- Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
- return first;
}
- } finally {
- this.store.readUnlockIndex();
- }
- return null;
+ });
+ return result;
}
@Override
- public List<Job> getAllJobs() throws IOException {
+ public synchronized List<Job> getAllJobs() throws IOException {
final List<Job> result = new ArrayList<Job>();
- this.store.readLockIndex();
- try {
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx());
- while (iter.hasNext()) {
- Map.Entry<Long, List<JobLocation>> next = iter.next();
- if (next != null) {
- for (JobLocation jl : next.getValue()) {
- ByteSequence bs = getPayload(jl.getLocation());
- Job job = new JobImpl(jl, bs);
- result.add(job);
- }
- } else {
- break;
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx());
+ while (iter.hasNext()) {
+ Map.Entry<Long, List<JobLocation>> next = iter.next();
+ if (next != null) {
+ for (JobLocation jl : next.getValue()) {
+ ByteSequence bs = getPayload(jl.getLocation());
+ Job job = new JobImpl(jl, bs);
+ result.add(job);
}
+ } else {
+ break;
}
}
- });
- } finally {
- this.store.readUnlockIndex();
- }
+ }
+ });
return result;
}
@Override
- public List<Job> getAllJobs(final long start, final long finish) throws IOException {
+ public synchronized List<Job> getAllJobs(final long start, final long finish) throws IOException {
final List<Job> result = new ArrayList<Job>();
- this.store.readLockIndex();
- try {
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(tx, start);
- while (iter.hasNext()) {
- Map.Entry<Long, List<JobLocation>> next = iter.next();
- if (next != null && next.getKey().longValue() <= finish) {
- for (JobLocation jl : next.getValue()) {
- ByteSequence bs = getPayload(jl.getLocation());
- Job job = new JobImpl(jl, bs);
- result.add(job);
- }
- } else {
- break;
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ Iterator<Map.Entry<Long, List<JobLocation>>> iter = index.iterator(store.getPageFile().tx(), start);
+ while (iter.hasNext()) {
+ Map.Entry<Long, List<JobLocation>> next = iter.next();
+ if (next != null && next.getKey().longValue() <= finish) {
+ for (JobLocation jl : next.getValue()) {
+ ByteSequence bs = getPayload(jl.getLocation());
+ Job job = new JobImpl(jl, bs);
+ result.add(job);
}
+ } else {
+ break;
}
}
- });
- } finally {
- this.store.readUnlockIndex();
- }
+ }
+ });
return result;
}
- private void doSchedule(final String jobId, final ByteSequence payload, final String cronEntry, long delay, long period, int repeat) throws IOException {
+ @Override
+ public synchronized void removeAllJobs() throws IOException {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ destroy(tx);
+ }
+ });
+ }
+
+ @Override
+ public synchronized void removeAllJobs(final long start, final long finish) throws IOException {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ destroy(tx, start, finish);
+ }
+ });
+ }
+
+ ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
+ return this.store.getPayload(location);
+ }
+
+ void schedule(Transaction tx, String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws IOException {
long startTime = System.currentTimeMillis();
// round startTime - so we can schedule more jobs
// at the same time
@@ -243,86 +308,38 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
// start time not set by CRON - so it it to the current time
time = startTime;
}
-
if (delay > 0) {
time += delay;
} else {
time += period;
}
- KahaAddScheduledJobCommand newJob = new KahaAddScheduledJobCommand();
- newJob.setScheduler(name);
- newJob.setJobId(jobId);
- newJob.setStartTime(startTime);
- newJob.setCronEntry(cronEntry);
- newJob.setDelay(delay);
- newJob.setPeriod(period);
- newJob.setRepeat(repeat);
- newJob.setNextExecutionTime(time);
- newJob.setPayload(new Buffer(payload.getData(), payload.getOffset(), payload.getLength()));
-
- this.store.store(newJob);
- }
-
- private void doReschedule(final String jobId, long executionTime, long nextExecutionTime, int rescheduledCount) throws IOException {
- KahaRescheduleJobCommand update = new KahaRescheduleJobCommand();
- update.setScheduler(name);
- update.setJobId(jobId);
- update.setExecutionTime(executionTime);
- update.setNextExecutionTime(nextExecutionTime);
- update.setRescheduledCount(rescheduledCount);
- this.store.store(update);
- }
-
- private void doRemove(final long executionTime, final List<JobLocation> jobs) throws IOException {
- for (JobLocation job : jobs) {
- doRemove(executionTime, job.getJobId());
- }
- }
-
- private void doRemove(long executionTime, final String jobId) throws IOException {
- KahaRemoveScheduledJobCommand remove = new KahaRemoveScheduledJobCommand();
- remove.setScheduler(name);
- remove.setJobId(jobId);
- remove.setNextExecutionTime(executionTime);
- this.store.store(remove);
- }
-
- private void doRemoveRange(long start, long end) throws IOException {
- KahaRemoveScheduledJobsCommand destroy = new KahaRemoveScheduledJobsCommand();
- destroy.setScheduler(name);
- destroy.setStartTime(start);
- destroy.setEndTime(end);
- this.store.store(destroy);
- }
-
- /**
- * Adds a new Scheduled job to the index. Must be called under index lock.
- *
- * This method must ensure that a duplicate add is not processed into the scheduler. On index
- * recover some adds may be replayed and we don't allow more than one instance of a JobId to
- * exist at any given scheduled time, so filter these out to ensure idempotence.
- *
- * @param tx
- * Transaction in which the update is performed.
- * @param command
- * The new scheduled job command to process.
- * @param location
- * The location where the add command is stored in the journal.
- *
- * @throws IOException if an error occurs updating the index.
- */
- protected void process(final Transaction tx, final KahaAddScheduledJobCommand command, Location location) throws IOException {
+ Location location = this.store.write(payload, false);
JobLocation jobLocation = new JobLocation(location);
- jobLocation.setJobId(command.getJobId());
- jobLocation.setStartTime(command.getStartTime());
- jobLocation.setCronEntry(command.getCronEntry());
- jobLocation.setDelay(command.getDelay());
- jobLocation.setPeriod(command.getPeriod());
- jobLocation.setRepeat(command.getRepeat());
-
- long nextExecutionTime = command.getNextExecutionTime();
+ this.store.incrementJournalCount(tx, location);
+ jobLocation.setJobId(jobId);
+ jobLocation.setStartTime(startTime);
+ jobLocation.setCronEntry(cronEntry);
+ jobLocation.setDelay(delay);
+ jobLocation.setPeriod(period);
+ jobLocation.setRepeat(repeat);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scheduling " + jobLocation);
+ }
+ storeJob(tx, jobLocation, time);
+ this.scheduleTime.newJob();
+ }
+
+ synchronized void storeJob(final JobLocation jobLocation, final long nextExecutionTime) throws IOException {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ storeJob(tx, jobLocation, nextExecutionTime);
+ }
+ });
+ }
+ void storeJob(final Transaction tx, final JobLocation jobLocation, final long nextExecutionTime) throws IOException {
List<JobLocation> values = null;
jobLocation.setNextTime(nextExecutionTime);
if (this.index.containsKey(tx, nextExecutionTime)) {
@@ -331,239 +348,106 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
if (values == null) {
values = new ArrayList<JobLocation>();
}
+ values.add(jobLocation);
+ this.index.put(tx, nextExecutionTime, values);
+ }
- // There can never be more than one instance of the same JobId scheduled at any
- // given time, when it happens its probably the result of index recovery and this
- // method must be idempotent so check for it first.
- if (!values.contains(jobLocation)) {
- values.add(jobLocation);
-
- // Reference the log file where the add command is stored to prevent GC.
- this.store.incrementJournalCount(tx, location);
- this.index.put(tx, nextExecutionTime, values);
- this.scheduleTime.newJob();
- } else {
- this.index.put(tx, nextExecutionTime, values);
- LOG.trace("Job {} already in scheduler at this time {}",
- jobLocation.getJobId(), jobLocation.getNextTime());
+ void remove(Transaction tx, long time, String jobId) throws IOException {
+ JobLocation result = removeFromIndex(tx, time, jobId);
+ if (result != null) {
+ this.store.decrementJournalCount(tx, result.getLocation());
}
}
- /**
- * Reschedules a Job after it has be fired.
- *
- * For jobs that are repeating this method updates the job in the index by adding it to the
- * jobs list for the new execution time. If the job is not a cron type job then this method
- * will reduce the repeat counter if the job has a fixed number of repeats set. The Job will
- * be removed from the jobs list it just executed on.
- *
- * This method must also update the value of the last update location in the JobLocation
- * instance so that the checkpoint worker doesn't drop the log file in which that command lives.
- *
- * This method must ensure that an reschedule command that references a job that doesn't exist
- * does not cause an error since it's possible that on recover the original add might be gone
- * and so the job should not reappear in the scheduler.
- *
- * @param tx
- * The TX under which the index is updated.
- * @param command
- * The reschedule command to process.
- * @param location
- * The location in the index where the reschedule command was stored.
- *
- * @throws IOException if an error occurs during the reschedule.
- */
- protected void process(final Transaction tx, final KahaRescheduleJobCommand command, Location location) throws IOException {
+ JobLocation removeFromIndex(Transaction tx, long time, String jobId) throws IOException {
JobLocation result = null;
- final List<JobLocation> current = this.index.remove(tx, command.getExecutionTime());
- if (current != null) {
- for (int i = 0; i < current.size(); i++) {
- JobLocation jl = current.get(i);
- if (jl.getJobId().equals(command.getJobId())) {
- current.remove(i);
- if (!current.isEmpty()) {
- this.index.put(tx, command.getExecutionTime(), current);
+ List<JobLocation> values = this.index.remove(tx, time);
+ if (values != null) {
+ for (int i = 0; i < values.size(); i++) {
+ JobLocation jl = values.get(i);
+ if (jl.getJobId().equals(jobId)) {
+ values.remove(i);
+ if (!values.isEmpty()) {
+ this.index.put(tx, time, values);
}
result = jl;
break;
}
}
- } else {
- LOG.debug("Process reschedule command for job {} non-existent executime time {}.",
- command.getJobId(), command.getExecutionTime());
}
+ return result;
+ }
+ private void remove(Transaction tx, long time, List<JobLocation> jobIds) throws IOException {
+ List<JobLocation> result = removeFromIndex(tx, time, jobIds);
if (result != null) {
- Location previousUpdate = result.getLastUpdate();
-
- List<JobLocation> target = null;
- result.setNextTime(command.getNextExecutionTime());
- result.setLastUpdate(location);
- result.setRescheduledCount(command.getRescheduledCount());
- if (!result.isCron() && result.getRepeat() > 0) {
- result.setRepeat(result.getRepeat() - 1);
+ for (JobLocation jl : result) {
+ this.store.decrementJournalCount(tx, jl.getLocation());
}
- if (this.index.containsKey(tx, command.getNextExecutionTime())) {
- target = this.index.remove(tx, command.getNextExecutionTime());
- }
- if (target == null) {
- target = new ArrayList<JobLocation>();
- }
- target.add(result);
-
- // Track the location of the last reschedule command and release the log file
- // reference for the previous one if there was one.
- this.store.incrementJournalCount(tx, location);
- if (previousUpdate != null) {
- this.store.decrementJournalCount(tx, previousUpdate);
- }
-
- this.index.put(tx, command.getNextExecutionTime(), target);
- this.scheduleTime.newJob();
- } else {
- LOG.debug("Process reschedule command for non-scheduled job {} at executime time {}.",
- command.getJobId(), command.getExecutionTime());
}
}
- /**
- * Removes a scheduled job from the scheduler.
- *
- * The remove operation can be of two forms. The first is that there is a job Id but no set time
- * (-1) in which case the jobs index is searched until the target job Id is located. The alternate
- * form is that a job Id and execution time are both set in which case the given time is checked
- * for a job matching that Id. In either case once an execution time is identified the job is
- * removed and the index updated.
- *
- * This method should ensure that if the matching job is not found that no error results as it
- * is possible that on a recover the initial add command could be lost so the job may not be
- * rescheduled.
- *
- * @param tx
- * The transaction under which the index is updated.
- * @param command
- * The remove command to process.
- * @param location
- * The location of the remove command in the Journal.
- *
- * @throws IOException if an error occurs while updating the scheduler index.
- */
- void process(final Transaction tx, final KahaRemoveScheduledJobCommand command, Location location) throws IOException {
-
- // Case 1: JobId and no time value means find the job and remove it.
- // Case 2: JobId and a time value means find exactly this scheduled job.
-
- Long executionTime = command.getNextExecutionTime();
-
- List<JobLocation> values = null;
-
- if (executionTime == -1) {
- for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
- Map.Entry<Long, List<JobLocation>> entry = i.next();
- List<JobLocation> candidates = entry.getValue();
- if (candidates != null) {
- for (JobLocation jl : candidates) {
- if (jl.getJobId().equals(command.getJobId())) {
- LOG.trace("Entry {} contains the remove target: {}", entry.getKey(), command.getJobId());
- executionTime = entry.getKey();
- values = this.index.remove(tx, executionTime);
- break;
- }
- }
- }
- }
- } else {
- values = this.index.remove(tx, executionTime);
- }
-
- JobLocation removed = null;
-
- // Remove the job and update the index if there are any other jobs scheduled at this time.
+ private List<JobLocation> removeFromIndex(Transaction tx, long time, List<JobLocation> Jobs) throws IOException {
+ List<JobLocation> result = null;
+ List<JobLocation> values = this.index.remove(tx, time);
if (values != null) {
- for (JobLocation job : values) {
- if (job.getJobId().equals(command.getJobId())) {
- removed = job;
- values.remove(removed);
- break;
+ result = new ArrayList<JobLocation>(values.size());
+
+ for (JobLocation job : Jobs) {
+ if (values.remove(job)) {
+ result.add(job);
}
}
if (!values.isEmpty()) {
- this.index.put(tx, executionTime, values);
+ this.index.put(tx, time, values);
}
}
+ return result;
+ }
- if (removed != null) {
- LOG.trace("{} removed from scheduler {}", removed, this);
-
- // Remove the references for add and reschedule commands for this job
- // so that those logs can be GC'd when free.
- this.store.decrementJournalCount(tx, removed.getLocation());
- if (removed.getLastUpdate() != null) {
- this.store.decrementJournalCount(tx, removed.getLastUpdate());
+ void remove(Transaction tx, long time) throws IOException {
+ List<JobLocation> values = this.index.remove(tx, time);
+ if (values != null) {
+ for (JobLocation jl : values) {
+ this.store.decrementJournalCount(tx, jl.getLocation());
}
-
- // now that the job is removed from the index we can store the remove info and
- // then dereference the log files that hold the initial add command and the most
- // recent update command.
- this.store.referenceRemovedLocation(tx, location, removed);
}
}
- /**
- * Removes all scheduled jobs within a given time range.
- *
- * The method can be used to clear the entire scheduler index by specifying a range that
- * encompasses all time [0...Long.MAX_VALUE] or a single execution time can be removed by
- * setting start and end time to the same value.
- *
- * @param tx
- * The transaction under which the index is updated.
- * @param command
- * The remove command to process.
- * @param location
- * The location of the remove command in the Journal.
- *
- * @throws IOException if an error occurs while updating the scheduler index.
- */
- protected void process(final Transaction tx, final KahaRemoveScheduledJobsCommand command, Location location) throws IOException {
- removeInRange(tx, command.getStartTime(), command.getEndTime(), location);
+ void remove(Transaction tx, String id) throws IOException {
+ for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
+ Map.Entry<Long, List<JobLocation>> entry = i.next();
+ List<JobLocation> values = entry.getValue();
+ if (values != null) {
+ for (JobLocation jl : values) {
+ if (jl.getJobId().equals(id)) {
+ remove(tx, entry.getKey(), id);
+ return;
+ }
+ }
+ }
+ }
}
- /**
- * Removes all jobs from the schedulers index. Must be called with the index locked.
- *
- * @param tx
- * The transaction under which the index entries for this scheduler are removed.
- *
- * @throws IOException if an error occurs removing the jobs from the scheduler index.
- */
- protected void removeAll(Transaction tx) throws IOException {
- this.removeInRange(tx, 0, Long.MAX_VALUE, null);
+ synchronized void destroy(Transaction tx) throws IOException {
+ List<Long> keys = new ArrayList<Long>();
+ for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
+ Map.Entry<Long, List<JobLocation>> entry = i.next();
+ keys.add(entry.getKey());
+ }
+
+ for (Long l : keys) {
+ List<JobLocation> values = this.index.remove(tx, l);
+ if (values != null) {
+ for (JobLocation jl : values) {
+ this.store.decrementJournalCount(tx, jl.getLocation());
+ }
+ }
+ }
}
- /**
- * Removes all scheduled jobs within the target range.
- *
- * This method can be used to remove all the stored jobs by passing a range of [0...Long.MAX_VALUE]
- * or it can be used to remove all jobs at a given scheduled time by passing the same time value
- * for both start and end. If the optional location parameter is set then this method will update
- * the store's remove location tracker with the location value and the Jobs that are being removed.
- *
- * This method must be called with the store index locked for writes.
- *
- * @param tx
- * The transaction under which the index is to be updated.
- * @param start
- * The start time for the remove operation.
- * @param finish
- * The end time for the remove operation.
- * @param location (optional)
- * The location of the remove command that triggered this remove.
- *
- * @throws IOException if an error occurs during the remove operation.
- */
- protected void removeInRange(Transaction tx, long start, long finish, Location location) throws IOException {
+ synchronized void destroy(Transaction tx, long start, long finish) throws IOException {
List<Long> keys = new ArrayList<Long>();
for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx, start); i.hasNext();) {
Map.Entry<Long, List<JobLocation>> entry = i.next();
@@ -574,97 +458,32 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
}
}
- for (Long executionTime : keys) {
- List<JobLocation> values = this.index.remove(tx, executionTime);
- if (location != null) {
- for (JobLocation job : values) {
- LOG.trace("Removing {} scheduled at: {}", job, executionTime);
-
- // Remove the references for add and reschedule commands for this job
- // so that those logs can be GC'd when free.
- this.store.decrementJournalCount(tx, job.getLocation());
- if (job.getLastUpdate() != null) {
- this.store.decrementJournalCount(tx, job.getLastUpdate());
- }
-
- // now that the job is removed from the index we can store the remove info and
- // then dereference the log files that hold the initial add command and the most
- // recent update command.
- this.store.referenceRemovedLocation(tx, location, job);
+ for (Long l : keys) {
+ List<JobLocation> values = this.index.remove(tx, l);
+ if (values != null) {
+ for (JobLocation jl : values) {
+ this.store.decrementJournalCount(tx, jl.getLocation());
}
}
}
}
- /**
- * Removes a Job from the index using it's Id value and the time it is currently set to
- * be executed. This method will only remove the Job if it is found at the given execution
- * time.
- *
- * This method must be called under index lock.
- *
- * @param tx
- * the transaction under which this method is being executed.
- * @param jobId
- * the target Job Id to remove.
- * @param executionTime
- * the scheduled time that for the Job Id that is being removed.
- *
- * @returns true if the Job was removed or false if not found at the given time.
- *
- * @throws IOException if an error occurs while removing the Job.
- */
- protected boolean removeJobAtTime(Transaction tx, String jobId, long executionTime) throws IOException {
- boolean result = false;
-
- List<JobLocation> jobs = this.index.remove(tx, executionTime);
- Iterator<JobLocation> jobsIter = jobs.iterator();
- while (jobsIter.hasNext()) {
- JobLocation job = jobsIter.next();
- if (job.getJobId().equals(jobId)) {
- jobsIter.remove();
- // Remove the references for add and reschedule commands for this job
- // so that those logs can be GC'd when free.
- this.store.decrementJournalCount(tx, job.getLocation());
- if (job.getLastUpdate() != null) {
- this.store.decrementJournalCount(tx, job.getLastUpdate());
- }
- result = true;
- break;
- }
+ private synchronized Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException {
+ if (!this.store.isStopped() && !this.store.isStopping()) {
+ Map.Entry<Long, List<JobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
+ return first;
}
-
- // Return the list to the index modified or unmodified.
- this.index.put(tx, executionTime, jobs);
-
- return result;
+ return null;
}
- /**
- * Walks the Scheduled Job Tree and collects the add location and last update location
- * for all scheduled jobs.
- *
- * This method must be called with the index locked.
- *
- * @param tx
- * the transaction under which this operation was invoked.
- *
- * @return a list of all referenced Location values for this JobSchedulerImpl
- *
- * @throws IOException if an error occurs walking the scheduler tree.
- */
- protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
- List<JobLocation> references = new ArrayList<JobLocation>();
-
- for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
- Map.Entry<Long, List<JobLocation>> entry = i.next();
- List<JobLocation> scheduled = entry.getValue();
- for (JobLocation job : scheduled) {
- references.add(job);
- }
+ void fireJob(JobLocation job) throws IllegalStateException, IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Firing " + job);
+ }
+ ByteSequence bs = this.store.getPayload(job.getLocation());
+ for (JobListener l : jobListeners) {
+ l.scheduledJob(job.getJobId(), bs);
}
-
- return references;
}
@Override
@@ -673,14 +492,14 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
mainLoop();
} catch (Throwable e) {
if (this.running.get() && isStarted()) {
- LOG.error("{} Caught exception in mainloop", this, e);
+ LOG.error(this + " Caught exception in mainloop", e);
}
} finally {
if (running.get()) {
try {
stop();
} catch (Exception e) {
- LOG.error("Failed to stop {}", this);
+ LOG.error("Failed to stop " + this);
}
}
}
@@ -688,55 +507,56 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
@Override
public String toString() {
- return "JobScheduler: " + this.name;
+ return "JobScheduler:" + this.name;
}
protected void mainLoop() {
while (this.running.get()) {
this.scheduleTime.clearNewJob();
try {
+ // peek the next job
long currentTime = System.currentTimeMillis();
- // Read the list of scheduled events and fire the jobs, reschedule repeating jobs as
- // needed before firing the job event.
+ // Read the list of scheduled events and fire the jobs. Once done with the batch
+ // remove all that were fired, and reschedule as needed.
Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
if (first != null) {
List<JobLocation> list = new ArrayList<JobLocation>(first.getValue());
- List<JobLocation> toRemove = new ArrayList<JobLocation>(list.size());
+ List<JobLocation> fired = new ArrayList<JobLocation>(list.size());
final long executionTime = first.getKey();
long nextExecutionTime = 0;
if (executionTime <= currentTime) {
for (final JobLocation job : list) {
-
- if (!running.get()) {
- break;
- }
-
int repeat = job.getRepeat();
nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat);
long waitTime = nextExecutionTime - currentTime;
this.scheduleTime.setWaitTime(waitTime);
- if (!job.isCron()) {
+ if (job.isCron() == false) {
fireJob(job);
if (repeat != 0) {
- // Reschedule for the next time, the scheduler will take care of
- // updating the repeat counter on the update.
- doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1);
+ repeat--;
+ job.setRepeat(repeat);
+ // remove this job from the index so it doesn't get destroyed
+ removeFromIndex(executionTime, job.getJobId());
+ // and re-store it
+ storeJob(job, nextExecutionTime);
} else {
- toRemove.add(job);
+ fired.add(job);
}
} else {
+ // cron job will have a repeat time.
if (repeat == 0) {
- // This is a non-repeating Cron entry so we can fire and forget it.
+ // we haven't got a separate scheduler to execute at
+ // this time - just a cron job - so fire it
fireJob(job);
}
if (nextExecutionTime > currentTime) {
- // Reschedule the cron job as a new event, if the cron entry signals
- // a repeat then it will be stored separately and fired as a normal
- // event with decrementing repeat.
- doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1);
-
+ // we will run again ...
+ // remove this job from the index - so it doesn't get destroyed
+ removeFromIndex(executionTime, job.getJobId());
+ // and re-store it
+ storeJob(job, nextExecutionTime);
if (repeat != 0) {
// we have a separate schedule to run at this time
// so the cron job is used to set of a separate schedule
@@ -749,14 +569,14 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
this.scheduleTime.setWaitTime(waitTime);
}
} else {
- toRemove.add(job);
+ fired.add(job);
}
}
}
// now remove all jobs that have not been rescheduled from this execution
// time, if there are no more entries in that time it will be removed.
- doRemove(executionTime, toRemove);
+ remove(executionTime, fired);
// If there is a job that should fire before the currently set wait time
// we need to reset wait time otherwise we'll miss it.
@@ -768,30 +588,25 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
}
}
} else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not yet time to execute the job, waiting " + (executionTime - currentTime) + " ms");
+ }
this.scheduleTime.setWaitTime(executionTime - currentTime);
}
}
this.scheduleTime.pause();
} catch (Exception ioe) {
- LOG.error("{} Failed to schedule job", this.name, ioe);
+ LOG.error(this.name + " Failed to schedule job", ioe);
try {
this.store.stop();
} catch (Exception e) {
- LOG.error("{} Failed to shutdown JobSchedulerStore", this.name, e);
+ LOG.error(this.name + " Failed to shutdown JobSchedulerStore", e);
}
}
}
}
- void fireJob(JobLocation job) throws IllegalStateException, IOException {
- LOG.debug("Firing: {}", job);
- ByteSequence bs = this.store.getPayload(job.getLocation());
- for (JobListener l : jobListeners) {
- l.scheduledJob(job.getJobId(), bs);
- }
- }
-
@Override
public void startDispatching() throws Exception {
if (!this.running.get()) {
@@ -812,7 +627,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
Thread t = this.thread;
this.thread = null;
if (t != null) {
- t.join(3000);
+ t.join(1000);
}
}
}
@@ -828,10 +643,6 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
stopDispatching();
}
- private ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
- return this.store.getPayload(location);
- }
-
long calculateNextExecutionTime(final JobLocation job, long currentTime, int repeat) throws MessageFormatException {
long result = currentTime;
String cron = job.getCronEntry();
@@ -849,7 +660,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
void load(Transaction tx) throws IOException {
this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
- this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
+ this.index.setValueMarshaller(ValueMarshaller.INSTANCE);
this.index.load(tx);
}
@@ -857,7 +668,7 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
this.name = in.readUTF();
this.index = new BTreeIndex<Long, List<JobLocation>>(this.store.getPageFile(), in.readLong());
this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
- this.index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
+ this.index.setValueMarshaller(ValueMarshaller.INSTANCE);
}
public void write(DataOutput out) throws IOException {
@@ -865,6 +676,30 @@ public class JobSchedulerImpl extends ServiceSupport implements Runnable, JobSch
out.writeLong(this.index.getPageId());
}
+ static class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
+ static ValueMarshaller INSTANCE = new ValueMarshaller();
+
+ @Override
+ public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
+ List<JobLocation> result = new ArrayList<JobLocation>();
+ int size = dataIn.readInt();
+ for (int i = 0; i < size; i++) {
+ JobLocation jobLocation = new JobLocation();
+ jobLocation.readExternal(dataIn);
+ result.add(jobLocation);
+ }
+ return result;
+ }
+
+ @Override
+ public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
+ dataOut.writeInt(value.size());
+ for (JobLocation jobLocation : value) {
+ jobLocation.writeExternal(dataOut);
+ }
+ }
+ }
+
static class ScheduleTime {
private final int DEFAULT_WAIT = 500;
private final int DEFAULT_NEW_JOB_WAIT = 100;
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java
deleted file mode 100644
index c92dc7b..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerKahaDBMetaData.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.store.kahadb.scheduler;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.apache.activemq.store.kahadb.AbstractKahaDBMetaData;
-import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The KahaDB MetaData used to house the Index data for the KahaDB implementation
- * of a JobSchedulerStore.
- */
-public class JobSchedulerKahaDBMetaData extends AbstractKahaDBMetaData<JobSchedulerKahaDBMetaData> {
-
- static final Logger LOG = LoggerFactory.getLogger(JobSchedulerKahaDBMetaData.class);
-
- private final JobSchedulerStoreImpl store;
-
- private UUID token = JobSchedulerStoreImpl.SCHEDULER_STORE_TOKEN;
- private int version = JobSchedulerStoreImpl.CURRENT_VERSION;
-
- private BTreeIndex<Integer, List<Integer>> removeLocationTracker;
- private BTreeIndex<Integer, Integer> journalRC;
- private BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
-
- /**
- * Creates a new instance of this meta data object with the assigned
- * parent JobSchedulerStore instance.
- *
- * @param store
- * the store instance that owns this meta data.
- */
- public JobSchedulerKahaDBMetaData(JobSchedulerStoreImpl store) {
- this.store = store;
- }
-
- /**
- * @return the current value of the Scheduler store identification token.
- */
- public UUID getToken() {
- return this.token;
- }
-
- /**
- * @return the current value of the version tag for this meta data instance.
- */
- public int getVersion() {
- return this.version;
- }
-
- /**
- * Gets the index that contains the location tracking information for Jobs
- * that have been removed from the index but whose add operation has yet
- * to be removed from the Journal.
- *
- * The Journal log file where a remove command is written cannot be released
- * until the log file with the original add command has also been released,
- * otherwise on a log replay the scheduled job could reappear in the scheduler
- * since its corresponding remove might no longer be present.
- *
- * @return the remove command location tracker index.
- */
- public BTreeIndex<Integer, List<Integer>> getRemoveLocationTracker() {
- return this.removeLocationTracker;
- }
-
- /**
- * Gets the index used to track the number of reference to a Journal log file.
- *
- * A log file in the Journal can only be considered for removal after all the
- * references to it have been released.
- *
- * @return the journal log file reference counter index.
- */
- public BTreeIndex<Integer, Integer> getJournalRC() {
- return this.journalRC;
- }
-
- /**
- * Gets the index of JobScheduler instances that have been created and stored
- * in the JobSchedulerStore instance.
- *
- * @return the index of stored JobScheduler instances.
- */
- public BTreeIndex<String, JobSchedulerImpl> getJobSchedulers() {
- return this.storedSchedulers;
- }
-
- @Override
- public void initialize(Transaction tx) throws IOException {
- this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(store.getPageFile(), tx.allocate().getPageId());
- this.journalRC = new BTreeIndex<Integer, Integer>(store.getPageFile(), tx.allocate().getPageId());
- this.removeLocationTracker = new BTreeIndex<Integer, List<Integer>>(store.getPageFile(), tx.allocate().getPageId());
- }
-
- @Override
- public void load(Transaction tx) throws IOException {
- this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
- this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
- this.storedSchedulers.load(tx);
- this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
- this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
- this.journalRC.load(tx);
- this.removeLocationTracker.setKeyMarshaller(IntegerMarshaller.INSTANCE);
- this.removeLocationTracker.setValueMarshaller(new IntegerListMarshaller());
- this.removeLocationTracker.load(tx);
- }
-
- /**
- * Loads all the stored JobScheduler instances into the provided map.
- *
- * @param tx
- * the Transaction under which the load operation should be executed.
- * @param schedulers
- * a Map<String, JobSchedulerImpl> into which the loaded schedulers are stored.
- *
- * @throws IOException if an error occurs while performing the load operation.
- */
- public void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
- for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
- Entry<String, JobSchedulerImpl> entry = i.next();
- entry.getValue().load(tx);
- schedulers.put(entry.getKey(), entry.getValue());
- }
- }
-
- @Override
- public void read(DataInput in) throws IOException {
- try {
- long msb = in.readLong();
- long lsb = in.readLong();
- this.token = new UUID(msb, lsb);
- } catch (Exception e) {
- throw new UnknownStoreVersionException(e);
- }
-
- if (!token.equals(JobSchedulerStoreImpl.SCHEDULER_STORE_TOKEN)) {
- throw new UnknownStoreVersionException(token.toString());
- }
- this.version = in.readInt();
- if (in.readBoolean()) {
- setLastUpdateLocation(LocationMarshaller.INSTANCE.readPayload(in));
- } else {
- setLastUpdateLocation(null);
- }
- this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(store.getPageFile(), in.readLong());
- this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
- this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
- this.journalRC = new BTreeIndex<Integer, Integer>(store.getPageFile(), in.readLong());
- this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
- this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
- this.removeLocationTracker = new BTreeIndex<Integer, List<Integer>>(store.getPageFile(), in.readLong());
- this.removeLocationTracker.setKeyMarshaller(IntegerMarshaller.INSTANCE);
- this.removeLocationTracker.setValueMarshaller(new IntegerListMarshaller());
-
- LOG.info("Scheduler Store version {} loaded", this.version);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(this.token.getMostSignificantBits());
- out.writeLong(this.token.getLeastSignificantBits());
- out.writeInt(this.version);
- if (getLastUpdateLocation() != null) {
- out.writeBoolean(true);
- LocationMarshaller.INSTANCE.writePayload(getLastUpdateLocation(), out);
- } else {
- out.writeBoolean(false);
- }
- out.writeLong(this.storedSchedulers.getPageId());
- out.writeLong(this.journalRC.getPageId());
- out.writeLong(this.removeLocationTracker.getPageId());
- }
-
- private class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
- private final JobSchedulerStoreImpl store;
-
- JobSchedulerMarshaller(JobSchedulerStoreImpl store) {
- this.store = store;
- }
-
- @Override
- public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
- JobSchedulerImpl result = new JobSchedulerImpl(this.store);
- result.read(dataIn);
- return result;
- }
-
- @Override
- public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
- js.write(dataOut);
- }
- }
-
- private class IntegerListMarshaller extends VariableMarshaller<List<Integer>> {
-
- @Override
- public List<Integer> readPayload(DataInput dataIn) throws IOException {
- List<Integer> result = new ArrayList<Integer>();
- int size = dataIn.readInt();
- for (int i = 0; i < size; i++) {
- result.add(IntegerMarshaller.INSTANCE.readPayload(dataIn));
- }
- return result;
- }
-
- @Override
- public void writePayload(List<Integer> value, DataOutput dataOut) throws IOException {
- dataOut.writeInt(value.size());
- for (Integer integer : value) {
- IntegerMarshaller.INSTANCE.writePayload(integer, dataOut);
- }
- }
- }
-}