You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/25 05:02:24 UTC
[2/5] activemq git commit: Revert
"https://issues.apache.org/jira/browse/AMQ-3758"
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 1a08931..5934914 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -19,10 +19,8 @@ package org.apache.activemq.store.kahadb.scheduler;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -30,917 +28,363 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.broker.LockableServiceSupport;
+import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.kahadb.AbstractKahaDBStore;
-import org.apache.activemq.store.kahadb.JournalCommand;
-import org.apache.activemq.store.kahadb.KahaDBMetaData;
-import org.apache.activemq.store.kahadb.Visitor;
-import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
-import org.apache.activemq.store.kahadb.data.KahaDestroySchedulerCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
-import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
-import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
-import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
-import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.store.SharedFileLocker;
+import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.apache.activemq.store.kahadb.scheduler.legacy.LegacyStoreReplayer;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSchedulerStore {
-
- private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
-
- private JobSchedulerKahaDBMetaData metaData = new JobSchedulerKahaDBMetaData(this);
- private final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
- private final Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
- private File legacyStoreArchiveDirectory;
-
- /**
- * The Scheduler Token is used to identify base revisions of the Scheduler store. A store
- * based on the initial scheduler design will not have this tag in it's meta-data and will
- * indicate an update is needed. Later versions of the scheduler can also change this value
- * to indicate incompatible store bases which require complete meta-data and journal rewrites
- * instead of simpler meta-data updates.
- */
- static final UUID SCHEDULER_STORE_TOKEN = UUID.fromString("57ed642b-1ee3-47b3-be6d-b7297d500409");
-
- /**
- * The default scheduler store version. All new store instance will be given this version and
- * earlier versions will be updated to this version.
- */
- static final int CURRENT_VERSION = 1;
-
- @Override
- public JobScheduler getJobScheduler(final String name) throws Exception {
- this.indexLock.writeLock().lock();
- try {
- JobSchedulerImpl result = this.schedulers.get(name);
- if (result == null) {
- final JobSchedulerImpl js = new JobSchedulerImpl(this);
- js.setName(name);
- getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- js.createIndexes(tx);
- js.load(tx);
- metaData.getJobSchedulers().put(tx, name, js);
- }
- });
- result = js;
- this.schedulers.put(name, js);
- if (isStarted()) {
- result.start();
- }
- this.pageFile.flush();
- }
- return result;
- } finally {
- this.indexLock.writeLock().unlock();
+public class JobSchedulerStoreImpl extends LockableServiceSupport implements JobSchedulerStore {
+ static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);
+ private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
+
+ public static final int CLOSED_STATE = 1;
+ public static final int OPEN_STATE = 2;
+
+ private File directory;
+ PageFile pageFile;
+ private Journal journal;
+ protected AtomicLong journalSize = new AtomicLong(0);
+ private boolean failIfDatabaseIsLocked;
+ private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+ private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
+ private boolean enableIndexWriteAsync = false;
+ MetaData metaData = new MetaData(this);
+ final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
+ Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
+
+ protected class MetaData {
+ protected MetaData(JobSchedulerStoreImpl store) {
+ this.store = store;
}
- }
- @Override
- public boolean removeJobScheduler(final String name) throws Exception {
- boolean result = false;
+ private final JobSchedulerStoreImpl store;
+ Page<MetaData> page;
+ BTreeIndex<Integer, Integer> journalRC;
+ BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
- this.indexLock.writeLock().lock();
- try {
- final JobSchedulerImpl js = this.schedulers.remove(name);
- result = js != null;
- if (result) {
- js.stop();
- getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- metaData.getJobSchedulers().remove(tx, name);
- js.removeAll(tx);
- }
- });
- }
- } finally {
- this.indexLock.writeLock().unlock();
+ void createIndexes(Transaction tx) throws IOException {
+ this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, tx.allocate().getPageId());
+ this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
}
- return result;
- }
-
- /**
- * Sets the directory where the legacy scheduler store files are archived before an
- * update attempt is made. Both the legacy index files and the journal files are moved
- * to this folder prior to an upgrade attempt.
- *
- * @param directory
- * The directory to move the legacy Scheduler Store files to.
- */
- public void setLegacyStoreArchiveDirectory(File directory) {
- this.legacyStoreArchiveDirectory = directory;
- }
- /**
- * Gets the directory where the legacy Scheduler Store files will be archived if the
- * broker is started and an existing Job Scheduler Store from an old version is detected.
- *
- * @return the directory where scheduler store legacy files are archived on upgrade.
- */
- public File getLegacyStoreArchiveDirectory() {
- if (this.legacyStoreArchiveDirectory == null) {
- this.legacyStoreArchiveDirectory = new File(getDirectory(), "legacySchedulerStore");
+ void load(Transaction tx) throws IOException {
+ this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
+ this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
+ this.storedSchedulers.load(tx);
+ this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
+ this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
+ this.journalRC.load(tx);
}
- return this.legacyStoreArchiveDirectory.getAbsoluteFile();
- }
-
- @Override
- public void load() throws IOException {
- if (opened.compareAndSet(false, true)) {
- getJournal().start();
- try {
- loadPageFile();
- } catch (UnknownStoreVersionException ex) {
- LOG.info("Can't start until store update is performed.");
- upgradeFromLegacy();
- // Restart with the updated store
- getJournal().start();
- loadPageFile();
- LOG.info("Update from legacy Scheduler store completed successfully.");
- } catch (Throwable t) {
- LOG.warn("Index corrupted. Recovering the index through journal replay. Cause: {}", t.toString());
- LOG.debug("Index load failure", t);
-
- // try to recover index
- try {
- pageFile.unload();
- } catch (Exception ignore) {
- }
- if (isArchiveCorruptedIndex()) {
- pageFile.archive();
- } else {
- pageFile.delete();
- }
- metaData = new JobSchedulerKahaDBMetaData(this);
- pageFile = null;
- loadPageFile();
+ void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
+ for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
+ Entry<String, JobSchedulerImpl> entry = i.next();
+ entry.getValue().load(tx);
+ schedulers.put(entry.getKey(), entry.getValue());
}
- startCheckpoint();
- recover();
}
- LOG.info("{} started.", this);
- }
-
- @Override
- public void unload() throws IOException {
- if (opened.compareAndSet(true, false)) {
- for (JobSchedulerImpl js : this.schedulers.values()) {
- try {
- js.stop();
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- this.indexLock.writeLock().lock();
- try {
- if (pageFile != null && pageFile.isLoaded()) {
- metaData.setState(KahaDBMetaData.CLOSED_STATE);
-
- if (metaData.getPage() != null) {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- tx.store(metaData.getPage(), metaDataMarshaller, true);
- }
- });
- }
- }
- } finally {
- this.indexLock.writeLock().unlock();
- }
- checkpointLock.writeLock().lock();
- try {
- if (metaData.getPage() != null) {
- checkpointUpdate(true);
- }
- } finally {
- checkpointLock.writeLock().unlock();
- }
- synchronized (checkpointThreadLock) {
- if (checkpointThread != null) {
- try {
- checkpointThread.join();
- checkpointThread = null;
- } catch (InterruptedException e) {
- }
- }
- }
-
- if (pageFile != null) {
- pageFile.unload();
- pageFile = null;
- }
- if (this.journal != null) {
- journal.close();
- journal = null;
- }
-
- metaData = new JobSchedulerKahaDBMetaData(this);
+ public void read(DataInput is) throws IOException {
+ this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(pageFile, is.readLong());
+ this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
+ this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
+ this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
+ this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
+ this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
}
- LOG.info("{} stopped.", this);
- }
- private void loadPageFile() throws IOException {
- this.indexLock.writeLock().lock();
- try {
- final PageFile pageFile = getPageFile();
- pageFile.load();
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- if (pageFile.getPageCount() == 0) {
- Page<JobSchedulerKahaDBMetaData> page = tx.allocate();
- assert page.getPageId() == 0;
- page.set(metaData);
- metaData.setPage(page);
- metaData.setState(KahaDBMetaData.CLOSED_STATE);
- metaData.initialize(tx);
- tx.store(metaData.getPage(), metaDataMarshaller, true);
- } else {
- Page<JobSchedulerKahaDBMetaData> page = null;
- page = tx.load(0, metaDataMarshaller);
- metaData = page.get();
- metaData.setPage(page);
- }
- metaData.load(tx);
- metaData.loadScheduler(tx, schedulers);
- for (JobSchedulerImpl js : schedulers.values()) {
- try {
- js.start();
- } catch (Exception e) {
- JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e);
- }
- }
- }
- });
-
- pageFile.flush();
- } finally {
- this.indexLock.writeLock().unlock();
+ public void write(DataOutput os) throws IOException {
+ os.writeLong(this.storedSchedulers.getPageId());
+ os.writeLong(this.journalRC.getPageId());
}
}
- private void upgradeFromLegacy() throws IOException {
-
- journal.close();
- journal = null;
- try {
- pageFile.unload();
- pageFile = null;
- } catch (Exception ignore) {}
-
- File storeDir = getDirectory().getAbsoluteFile();
- File storeArchiveDir = getLegacyStoreArchiveDirectory();
-
- LOG.info("Attempting to move old store files from {} to {}", storeDir, storeArchiveDir);
-
- // Move only the known store files, locks and other items left in place.
- IOHelper.moveFiles(storeDir, storeArchiveDir, new FilenameFilter() {
-
- @Override
- public boolean accept(File dir, String name) {
- if (name.endsWith(".data") || name.endsWith(".redo") || name.endsWith(".log")) {
- return true;
- }
- return false;
- }
- });
-
- // We reset everything to clean state, then we can read from the old
- // scheduler store and replay the scheduled jobs into this one as adds.
- getJournal().start();
- metaData = new JobSchedulerKahaDBMetaData(this);
- pageFile = null;
- loadPageFile();
-
- LegacyStoreReplayer replayer = new LegacyStoreReplayer(getLegacyStoreArchiveDirectory());
- replayer.load();
- replayer.startReplay(this);
-
- // Cleanup after replay and store what we've done.
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- tx.store(metaData.getPage(), metaDataMarshaller, true);
- }
- });
-
- checkpointUpdate(true);
- getJournal().close();
- getPageFile().unload();
- }
-
- @Override
- protected void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
- LOG.debug("Job Scheduler Store Checkpoint started.");
-
- // reflect last update exclusive of current checkpoint
- Location lastUpdate = metaData.getLastUpdateLocation();
- metaData.setState(KahaDBMetaData.OPEN_STATE);
- tx.store(metaData.getPage(), metaDataMarshaller, true);
- pageFile.flush();
-
- if (cleanup) {
- final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
- final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);
-
- LOG.trace("Last update: {}, full gc candidates set: {}", lastUpdate, gcCandidateSet);
-
- if (lastUpdate != null) {
- gcCandidateSet.remove(lastUpdate.getDataFileId());
- }
-
- this.metaData.getJournalRC().visit(tx, new BTreeVisitor<Integer, Integer>() {
-
- @Override
- public void visit(List<Integer> keys, List<Integer> values) {
- for (Integer key : keys) {
- if (gcCandidateSet.remove(key)) {
- LOG.trace("Removed referenced file: {} from GC set", key);
- }
- }
- }
-
- @Override
- public boolean isInterestedInKeysBetween(Integer first, Integer second) {
- return true;
- }
- });
-
- LOG.trace("gc candidates after reference check: {}", gcCandidateSet);
-
- // If there are GC candidates then check the remove command location to see
- // if any of them can go or if they must stay in order to ensure proper recover.
- //
- // A log containing any remove commands must be kept until all the logs with the
- // add commands for all the removed jobs have been dropped.
- if (!gcCandidateSet.isEmpty()) {
- Iterator<Entry<Integer, List<Integer>>> removals = metaData.getRemoveLocationTracker().iterator(tx);
- List<Integer> orphans = new ArrayList<Integer>();
- while (removals.hasNext()) {
- boolean orphanedRemve = true;
- Entry<Integer, List<Integer>> entry = removals.next();
-
- // If this log is not a GC candidate then there's no need to do a check to rule it out
- if (gcCandidateSet.contains(entry.getKey())) {
- for (Integer addLocation : entry.getValue()) {
- if (completeFileSet.contains(addLocation)) {
- orphanedRemve = false;
- break;
- }
- }
-
- // If it's not orphaned than we can't remove it, otherwise we
- // stop tracking it it's log will get deleted on the next check.
- if (!orphanedRemve) {
- LOG.trace("A remove in log {} has an add still in existance.", entry.getKey());
- gcCandidateSet.remove(entry.getKey());
- } else {
- LOG.trace("All removes in log {} are orphaned, file can be GC'd", entry.getKey());
- orphans.add(entry.getKey());
- }
- }
- }
-
- // Drop all orphaned removes from the tracker.
- for (Integer orphan : orphans) {
- metaData.getRemoveLocationTracker().remove(tx, orphan);
- }
- }
+ class MetaDataMarshaller extends VariableMarshaller<MetaData> {
+ private final JobSchedulerStoreImpl store;
- LOG.trace("gc candidates after removals check: {}", gcCandidateSet);
- if (!gcCandidateSet.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
- }
- journal.removeDataFiles(gcCandidateSet);
- }
+ MetaDataMarshaller(JobSchedulerStoreImpl store) {
+ this.store = store;
}
- LOG.debug("Job Scheduler Store Checkpoint complete.");
- }
+ @Override
+ public MetaData readPayload(DataInput dataIn) throws IOException {
+ MetaData rc = new MetaData(this.store);
+ rc.read(dataIn);
+ return rc;
+ }
- /**
- * Adds a reference for the journal log file pointed to by the given Location value.
- *
- * To prevent log files in the journal that still contain valid data that needs to be
- * kept in order to allow for recovery the logs must have active references. Each Job
- * scheduler should ensure that the logs are accurately referenced.
- *
- * @param tx
- * The TX under which the update is to be performed.
- * @param location
- * The location value to update the reference count of.
- *
- * @throws IOException if an error occurs while updating the journal references table.
- */
- protected void incrementJournalCount(Transaction tx, Location location) throws IOException {
- int logId = location.getDataFileId();
- Integer val = metaData.getJournalRC().get(tx, logId);
- int refCount = val != null ? val.intValue() + 1 : 1;
- metaData.getJournalRC().put(tx, logId, refCount);
+ @Override
+ public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
+ object.write(dataOut);
+ }
}
- /**
- * Removes one reference for the Journal log file indicated in the given Location value.
- *
- * The references are used to track which log files cannot be GC'd. When the reference count
- * on a log file reaches zero the file id is removed from the tracker and the log will be
- * removed on the next check point update.
- *
- * @param tx
- * The TX under which the update is to be performed.
- * @param location
- * The location value to update the reference count of.
- *
- * @throws IOException if an error occurs while updating the journal references table.
- */
- protected void decrementJournalCount(Transaction tx, Location location) throws IOException {
- int logId = location.getDataFileId();
- Integer refCount = metaData.getJournalRC().get(tx, logId);
- if (refCount != null) {
- int refCountValue = refCount;
- refCountValue--;
- if (refCountValue <= 0) {
- metaData.getJournalRC().remove(tx, logId);
- } else {
- metaData.getJournalRC().put(tx, logId, refCountValue);
+ class ValueMarshaller extends VariableMarshaller<List<JobLocation>> {
+ @Override
+ public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
+ List<JobLocation> result = new ArrayList<JobLocation>();
+ int size = dataIn.readInt();
+ for (int i = 0; i < size; i++) {
+ JobLocation jobLocation = new JobLocation();
+ jobLocation.readExternal(dataIn);
+ result.add(jobLocation);
}
+ return result;
}
- }
- /**
- * Updates the Job removal tracking index with the location of a remove command and the
- * original JobLocation entry.
- *
- * The JobLocation holds the locations in the logs where the add and update commands for
- * a job stored. The log file containing the remove command can only be discarded after
- * both the add and latest update log files have also been discarded.
- *
- * @param tx
- * The TX under which the update is to be performed.
- * @param location
- * The location value to reference a remove command.
- * @param removedJob
- * The original JobLocation instance that holds the add and update locations
- *
- * @throws IOException if an error occurs while updating the remove location tracker.
- */
- protected void referenceRemovedLocation(Transaction tx, Location location, JobLocation removedJob) throws IOException {
- int logId = location.getDataFileId();
- List<Integer> removed = this.metaData.getRemoveLocationTracker().get(tx, logId);
- if (removed == null) {
- removed = new ArrayList<Integer>();
+ @Override
+ public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException {
+ dataOut.writeInt(value.size());
+ for (JobLocation jobLocation : value) {
+ jobLocation.writeExternal(dataOut);
+ }
}
- removed.add(removedJob.getLocation().getDataFileId());
- this.metaData.getRemoveLocationTracker().put(tx, logId, removed);
- }
-
- /**
- * Retrieve the scheduled Job's byte blob from the journal.
- *
- * @param location
- * The location of the KahaAddScheduledJobCommand that originated the Job.
- *
- * @return a ByteSequence containing the payload of the scheduled Job.
- *
- * @throws IOException if an error occurs while reading the payload value.
- */
- protected ByteSequence getPayload(Location location) throws IOException {
- KahaAddScheduledJobCommand job = (KahaAddScheduledJobCommand) this.load(location);
- Buffer payload = job.getPayload();
- return new ByteSequence(payload.getData(), payload.getOffset(), payload.getLength());
- }
-
- public void readLockIndex() {
- this.indexLock.readLock().lock();
- }
-
- public void readUnlockIndex() {
- this.indexLock.readLock().unlock();
- }
-
- public void writeLockIndex() {
- this.indexLock.writeLock().lock();
- }
-
- public void writeUnlockIndex() {
- this.indexLock.writeLock().unlock();
- }
-
- @Override
- public String toString() {
- return "JobSchedulerStore: " + getDirectory();
- }
-
- @Override
- protected String getPageFileName() {
- return "scheduleDB";
- }
-
- @Override
- protected File getDefaultDataDirectory() {
- return new File(IOHelper.getDefaultDataDirectory(), "delayedDB");
}
- private class MetaDataMarshaller extends VariableMarshaller<JobSchedulerKahaDBMetaData> {
-
+ class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
private final JobSchedulerStoreImpl store;
- MetaDataMarshaller(JobSchedulerStoreImpl store) {
+ JobSchedulerMarshaller(JobSchedulerStoreImpl store) {
this.store = store;
}
@Override
- public JobSchedulerKahaDBMetaData readPayload(DataInput dataIn) throws IOException {
- JobSchedulerKahaDBMetaData rc = new JobSchedulerKahaDBMetaData(store);
- rc.read(dataIn);
- return rc;
+ public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
+ JobSchedulerImpl result = new JobSchedulerImpl(this.store);
+ result.read(dataIn);
+ return result;
}
@Override
- public void writePayload(JobSchedulerKahaDBMetaData object, DataOutput dataOut) throws IOException {
- object.write(dataOut);
+ public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
+ js.write(dataOut);
}
}
- /**
- * Called during index recovery to rebuild the index from the last known good location. For
- * entries that occur before the last known good position we just ignore then and move on.
- *
- * @param command
- * the command read from the Journal which should be used to update the index.
- * @param location
- * the location in the index where the command was read.
- * @param inDoubtlocation
- * the location in the index known to be the last time the index was valid.
- *
- * @throws IOException if an error occurs while recovering the index.
- */
- protected void doRecover(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
- if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
- process(data, location);
- }
+ @Override
+ public File getDirectory() {
+ return directory;
}
- /**
- * Called during recovery to allow the store to rebuild from scratch.
- *
- * @param data
- * The command to process, which was read from the Journal.
- * @param location
- * The location of the command in the Journal.
- *
- * @throws IOException if an error occurs during command processing.
- */
@Override
- protected void process(JournalCommand<?> data, final Location location) throws IOException {
- data.visit(new Visitor() {
- @Override
- public void visit(final KahaAddScheduledJobCommand command) throws IOException {
- final JobSchedulerImpl scheduler;
-
- indexLock.writeLock().lock();
- try {
- try {
- scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
- } catch (Exception e) {
- throw new IOException(e);
- }
- getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- scheduler.process(tx, command, location);
- }
- });
-
- processLocation(location);
- } finally {
- indexLock.writeLock().unlock();
- }
- }
-
- @Override
- public void visit(final KahaRemoveScheduledJobCommand command) throws IOException {
- final JobSchedulerImpl scheduler;
-
- indexLock.writeLock().lock();
- try {
- try {
- scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
- } catch (Exception e) {
- throw new IOException(e);
- }
- getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- scheduler.process(tx, command, location);
- }
- });
-
- processLocation(location);
- } finally {
- indexLock.writeLock().unlock();
- }
- }
-
- @Override
- public void visit(final KahaRemoveScheduledJobsCommand command) throws IOException {
- final JobSchedulerImpl scheduler;
-
- indexLock.writeLock().lock();
- try {
- try {
- scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
- } catch (Exception e) {
- throw new IOException(e);
- }
- getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- scheduler.process(tx, command, location);
- }
- });
-
- processLocation(location);
- } finally {
- indexLock.writeLock().unlock();
- }
- }
-
- @Override
- public void visit(final KahaRescheduleJobCommand command) throws IOException {
- final JobSchedulerImpl scheduler;
-
- indexLock.writeLock().lock();
- try {
- try {
- scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
- } catch (Exception e) {
- throw new IOException(e);
- }
- getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- scheduler.process(tx, command, location);
- }
- });
-
- processLocation(location);
- } finally {
- indexLock.writeLock().unlock();
- }
- }
-
- @Override
- public void visit(final KahaDestroySchedulerCommand command) {
- try {
- removeJobScheduler(command.getScheduler());
- } catch (Exception e) {
- LOG.warn("Failed to remove scheduler: {}", command.getScheduler());
- }
-
- processLocation(location);
- }
-
- @Override
- public void visit(KahaTraceCommand command) {
- processLocation(location);
- }
- });
+ public void setDirectory(File directory) {
+ this.directory = directory;
}
- protected void processLocation(final Location location) {
- indexLock.writeLock().lock();
+ @Override
+ public long size() {
+ if (!isStarted()) {
+ return 0;
+ }
try {
- this.metaData.setLastUpdateLocation(location);
- } finally {
- indexLock.writeLock().unlock();
+ return journalSize.get() + pageFile.getDiskSize();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
- /**
- * We recover from the Journal logs as needed to restore the index.
- *
- * @throws IllegalStateException
- * @throws IOException
- */
- private void recover() throws IllegalStateException, IOException {
- this.indexLock.writeLock().lock();
- try {
- long start = System.currentTimeMillis();
- Location lastIndoubtPosition = getRecoveryPosition();
- Location recoveryPosition = lastIndoubtPosition;
-
- if (recoveryPosition != null) {
- int redoCounter = 0;
- LOG.info("Recovering from the journal ...");
- while (recoveryPosition != null) {
- JournalCommand<?> message = load(recoveryPosition);
- metaData.setLastUpdateLocation(recoveryPosition);
- doRecover(message, recoveryPosition, lastIndoubtPosition);
- redoCounter++;
- recoveryPosition = journal.getNextLocation(recoveryPosition);
- if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
- LOG.info("@ {}, {} entries recovered ..", recoveryPosition, redoCounter);
- }
+ @Override
+ public JobScheduler getJobScheduler(final String name) throws Exception {
+ JobSchedulerImpl result = this.schedulers.get(name);
+ if (result == null) {
+ final JobSchedulerImpl js = new JobSchedulerImpl(this);
+ js.setName(name);
+ getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ js.createIndexes(tx);
+ js.load(tx);
+ metaData.storedSchedulers.put(tx, name, js);
}
- long end = System.currentTimeMillis();
- LOG.info("Recovery replayed {} operations from the journal in {} seconds.",
- redoCounter, ((end - start) / 1000.0f));
+ });
+ result = js;
+ this.schedulers.put(name, js);
+ if (isStarted()) {
+ result.start();
}
+ this.pageFile.flush();
+ }
+ return result;
+ }
- // We may have to undo some index updates.
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ synchronized public boolean removeJobScheduler(final String name) throws Exception {
+ boolean result = false;
+ final JobSchedulerImpl js = this.schedulers.remove(name);
+ result = js != null;
+ if (result) {
+ js.stop();
+ getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
- recoverIndex(tx);
+ metaData.storedSchedulers.remove(tx, name);
+ js.destroy(tx);
}
});
-
- } finally {
- this.indexLock.writeLock().unlock();
}
+ return result;
}
- private Location getRecoveryPosition() throws IOException {
- // This loads the first position and we completely rebuild the index if we
- // do not override it with some known recovery start location.
- Location result = null;
-
- if (!isForceRecoverIndex()) {
- if (metaData.getLastUpdateLocation() != null) {
- result = metaData.getLastUpdateLocation();
- }
+ @Override
+ protected synchronized void doStart() throws Exception {
+ if (this.directory == null) {
+ this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
}
+ IOHelper.mkdirs(this.directory);
+ this.journal = new Journal();
+ this.journal.setDirectory(directory);
+ this.journal.setMaxFileLength(getJournalMaxFileLength());
+ this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
+ this.journal.setSizeAccumulator(this.journalSize);
+ this.journal.start();
+ this.pageFile = new PageFile(directory, "scheduleDB");
+ this.pageFile.setWriteBatchSize(1);
+ this.pageFile.load();
+
+ this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ if (pageFile.getPageCount() == 0) {
+ Page<MetaData> page = tx.allocate();
+ assert page.getPageId() == 0;
+ page.set(metaData);
+ metaData.page = page;
+ metaData.createIndexes(tx);
+ tx.store(metaData.page, metaDataMarshaller, true);
- return journal.getNextLocation(result);
- }
-
- private void recoverIndex(Transaction tx) throws IOException {
- long start = System.currentTimeMillis();
-
- // It is possible index updates got applied before the journal updates..
- // in that case we need to removed references to Jobs that are not in the journal
- final Location lastAppendLocation = journal.getLastAppendLocation();
- long undoCounter = 0;
-
- // Go through all the jobs in each scheduler and check if any are added after
- // the last appended location and remove those. For now we ignore the update
- // location since the scheduled job will update itself after the next fire and
- // a new update will replace any existing update.
- for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
- Map.Entry<String, JobSchedulerImpl> entry = i.next();
- JobSchedulerImpl scheduler = entry.getValue();
-
- List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
- for (JobLocation job : jobs) {
- if (job.getLocation().compareTo(lastAppendLocation) >= 0) {
- if (scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime())) {
- LOG.trace("Removed Job past last appened in the journal: {}", job.getJobId());
- undoCounter++;
+ } else {
+ Page<MetaData> page = tx.load(0, metaDataMarshaller);
+ metaData = page.get();
+ metaData.page = page;
+ }
+ metaData.load(tx);
+ metaData.loadScheduler(tx, schedulers);
+ for (JobSchedulerImpl js : schedulers.values()) {
+ try {
+ js.start();
+ } catch (Exception e) {
+ JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e);
}
}
}
- }
-
- if (undoCounter > 0) {
- // The rolled back operations are basically in flight journal writes. To avoid getting
- // these the end user should do sync writes to the journal.
- long end = System.currentTimeMillis();
- LOG.info("Rolled back {} messages from the index in {} seconds.", undoCounter, ((end - start) / 1000.0f));
- undoCounter = 0;
- }
-
- // Now we check for missing and corrupt journal files.
+ });
- // 1. Collect the set of all referenced journal files based on the Location of the
- // the scheduled jobs and the marked last update field.
- HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
- for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
- Map.Entry<String, JobSchedulerImpl> entry = i.next();
- JobSchedulerImpl scheduler = entry.getValue();
+ this.pageFile.flush();
+ LOG.info(this + " started");
+ }
- List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
- for (JobLocation job : jobs) {
- missingJournalFiles.add(job.getLocation().getDataFileId());
- if (job.getLastUpdate() != null) {
- missingJournalFiles.add(job.getLastUpdate().getDataFileId());
- }
- }
+ @Override
+ protected synchronized void doStop(ServiceStopper stopper) throws Exception {
+ for (JobSchedulerImpl js : this.schedulers.values()) {
+ js.stop();
}
-
- // 2. Remove from that set all known data file Id's in the journal and what's left
- // is the missing set which will soon also contain the corrupted set.
- missingJournalFiles.removeAll(journal.getFileMap().keySet());
- if (!missingJournalFiles.isEmpty()) {
- LOG.info("Some journal files are missing: {}", missingJournalFiles);
+ if (this.pageFile != null) {
+ this.pageFile.unload();
}
+ if (this.journal != null) {
+ journal.close();
+ }
+ LOG.info(this + " stopped");
+ }
- // 3. Now check all references in the journal logs for corruption and add any
- // corrupt journal files to the missing set.
- HashSet<Location> corruptedLocations = new HashSet<Location>();
-
- if (isCheckForCorruptJournalFiles()) {
- Collection<DataFile> dataFiles = journal.getFileMap().values();
- for (DataFile dataFile : dataFiles) {
- int id = dataFile.getDataFileId();
- for (long offset : dataFile.getCorruptedBlocks()) {
- corruptedLocations.add(new Location(id, (int) offset));
- }
- }
+ synchronized void incrementJournalCount(Transaction tx, Location location) throws IOException {
+ int logId = location.getDataFileId();
+ Integer val = this.metaData.journalRC.get(tx, logId);
+ int refCount = val != null ? val.intValue() + 1 : 1;
+ this.metaData.journalRC.put(tx, logId, refCount);
+ }
- if (!corruptedLocations.isEmpty()) {
- LOG.debug("Found some corrupted data blocks in the journal: {}", corruptedLocations.size());
- }
+ synchronized void decrementJournalCount(Transaction tx, Location location) throws IOException {
+ int logId = location.getDataFileId();
+ int refCount = this.metaData.journalRC.get(tx, logId);
+ refCount--;
+ if (refCount <= 0) {
+ this.metaData.journalRC.remove(tx, logId);
+ Set<Integer> set = new HashSet<Integer>();
+ set.add(logId);
+ this.journal.removeDataFiles(set);
+ } else {
+ this.metaData.journalRC.put(tx, logId, refCount);
}
+ }
- // 4. Now we either fail or we remove all references to missing or corrupt journal
- // files from the various JobSchedulerImpl instances. We only remove the Job if
- // the initial Add operation is missing when the ignore option is set, the updates
- // could be lost but that's price you pay when ignoring the missing logs.
- if (!missingJournalFiles.isEmpty() || !corruptedLocations.isEmpty()) {
- if (!isIgnoreMissingJournalfiles()) {
- throw new IOException("Detected missing/corrupt journal files.");
- }
+ synchronized ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
+ ByteSequence result = null;
+ result = this.journal.read(location);
+ return result;
+ }
- // Remove all Jobs that reference an Location that is either missing or corrupt.
- undoCounter = removeJobsInMissingOrCorruptJounralFiles(tx, missingJournalFiles, corruptedLocations);
+ synchronized Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
+ return this.journal.write(payload, sync);
+ }
- // Clean up the Journal Reference count Map.
- removeJournalRCForMissingFiles(tx, missingJournalFiles);
- }
+ PageFile getPageFile() {
+ this.pageFile.isLoaded();
+ return this.pageFile;
+ }
- if (undoCounter > 0) {
- long end = System.currentTimeMillis();
- LOG.info("Detected missing/corrupt journal files. Dropped {} jobs from the " +
- "index in {} seconds.", undoCounter, ((end - start) / 1000.0f));
- }
+ public boolean isFailIfDatabaseIsLocked() {
+ return failIfDatabaseIsLocked;
}
- private void removeJournalRCForMissingFiles(Transaction tx, Set<Integer> missing) throws IOException {
- List<Integer> matches = new ArrayList<Integer>();
+ public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
+ this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
+ }
- Iterator<Entry<Integer, Integer>> references = metaData.getJournalRC().iterator(tx);
- while (references.hasNext()) {
- int dataFileId = references.next().getKey();
- if (missing.contains(dataFileId)) {
- matches.add(dataFileId);
- }
- }
+ public int getJournalMaxFileLength() {
+ return journalMaxFileLength;
+ }
- for (Integer match : matches) {
- metaData.getJournalRC().remove(tx, match);
- }
+ public void setJournalMaxFileLength(int journalMaxFileLength) {
+ this.journalMaxFileLength = journalMaxFileLength;
}
- private int removeJobsInMissingOrCorruptJounralFiles(Transaction tx, Set<Integer> missing, Set<Location> corrupted) throws IOException {
- int removed = 0;
+ public int getJournalMaxWriteBatchSize() {
+ return journalMaxWriteBatchSize;
+ }
- // Remove Jobs that reference missing or corrupt files.
- // Remove Reference counts to missing or corrupt files.
- // Remove and remove command markers to missing or corrupt files.
- for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
- Map.Entry<String, JobSchedulerImpl> entry = i.next();
- JobSchedulerImpl scheduler = entry.getValue();
+ public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
+ this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
+ }
- List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
- for (JobLocation job : jobs) {
+ public boolean isEnableIndexWriteAsync() {
+ return enableIndexWriteAsync;
+ }
- // Remove all jobs in missing log files.
- if (missing.contains(job.getLocation().getDataFileId())) {
- scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime());
- removed++;
- continue;
- }
+ public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
+ this.enableIndexWriteAsync = enableIndexWriteAsync;
+ }
- // Remove all jobs in corrupted parts of log files.
- if (corrupted.contains(job.getLocation())) {
- scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime());
- removed++;
- }
- }
- }
+ @Override
+ public String toString() {
+ return "JobSchedulerStore:" + this.directory;
+ }
- return removed;
+ @Override
+ public Locker createDefaultLocker() throws IOException {
+ SharedFileLocker locker = new SharedFileLocker();
+ locker.setDirectory(this.getDirectory());
+ return locker;
+ }
+
+ @Override
+ public void init() throws Exception {
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java
deleted file mode 100644
index 5146d84..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/UnknownStoreVersionException.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.activemq.store.kahadb.scheduler;
-
-import java.io.IOException;
-
-public class UnknownStoreVersionException extends IOException {
-
- private static final long serialVersionUID = -8544753506151157145L;
-
- private final String token;
-
- public UnknownStoreVersionException(Throwable cause) {
- super(cause);
- this.token = "";
- }
-
- public UnknownStoreVersionException(String token) {
- super("Failed to load Store, found unknown store token: " + token);
- this.token = token;
- }
-
- public String getToken() {
- return this.token;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java
deleted file mode 100644
index 2562f50..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.scheduler.legacy;
-
-import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.util.ByteSequence;
-
-/**
- * Legacy version Job and Job payload wrapper. Allows for easy replay of stored
- * legacy jobs into a new JobSchedulerStoreImpl intsance.
- */
-final class LegacyJobImpl {
-
- private final LegacyJobLocation jobLocation;
- private final Buffer payload;
-
- protected LegacyJobImpl(LegacyJobLocation location, ByteSequence payload) {
- this.jobLocation = location;
- this.payload = new Buffer(payload.data, payload.offset, payload.length);
- }
-
- public String getJobId() {
- return this.jobLocation.getJobId();
- }
-
- public Buffer getPayload() {
- return this.payload;
- }
-
- public long getPeriod() {
- return this.jobLocation.getPeriod();
- }
-
- public int getRepeat() {
- return this.jobLocation.getRepeat();
- }
-
- public long getDelay() {
- return this.jobLocation.getDelay();
- }
-
- public String getCronEntry() {
- return this.jobLocation.getCronEntry();
- }
-
- public long getNextExecutionTime() {
- return this.jobLocation.getNextTime();
- }
-
- public long getStartTime() {
- return this.jobLocation.getStartTime();
- }
-
- @Override
- public String toString() {
- return this.jobLocation.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java
deleted file mode 100644
index 8437064..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobLocation.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.scheduler.legacy;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-
-final class LegacyJobLocation {
-
- private String jobId;
- private int repeat;
- private long startTime;
- private long delay;
- private long nextTime;
- private long period;
- private String cronEntry;
- private final Location location;
-
- public LegacyJobLocation(Location location) {
- this.location = location;
- }
-
- public LegacyJobLocation() {
- this(new Location());
- }
-
- public void readExternal(DataInput in) throws IOException {
- this.jobId = in.readUTF();
- this.repeat = in.readInt();
- this.startTime = in.readLong();
- this.delay = in.readLong();
- this.nextTime = in.readLong();
- this.period = in.readLong();
- this.cronEntry = in.readUTF();
- this.location.readExternal(in);
- }
-
- public void writeExternal(DataOutput out) throws IOException {
- out.writeUTF(this.jobId);
- out.writeInt(this.repeat);
- out.writeLong(this.startTime);
- out.writeLong(this.delay);
- out.writeLong(this.nextTime);
- out.writeLong(this.period);
- if (this.cronEntry == null) {
- this.cronEntry = "";
- }
- out.writeUTF(this.cronEntry);
- this.location.writeExternal(out);
- }
-
- /**
- * @return the jobId
- */
- public String getJobId() {
- return this.jobId;
- }
-
- /**
- * @param jobId
- * the jobId to set
- */
- public void setJobId(String jobId) {
- this.jobId = jobId;
- }
-
- /**
- * @return the repeat
- */
- public int getRepeat() {
- return this.repeat;
- }
-
- /**
- * @param repeat
- * the repeat to set
- */
- public void setRepeat(int repeat) {
- this.repeat = repeat;
- }
-
- /**
- * @return the start
- */
- public long getStartTime() {
- return this.startTime;
- }
-
- /**
- * @param start
- * the start to set
- */
- public void setStartTime(long start) {
- this.startTime = start;
- }
-
- /**
- * @return the nextTime
- */
- public synchronized long getNextTime() {
- return this.nextTime;
- }
-
- /**
- * @param nextTime
- * the nextTime to set
- */
- public synchronized void setNextTime(long nextTime) {
- this.nextTime = nextTime;
- }
-
- /**
- * @return the period
- */
- public long getPeriod() {
- return this.period;
- }
-
- /**
- * @param period
- * the period to set
- */
- public void setPeriod(long period) {
- this.period = period;
- }
-
- /**
- * @return the cronEntry
- */
- public synchronized String getCronEntry() {
- return this.cronEntry;
- }
-
- /**
- * @param cronEntry
- * the cronEntry to set
- */
- public synchronized void setCronEntry(String cronEntry) {
- this.cronEntry = cronEntry;
- }
-
- /**
- * @return if this JobLocation represents a cron entry.
- */
- public boolean isCron() {
- return getCronEntry() != null && getCronEntry().length() > 0;
- }
-
- /**
- * @return the delay
- */
- public long getDelay() {
- return this.delay;
- }
-
- /**
- * @param delay
- * the delay to set
- */
- public void setDelay(long delay) {
- this.delay = delay;
- }
-
- /**
- * @return the location
- */
- public Location getLocation() {
- return this.location;
- }
-
- @Override
- public String toString() {
- return "Job [id=" + jobId + ", startTime=" + new Date(startTime) +
- ", delay=" + delay + ", period=" + period +
- ", repeat=" + repeat + ", nextTime=" + new Date(nextTime) + "]";
- }
-
- static class JobLocationMarshaller extends VariableMarshaller<List<LegacyJobLocation>> {
- static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller();
-
- @Override
- public List<LegacyJobLocation> readPayload(DataInput dataIn) throws IOException {
- List<LegacyJobLocation> result = new ArrayList<LegacyJobLocation>();
- int size = dataIn.readInt();
- for (int i = 0; i < size; i++) {
- LegacyJobLocation jobLocation = new LegacyJobLocation();
- jobLocation.readExternal(dataIn);
- result.add(jobLocation);
- }
- return result;
- }
-
- @Override
- public void writePayload(List<LegacyJobLocation> value, DataOutput dataOut) throws IOException {
- dataOut.writeInt(value.size());
- for (LegacyJobLocation jobLocation : value) {
- jobLocation.writeExternal(dataOut);
- }
- }
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((cronEntry == null) ? 0 : cronEntry.hashCode());
- result = prime * result + (int) (delay ^ (delay >>> 32));
- result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
- result = prime * result + ((location == null) ? 0 : location.hashCode());
- result = prime * result + (int) (nextTime ^ (nextTime >>> 32));
- result = prime * result + (int) (period ^ (period >>> 32));
- result = prime * result + repeat;
- result = prime * result + (int) (startTime ^ (startTime >>> 32));
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (obj == null) {
- return false;
- }
-
- if (getClass() != obj.getClass()) {
- return false;
- }
-
- LegacyJobLocation other = (LegacyJobLocation) obj;
-
- if (cronEntry == null) {
- if (other.cronEntry != null) {
- return false;
- }
- } else if (!cronEntry.equals(other.cronEntry)) {
- return false;
- }
-
- if (delay != other.delay) {
- return false;
- }
-
- if (jobId == null) {
- if (other.jobId != null)
- return false;
- } else if (!jobId.equals(other.jobId)) {
- return false;
- }
-
- if (location == null) {
- if (other.location != null) {
- return false;
- }
- } else if (!location.equals(other.location)) {
- return false;
- }
-
- if (nextTime != other.nextTime) {
- return false;
- }
- if (period != other.period) {
- return false;
- }
- if (repeat != other.repeat) {
- return false;
- }
- if (startTime != other.startTime) {
- return false;
- }
-
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java
deleted file mode 100644
index 687ffd7..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerImpl.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.scheduler.legacy;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
-
-/**
- * Read-only view of a stored legacy JobScheduler instance.
- */
-final class LegacyJobSchedulerImpl extends ServiceSupport {
-
- private final LegacyJobSchedulerStoreImpl store;
- private String name;
- private BTreeIndex<Long, List<LegacyJobLocation>> index;
-
- LegacyJobSchedulerImpl(LegacyJobSchedulerStoreImpl store) {
- this.store = store;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getName() {
- return this.name;
- }
-
- /**
- * Returns the next time that a job would be scheduled to run.
- *
- * @return time of next scheduled job to run.
- *
- * @throws IOException if an error occurs while fetching the time.
- */
- public long getNextScheduleTime() throws IOException {
- Map.Entry<Long, List<LegacyJobLocation>> first = this.index.getFirst(this.store.getPageFile().tx());
- return first != null ? first.getKey() : -1l;
- }
-
- /**
- * Gets the list of the next batch of scheduled jobs in the store.
- *
- * @return a list of the next jobs that will run.
- *
- * @throws IOException if an error occurs while fetching the jobs list.
- */
- public List<LegacyJobImpl> getNextScheduleJobs() throws IOException {
- final List<LegacyJobImpl> result = new ArrayList<LegacyJobImpl>();
-
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- Map.Entry<Long, List<LegacyJobLocation>> first = index.getFirst(store.getPageFile().tx());
- if (first != null) {
- for (LegacyJobLocation jl : first.getValue()) {
- ByteSequence bs = getPayload(jl.getLocation());
- LegacyJobImpl job = new LegacyJobImpl(jl, bs);
- result.add(job);
- }
- }
- }
- });
- return result;
- }
-
- /**
- * Gets a list of all scheduled jobs in this store.
- *
- * @return a list of all the currently scheduled jobs in this store.
- *
- * @throws IOException if an error occurs while fetching the list of jobs.
- */
- public List<LegacyJobImpl> getAllJobs() throws IOException {
- final List<LegacyJobImpl> result = new ArrayList<LegacyJobImpl>();
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- Iterator<Map.Entry<Long, List<LegacyJobLocation>>> iter = index.iterator(store.getPageFile().tx());
- while (iter.hasNext()) {
- Map.Entry<Long, List<LegacyJobLocation>> next = iter.next();
- if (next != null) {
- for (LegacyJobLocation jl : next.getValue()) {
- ByteSequence bs = getPayload(jl.getLocation());
- LegacyJobImpl job = new LegacyJobImpl(jl, bs);
- result.add(job);
- }
- } else {
- break;
- }
- }
- }
- });
- return result;
- }
-
- /**
- * Gets a list of all scheduled jobs that exist between the given start and end time.
- *
- * @param start
- * The start time to look for scheduled jobs.
- * @param finish
- * The end time to stop looking for scheduled jobs.
- *
- * @return a list of all scheduled jobs that would run between the given start and end time.
- *
- * @throws IOException if an error occurs while fetching the list of jobs.
- */
- public List<LegacyJobImpl> getAllJobs(final long start, final long finish) throws IOException {
- final List<LegacyJobImpl> result = new ArrayList<LegacyJobImpl>();
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- Iterator<Map.Entry<Long, List<LegacyJobLocation>>> iter = index.iterator(store.getPageFile().tx(), start);
- while (iter.hasNext()) {
- Map.Entry<Long, List<LegacyJobLocation>> next = iter.next();
- if (next != null && next.getKey().longValue() <= finish) {
- for (LegacyJobLocation jl : next.getValue()) {
- ByteSequence bs = getPayload(jl.getLocation());
- LegacyJobImpl job = new LegacyJobImpl(jl, bs);
- result.add(job);
- }
- } else {
- break;
- }
- }
- }
- });
- return result;
- }
-
- ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
- return this.store.getPayload(location);
- }
-
- @Override
- public String toString() {
- return "LegacyJobScheduler: " + this.name;
- }
-
- @Override
- protected void doStart() throws Exception {
- }
-
- @Override
- protected void doStop(ServiceStopper stopper) throws Exception {
- }
-
- void createIndexes(Transaction tx) throws IOException {
- this.index = new BTreeIndex<Long, List<LegacyJobLocation>>(this.store.getPageFile(), tx.allocate().getPageId());
- }
-
- void load(Transaction tx) throws IOException {
- this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
- this.index.setValueMarshaller(ValueMarshaller.INSTANCE);
- this.index.load(tx);
- }
-
- void read(DataInput in) throws IOException {
- this.name = in.readUTF();
- this.index = new BTreeIndex<Long, List<LegacyJobLocation>>(this.store.getPageFile(), in.readLong());
- this.index.setKeyMarshaller(LongMarshaller.INSTANCE);
- this.index.setValueMarshaller(ValueMarshaller.INSTANCE);
- }
-
- public void write(DataOutput out) throws IOException {
- out.writeUTF(name);
- out.writeLong(this.index.getPageId());
- }
-
- static class ValueMarshaller extends VariableMarshaller<List<LegacyJobLocation>> {
- static ValueMarshaller INSTANCE = new ValueMarshaller();
-
- @Override
- public List<LegacyJobLocation> readPayload(DataInput dataIn) throws IOException {
- List<LegacyJobLocation> result = new ArrayList<LegacyJobLocation>();
- int size = dataIn.readInt();
- for (int i = 0; i < size; i++) {
- LegacyJobLocation jobLocation = new LegacyJobLocation();
- jobLocation.readExternal(dataIn);
- result.add(jobLocation);
- }
- return result;
- }
-
- @Override
- public void writePayload(List<LegacyJobLocation> value, DataOutput dataOut) throws IOException {
- dataOut.writeInt(value.size());
- for (LegacyJobLocation jobLocation : value) {
- jobLocation.writeExternal(dataOut);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java
deleted file mode 100644
index acbd4e7..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/legacy/LegacyJobSchedulerStoreImpl.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb.scheduler.legacy;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
-import org.apache.activemq.store.kahadb.disk.journal.Journal;
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.Page;
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
-import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.LockFile;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Read-only view of a legacy JobSchedulerStore implementation.
- */
-final class LegacyJobSchedulerStoreImpl extends ServiceSupport {
-
- static final Logger LOG = LoggerFactory.getLogger(LegacyJobSchedulerStoreImpl.class);
-
- private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
-
- private File directory;
- private PageFile pageFile;
- private Journal journal;
- private LockFile lockFile;
- private final AtomicLong journalSize = new AtomicLong(0);
- private boolean failIfDatabaseIsLocked;
- private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
- private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
- private boolean enableIndexWriteAsync = false;
- private MetaData metaData = new MetaData(this);
- private final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
- private final Map<String, LegacyJobSchedulerImpl> schedulers = new HashMap<String, LegacyJobSchedulerImpl>();
-
- protected class MetaData {
- protected MetaData(LegacyJobSchedulerStoreImpl store) {
- this.store = store;
- }
-
- private final LegacyJobSchedulerStoreImpl store;
- Page<MetaData> page;
- BTreeIndex<Integer, Integer> journalRC;
- BTreeIndex<String, LegacyJobSchedulerImpl> storedSchedulers;
-
- void createIndexes(Transaction tx) throws IOException {
- this.storedSchedulers = new BTreeIndex<String, LegacyJobSchedulerImpl>(pageFile, tx.allocate().getPageId());
- this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, tx.allocate().getPageId());
- }
-
- void load(Transaction tx) throws IOException {
- this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
- this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
- this.storedSchedulers.load(tx);
- this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
- this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
- this.journalRC.load(tx);
- }
-
- void loadScheduler(Transaction tx, Map<String, LegacyJobSchedulerImpl> schedulers) throws IOException {
- for (Iterator<Entry<String, LegacyJobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
- Entry<String, LegacyJobSchedulerImpl> entry = i.next();
- entry.getValue().load(tx);
- schedulers.put(entry.getKey(), entry.getValue());
- }
- }
-
- public void read(DataInput is) throws IOException {
- this.storedSchedulers = new BTreeIndex<String, LegacyJobSchedulerImpl>(pageFile, is.readLong());
- this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
- this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
- this.journalRC = new BTreeIndex<Integer, Integer>(pageFile, is.readLong());
- this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
- this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
- }
-
- public void write(DataOutput os) throws IOException {
- os.writeLong(this.storedSchedulers.getPageId());
- os.writeLong(this.journalRC.getPageId());
- }
- }
-
- class MetaDataMarshaller extends VariableMarshaller<MetaData> {
- private final LegacyJobSchedulerStoreImpl store;
-
- MetaDataMarshaller(LegacyJobSchedulerStoreImpl store) {
- this.store = store;
- }
-
- @Override
- public MetaData readPayload(DataInput dataIn) throws IOException {
- MetaData rc = new MetaData(this.store);
- rc.read(dataIn);
- return rc;
- }
-
- @Override
- public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
- object.write(dataOut);
- }
- }
-
- class ValueMarshaller extends VariableMarshaller<List<LegacyJobLocation>> {
- @Override
- public List<LegacyJobLocation> readPayload(DataInput dataIn) throws IOException {
- List<LegacyJobLocation> result = new ArrayList<LegacyJobLocation>();
- int size = dataIn.readInt();
- for (int i = 0; i < size; i++) {
- LegacyJobLocation jobLocation = new LegacyJobLocation();
- jobLocation.readExternal(dataIn);
- result.add(jobLocation);
- }
- return result;
- }
-
- @Override
- public void writePayload(List<LegacyJobLocation> value, DataOutput dataOut) throws IOException {
- dataOut.writeInt(value.size());
- for (LegacyJobLocation jobLocation : value) {
- jobLocation.writeExternal(dataOut);
- }
- }
- }
-
- class JobSchedulerMarshaller extends VariableMarshaller<LegacyJobSchedulerImpl> {
- private final LegacyJobSchedulerStoreImpl store;
-
- JobSchedulerMarshaller(LegacyJobSchedulerStoreImpl store) {
- this.store = store;
- }
-
- @Override
- public LegacyJobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
- LegacyJobSchedulerImpl result = new LegacyJobSchedulerImpl(this.store);
- result.read(dataIn);
- return result;
- }
-
- @Override
- public void writePayload(LegacyJobSchedulerImpl js, DataOutput dataOut) throws IOException {
- js.write(dataOut);
- }
- }
-
- public File getDirectory() {
- return directory;
- }
-
- public void setDirectory(File directory) {
- this.directory = directory;
- }
-
- public long size() {
- if (!isStarted()) {
- return 0;
- }
- try {
- return journalSize.get() + pageFile.getDiskSize();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Returns the named Job Scheduler if it exists, otherwise throws an exception.
- *
- * @param name
- * The name of the scheduler that is to be returned.
- *
- * @return the named scheduler if it exists.
- *
- * @throws Exception if the named scheduler does not exist in this store.
- */
- public LegacyJobSchedulerImpl getJobScheduler(final String name) throws Exception {
- LegacyJobSchedulerImpl result = this.schedulers.get(name);
- if (result == null) {
- throw new NoSuchElementException("No such Job Scheduler in this store: " + name);
- }
- return result;
- }
-
- /**
- * Returns the names of all the schedulers that exist in this scheduler store.
- *
- * @return a set of names of all scheduler instances in this store.
- *
- * @throws Exception if an error occurs while collecting the scheduler names.
- */
- public Set<String> getJobSchedulerNames() throws Exception {
- Set<String> names = Collections.emptySet();
-
- if (!schedulers.isEmpty()) {
- return this.schedulers.keySet();
- }
-
- return names;
- }
-
- @Override
- protected void doStart() throws Exception {
- if (this.directory == null) {
- this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
- }
- IOHelper.mkdirs(this.directory);
- lock();
- this.journal = new Journal();
- this.journal.setDirectory(directory);
- this.journal.setMaxFileLength(getJournalMaxFileLength());
- this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
- this.journal.setSizeAccumulator(this.journalSize);
- this.journal.start();
- this.pageFile = new PageFile(directory, "scheduleDB");
- this.pageFile.setWriteBatchSize(1);
- this.pageFile.load();
-
- this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- if (pageFile.getPageCount() == 0) {
- Page<MetaData> page = tx.allocate();
- assert page.getPageId() == 0;
- page.set(metaData);
- metaData.page = page;
- metaData.createIndexes(tx);
- tx.store(metaData.page, metaDataMarshaller, true);
-
- } else {
- Page<MetaData> page = tx.load(0, metaDataMarshaller);
- metaData = page.get();
- metaData.page = page;
- }
- metaData.load(tx);
- metaData.loadScheduler(tx, schedulers);
- for (LegacyJobSchedulerImpl js : schedulers.values()) {
- try {
- js.start();
- } catch (Exception e) {
- LegacyJobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e);
- }
- }
- }
- });
-
- this.pageFile.flush();
- LOG.info(this + " started");
- }
-
- @Override
- protected void doStop(ServiceStopper stopper) throws Exception {
- for (LegacyJobSchedulerImpl js : this.schedulers.values()) {
- js.stop();
- }
- if (this.pageFile != null) {
- this.pageFile.unload();
- }
- if (this.journal != null) {
- journal.close();
- }
- if (this.lockFile != null) {
- this.lockFile.unlock();
- }
- this.lockFile = null;
- LOG.info(this + " stopped");
- }
-
- ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
- ByteSequence result = null;
- result = this.journal.read(location);
- return result;
- }
-
- Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
- return this.journal.write(payload, sync);
- }
-
- private void lock() throws IOException {
- if (lockFile == null) {
- File lockFileName = new File(directory, "lock");
- lockFile = new LockFile(lockFileName, true);
- if (failIfDatabaseIsLocked) {
- lockFile.lock();
- } else {
- while (true) {
- try {
- lockFile.lock();
- break;
- } catch (IOException e) {
- LOG.info("Database " + lockFileName + " is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000)
- + " seconds for the database to be unlocked. Reason: " + e);
- try {
- Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
- } catch (InterruptedException e1) {
- }
- }
- }
- }
- }
- }
-
- PageFile getPageFile() {
- this.pageFile.isLoaded();
- return this.pageFile;
- }
-
- public boolean isFailIfDatabaseIsLocked() {
- return failIfDatabaseIsLocked;
- }
-
- public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
- this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
- }
-
- public int getJournalMaxFileLength() {
- return journalMaxFileLength;
- }
-
- public void setJournalMaxFileLength(int journalMaxFileLength) {
- this.journalMaxFileLength = journalMaxFileLength;
- }
-
- public int getJournalMaxWriteBatchSize() {
- return journalMaxWriteBatchSize;
- }
-
- public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
- this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
- }
-
- public boolean isEnableIndexWriteAsync() {
- return enableIndexWriteAsync;
- }
-
- public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
- this.enableIndexWriteAsync = enableIndexWriteAsync;
- }
-
- @Override
- public String toString() {
- return "LegacyJobSchedulerStore:" + this.directory;
- }
-}