You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/16 23:10:03 UTC
[15/16] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-3758
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
new file mode 100644
index 0000000..6003c87
--- /dev/null
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
@@ -0,0 +1,745 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.activemq.broker.LockableServiceSupport;
+import org.apache.activemq.broker.Locker;
+import org.apache.activemq.store.SharedFileLocker;
+import org.apache.activemq.store.kahadb.data.KahaEntryType;
+import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ServiceStopper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractKahaDBStore extends LockableServiceSupport {
+
+ static final Logger LOG = LoggerFactory.getLogger(AbstractKahaDBStore.class);
+
+ public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
+ public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
+
+ protected File directory;
+ protected PageFile pageFile;
+ protected Journal journal;
+ protected AtomicLong journalSize = new AtomicLong(0);
+ protected boolean failIfDatabaseIsLocked;
+ protected long checkpointInterval = 5*1000;
+ protected long cleanupInterval = 30*1000;
+ protected boolean checkForCorruptJournalFiles = false;
+ protected boolean checksumJournalFiles = true;
+ protected boolean forceRecoverIndex = false;
+ protected int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+ protected int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
+ protected boolean archiveCorruptedIndex = false;
+ protected boolean enableIndexWriteAsync = false;
+ protected boolean enableJournalDiskSyncs = false;
+ protected boolean deleteAllJobs = false;
+ protected int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
+ protected boolean useIndexLFRUEviction = false;
+ protected float indexLFUEvictionFactor = 0.2f;
+ protected boolean ignoreMissingJournalfiles = false;
+ protected int indexCacheSize = 1000;
+ protected boolean enableIndexDiskSyncs = true;
+ protected boolean enableIndexRecoveryFile = true;
+ protected boolean enableIndexPageCaching = true;
+ protected boolean archiveDataLogs;
+ protected boolean purgeStoreOnStartup;
+ protected File directoryArchive;
+
+ protected AtomicBoolean opened = new AtomicBoolean();
+ protected Thread checkpointThread;
+ protected final Object checkpointThreadLock = new Object();
+ protected ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
+ protected ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
+
+ /**
+ * @return the name to give this store's PageFile instance.
+ */
+ protected abstract String getPageFileName();
+
+ /**
+ * @return the location of the data directory if no set by configuration.
+ */
+ protected abstract File getDefaultDataDirectory();
+
+ /**
+ * Loads the store from disk.
+ *
+ * Based on configuration this method can either load an existing store or it can purge
+ * an existing store and start in a clean state.
+ *
+ * @throws IOException if an error occurs during the load.
+ */
+ public abstract void load() throws IOException;
+
+ /**
+ * Unload the state of the Store to disk and shuts down all resources assigned to this
+ * KahaDB store implementation.
+ *
+ * @throws IOException if an error occurs during the store unload.
+ */
+ public abstract void unload() throws IOException;
+
+ @Override
+ protected void doStart() throws Exception {
+ this.indexLock.writeLock().lock();
+ if (getDirectory() == null) {
+ setDirectory(getDefaultDataDirectory());
+ }
+ IOHelper.mkdirs(getDirectory());
+ try {
+ if (isPurgeStoreOnStartup()) {
+ getJournal().start();
+ getJournal().delete();
+ getJournal().close();
+ journal = null;
+ getPageFile().delete();
+ LOG.info("{} Persistence store purged.", this);
+ setPurgeStoreOnStartup(false);
+ }
+
+ load();
+ store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
+ } finally {
+ this.indexLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ protected void doStop(ServiceStopper stopper) throws Exception {
+ unload();
+ }
+
+ public PageFile getPageFile() {
+ if (pageFile == null) {
+ pageFile = createPageFile();
+ }
+ return pageFile;
+ }
+
+ public Journal getJournal() throws IOException {
+ if (journal == null) {
+ journal = createJournal();
+ }
+ return journal;
+ }
+
+ public File getDirectory() {
+ return directory;
+ }
+
+ public void setDirectory(File directory) {
+ this.directory = directory;
+ }
+
+ public boolean isArchiveCorruptedIndex() {
+ return archiveCorruptedIndex;
+ }
+
+ public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
+ this.archiveCorruptedIndex = archiveCorruptedIndex;
+ }
+
+ public boolean isFailIfDatabaseIsLocked() {
+ return failIfDatabaseIsLocked;
+ }
+
+ public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
+ this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
+ }
+
+ public boolean isCheckForCorruptJournalFiles() {
+ return checkForCorruptJournalFiles;
+ }
+
+ public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
+ this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
+ }
+
+ public long getCheckpointInterval() {
+ return checkpointInterval;
+ }
+
+ public void setCheckpointInterval(long checkpointInterval) {
+ this.checkpointInterval = checkpointInterval;
+ }
+
+ public long getCleanupInterval() {
+ return cleanupInterval;
+ }
+
+ public void setCleanupInterval(long cleanupInterval) {
+ this.cleanupInterval = cleanupInterval;
+ }
+
+ public boolean isChecksumJournalFiles() {
+ return checksumJournalFiles;
+ }
+
+ public void setChecksumJournalFiles(boolean checksumJournalFiles) {
+ this.checksumJournalFiles = checksumJournalFiles;
+ }
+
+ public boolean isForceRecoverIndex() {
+ return forceRecoverIndex;
+ }
+
+ public void setForceRecoverIndex(boolean forceRecoverIndex) {
+ this.forceRecoverIndex = forceRecoverIndex;
+ }
+
+ public int getJournalMaxFileLength() {
+ return journalMaxFileLength;
+ }
+
+ public void setJournalMaxFileLength(int journalMaxFileLength) {
+ this.journalMaxFileLength = journalMaxFileLength;
+ }
+
+ public int getJournalMaxWriteBatchSize() {
+ return journalMaxWriteBatchSize;
+ }
+
+ public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
+ this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
+ }
+
+ public boolean isEnableIndexWriteAsync() {
+ return enableIndexWriteAsync;
+ }
+
+ public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
+ this.enableIndexWriteAsync = enableIndexWriteAsync;
+ }
+
+ public boolean isEnableJournalDiskSyncs() {
+ return enableJournalDiskSyncs;
+ }
+
+ public void setEnableJournalDiskSyncs(boolean syncWrites) {
+ this.enableJournalDiskSyncs = syncWrites;
+ }
+
+ public boolean isDeleteAllJobs() {
+ return deleteAllJobs;
+ }
+
+ public void setDeleteAllJobs(boolean deleteAllJobs) {
+ this.deleteAllJobs = deleteAllJobs;
+ }
+
+ /**
+ * @return the archiveDataLogs
+ */
+ public boolean isArchiveDataLogs() {
+ return this.archiveDataLogs;
+ }
+
+ /**
+ * @param archiveDataLogs the archiveDataLogs to set
+ */
+ public void setArchiveDataLogs(boolean archiveDataLogs) {
+ this.archiveDataLogs = archiveDataLogs;
+ }
+
+ /**
+ * @return the directoryArchive
+ */
+ public File getDirectoryArchive() {
+ return this.directoryArchive;
+ }
+
+ /**
+ * @param directoryArchive the directoryArchive to set
+ */
+ public void setDirectoryArchive(File directoryArchive) {
+ this.directoryArchive = directoryArchive;
+ }
+
+ public int getIndexCacheSize() {
+ return indexCacheSize;
+ }
+
+ public void setIndexCacheSize(int indexCacheSize) {
+ this.indexCacheSize = indexCacheSize;
+ }
+
+ public int getIndexWriteBatchSize() {
+ return indexWriteBatchSize;
+ }
+
+ public void setIndexWriteBatchSize(int indexWriteBatchSize) {
+ this.indexWriteBatchSize = indexWriteBatchSize;
+ }
+
+ public boolean isUseIndexLFRUEviction() {
+ return useIndexLFRUEviction;
+ }
+
+ public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
+ this.useIndexLFRUEviction = useIndexLFRUEviction;
+ }
+
+ public float getIndexLFUEvictionFactor() {
+ return indexLFUEvictionFactor;
+ }
+
+ public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
+ this.indexLFUEvictionFactor = indexLFUEvictionFactor;
+ }
+
+ public boolean isEnableIndexDiskSyncs() {
+ return enableIndexDiskSyncs;
+ }
+
+ public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
+ this.enableIndexDiskSyncs = enableIndexDiskSyncs;
+ }
+
+ public boolean isEnableIndexRecoveryFile() {
+ return enableIndexRecoveryFile;
+ }
+
+ public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
+ this.enableIndexRecoveryFile = enableIndexRecoveryFile;
+ }
+
+ public boolean isEnableIndexPageCaching() {
+ return enableIndexPageCaching;
+ }
+
+ public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
+ this.enableIndexPageCaching = enableIndexPageCaching;
+ }
+
+ public boolean isPurgeStoreOnStartup() {
+ return this.purgeStoreOnStartup;
+ }
+
+ public void setPurgeStoreOnStartup(boolean purge) {
+ this.purgeStoreOnStartup = purge;
+ }
+
+ public boolean isIgnoreMissingJournalfiles() {
+ return ignoreMissingJournalfiles;
+ }
+
+ public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
+ this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
+ }
+
+ public long size() {
+ if (!isStarted()) {
+ return 0;
+ }
+ try {
+ return journalSize.get() + pageFile.getDiskSize();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Locker createDefaultLocker() throws IOException {
+ SharedFileLocker locker = new SharedFileLocker();
+ locker.setDirectory(this.getDirectory());
+ return locker;
+ }
+
+ @Override
+ public void init() throws Exception {
+ }
+
+ /**
+ * Store a command in the Journal and process to update the Store index.
+ *
+ * @param command
+ * The specific JournalCommand to store and process.
+ *
+ * @returns the Location where the data was written in the Journal.
+ *
+ * @throws IOException if an error occurs storing or processing the command.
+ */
+ public Location store(JournalCommand<?> command) throws IOException {
+ return store(command, isEnableIndexDiskSyncs(), null, null, null);
+ }
+
+ /**
+ * Store a command in the Journal and process to update the Store index.
+ *
+ * @param command
+ * The specific JournalCommand to store and process.
+ * @param sync
+ * Should the store operation be done synchronously. (ignored if completion passed).
+ *
+ * @returns the Location where the data was written in the Journal.
+ *
+ * @throws IOException if an error occurs storing or processing the command.
+ */
+ public Location store(JournalCommand<?> command, boolean sync) throws IOException {
+ return store(command, sync, null, null, null);
+ }
+
+ /**
+ * Store a command in the Journal and process to update the Store index.
+ *
+ * @param command
+ * The specific JournalCommand to store and process.
+ * @param onJournalStoreComplete
+ * The Runnable to call when the Journal write operation completes.
+ *
+ * @returns the Location where the data was written in the Journal.
+ *
+ * @throws IOException if an error occurs storing or processing the command.
+ */
+ public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException {
+ return store(command, isEnableIndexDiskSyncs(), null, null, onJournalStoreComplete);
+ }
+
+ /**
+ * Store a command in the Journal and process to update the Store index.
+ *
+ * @param command
+ * The specific JournalCommand to store and process.
+ * @param sync
+ * Should the store operation be done synchronously. (ignored if completion passed).
+ * @param before
+ * The Runnable instance to execute before performing the store and process operation.
+ * @param after
+ * The Runnable instance to execute after performing the store and process operation.
+ *
+ * @returns the Location where the data was written in the Journal.
+ *
+ * @throws IOException if an error occurs storing or processing the command.
+ */
+ public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException {
+ return store(command, sync, before, after, null);
+ }
+
+ /**
+ * All updated are are funneled through this method. The updates are converted to a
+ * JournalMessage which is logged to the journal and then the data from the JournalMessage
+ * is used to update the index just like it would be done during a recovery process.
+ *
+ * @param command
+ * The specific JournalCommand to store and process.
+ * @param sync
+ * Should the store operation be done synchronously. (ignored if completion passed).
+ * @param before
+ * The Runnable instance to execute before performing the store and process operation.
+ * @param after
+ * The Runnable instance to execute after performing the store and process operation.
+ * @param onJournalStoreComplete
+ * Callback to be run when the journal write operation is complete.
+ *
+ * @returns the Location where the data was written in the Journal.
+ *
+ * @throws IOException if an error occurs storing or processing the command.
+ */
+ public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
+ try {
+
+ if (before != null) {
+ before.run();
+ }
+
+ ByteSequence sequence = toByteSequence(command);
+ Location location;
+ checkpointLock.readLock().lock();
+ try {
+
+ long start = System.currentTimeMillis();
+ location = onJournalStoreComplete == null ? journal.write(sequence, sync) :
+ journal.write(sequence, onJournalStoreComplete);
+ long start2 = System.currentTimeMillis();
+
+ process(command, location);
+
+ long end = System.currentTimeMillis();
+ if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
+ LOG.info("Slow KahaDB access: Journal append took: {} ms, Index update took {} ms",
+ (start2-start), (end-start2));
+ }
+ } finally {
+ checkpointLock.readLock().unlock();
+ }
+
+ if (after != null) {
+ after.run();
+ }
+
+ if (checkpointThread != null && !checkpointThread.isAlive()) {
+ startCheckpoint();
+ }
+ return location;
+ } catch (IOException ioe) {
+ LOG.error("KahaDB failed to store to Journal", ioe);
+ if (brokerService != null) {
+ brokerService.handleIOException(ioe);
+ }
+ throw ioe;
+ }
+ }
+
+ /**
+ * Loads a previously stored JournalMessage
+ *
+ * @param location
+ * The location of the journal command to read.
+ *
+ * @return a new un-marshaled JournalCommand instance.
+ *
+ * @throws IOException if an error occurs reading the stored command.
+ */
+ protected JournalCommand<?> load(Location location) throws IOException {
+ ByteSequence data = journal.read(location);
+ DataByteArrayInputStream is = new DataByteArrayInputStream(data);
+ byte readByte = is.readByte();
+ KahaEntryType type = KahaEntryType.valueOf(readByte);
+ if (type == null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ }
+ throw new IOException("Could not load journal record. Invalid location: " + location);
+ }
+ JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
+ message.mergeFramed(is);
+ return message;
+ }
+
+ /**
+ * Process a stored or recovered JournalCommand instance and update the DB Index with the
+ * state changes that this command produces. This can be called either as a new DB operation
+ * or as a replay during recovery operations.
+ *
+ * @param command
+ * The JournalCommand to process.
+ * @param location
+ * The location in the Journal where the command was written or read from.
+ */
+ protected abstract void process(JournalCommand<?> command, Location location) throws IOException;
+
+ /**
+ * Perform a checkpoint operation with optional cleanup.
+ *
+ * Called by the checkpoint background thread periodically to initiate a checkpoint operation
+ * and if the cleanup flag is set a cleanup sweep should be done to allow for release of no
+ * longer needed journal log files etc.
+ *
+ * @param cleanup
+ * Should the method do a simple checkpoint or also perform a journal cleanup.
+ *
+ * @throws IOException if an error occurs during the checkpoint operation.
+ */
+ protected void checkpointUpdate(final boolean cleanup) throws IOException {
+ checkpointLock.writeLock().lock();
+ try {
+ this.indexLock.writeLock().lock();
+ try {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ @Override
+ public void execute(Transaction tx) throws IOException {
+ checkpointUpdate(tx, cleanup);
+ }
+ });
+ } finally {
+ this.indexLock.writeLock().unlock();
+ }
+
+ } finally {
+ checkpointLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Perform the checkpoint update operation. If the cleanup flag is true then the
+ * operation should also purge any unused Journal log files.
+ *
+ * This method must always be called with the checkpoint and index write locks held.
+ *
+ * @param tx
+ * The TX under which to perform the checkpoint update.
+ * @param cleanup
+ * Should the checkpoint also do unused Journal file cleanup.
+ *
+ * @throws IOException if an error occurs while performing the checkpoint.
+ */
+ protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException;
+
+ /**
+ * Creates a new ByteSequence that represents the marshaled form of the given Journal Command.
+ *
+ * @param command
+ * The Journal Command that should be marshaled to bytes for writing.
+ *
+ * @return the byte representation of the given journal command.
+ *
+ * @throws IOException if an error occurs while serializing the command.
+ */
+ protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
+ int size = data.serializedSizeFramed();
+ DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
+ os.writeByte(data.type().getNumber());
+ data.writeFramed(os);
+ return os.toByteSequence();
+ }
+
+ /**
+ * Create the PageFile instance and configure it using the configuration options
+ * currently set.
+ *
+ * @return the newly created and configured PageFile instance.
+ */
+ protected PageFile createPageFile() {
+ PageFile index = new PageFile(getDirectory(), getPageFileName());
+ index.setEnableWriteThread(isEnableIndexWriteAsync());
+ index.setWriteBatchSize(getIndexWriteBatchSize());
+ index.setPageCacheSize(getIndexCacheSize());
+ index.setUseLFRUEviction(isUseIndexLFRUEviction());
+ index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
+ index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
+ index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
+ index.setEnablePageCaching(isEnableIndexPageCaching());
+ return index;
+ }
+
+ /**
+ * Create a new Journal instance and configure it using the currently set configuration
+ * options. If an archive directory is configured than this method will attempt to create
+ * that directory if it does not already exist.
+ *
+ * @return the newly created an configured Journal instance.
+ *
+ * @throws IOException if an error occurs while creating the Journal object.
+ */
+ protected Journal createJournal() throws IOException {
+ Journal manager = new Journal();
+ manager.setDirectory(getDirectory());
+ manager.setMaxFileLength(getJournalMaxFileLength());
+ manager.setCheckForCorruptionOnStartup(isCheckForCorruptJournalFiles());
+ manager.setChecksum(isChecksumJournalFiles() || isCheckForCorruptJournalFiles());
+ manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
+ manager.setArchiveDataLogs(isArchiveDataLogs());
+ manager.setSizeAccumulator(journalSize);
+ manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
+ if (getDirectoryArchive() != null) {
+ IOHelper.mkdirs(getDirectoryArchive());
+ manager.setDirectoryArchive(getDirectoryArchive());
+ }
+ return manager;
+ }
+
+ /**
+ * Starts the checkpoint Thread instance if not already running and not disabled
+ * by configuration.
+ */
+ protected void startCheckpoint() {
+ if (checkpointInterval == 0 && cleanupInterval == 0) {
+ LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
+ return;
+ }
+ synchronized (checkpointThreadLock) {
+ boolean start = false;
+ if (checkpointThread == null) {
+ start = true;
+ } else if (!checkpointThread.isAlive()) {
+ start = true;
+ LOG.info("KahaDB: Recovering checkpoint thread after death");
+ }
+ if (start) {
+ checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+ @Override
+ public void run() {
+ try {
+ long lastCleanup = System.currentTimeMillis();
+ long lastCheckpoint = System.currentTimeMillis();
+ // Sleep for a short time so we can periodically check
+ // to see if we need to exit this thread.
+ long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
+ while (opened.get()) {
+ Thread.sleep(sleepTime);
+ long now = System.currentTimeMillis();
+ if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
+ checkpointCleanup(true);
+ lastCleanup = now;
+ lastCheckpoint = now;
+ } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
+ checkpointCleanup(false);
+ lastCheckpoint = now;
+ }
+ }
+ } catch (InterruptedException e) {
+ // Looks like someone really wants us to exit this thread...
+ } catch (IOException ioe) {
+ LOG.error("Checkpoint failed", ioe);
+ brokerService.handleIOException(ioe);
+ }
+ }
+ };
+
+ checkpointThread.setDaemon(true);
+ checkpointThread.start();
+ }
+ }
+ }
+
+ /**
+ * Called from the worker thread to start a checkpoint.
+ *
+ * This method ensure that the store is in an opened state and optionaly logs information
+ * related to slow store access times.
+ *
+ * @param cleanup
+ * Should a cleanup of the journal occur during the checkpoint operation.
+ *
+ * @throws IOException if an error occurs during the checkpoint operation.
+ */
+ protected void checkpointCleanup(final boolean cleanup) throws IOException {
+ long start;
+ this.indexLock.writeLock().lock();
+ try {
+ start = System.currentTimeMillis();
+ if (!opened.get()) {
+ return;
+ }
+ } finally {
+ this.indexLock.writeLock().unlock();
+ }
+ checkpointUpdate(cleanup);
+ long end = System.currentTimeMillis();
+ if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
+ LOG.info("Slow KahaDB access: cleanup took {}", (end - start));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java
new file mode 100644
index 0000000..defb238
--- /dev/null
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Page;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+
+/**
+ * Interface for the store meta data used to hold the index value and other needed
+ * information to manage a KahaDB store instance.
+ */
+public interface KahaDBMetaData<T> {
+
+ /**
+ * Indicates that this meta data instance has been opened and is active.
+ */
+ public static final int OPEN_STATE = 2;
+
+ /**
+ * Indicates that this meta data instance has been closed and is no longer active.
+ */
+ public static final int CLOSED_STATE = 1;
+
+ /**
+ * Gets the Page in the store PageFile where the KahaDBMetaData instance is stored.
+ *
+ * @return the Page to use to start access the KahaDBMetaData instance.
+ */
+ Page<T> getPage();
+
+ /**
+ * Sets the Page instance used to load and store the KahaDBMetaData instance.
+ *
+ * @param page
+ * the new Page value to use.
+ */
+ void setPage(Page<T> page);
+
+ /**
+ * Gets the state flag of this meta data instance.
+ *
+ * @return the current state value for this instance.
+ */
+ int getState();
+
+ /**
+ * Sets the current value of the state flag.
+ *
+ * @param value
+ * the new value to assign to the state flag.
+ */
+ void setState(int value);
+
+ /**
+ * Returns the Journal Location value that indicates that last recorded update
+ * that was successfully performed for this KahaDB store implementation.
+ *
+ * @return the location of the last successful update location.
+ */
+ Location getLastUpdateLocation();
+
+ /**
+ * Updates the value of the last successful update.
+ *
+ * @param location
+ * the new value to assign the last update location field.
+ */
+ void setLastUpdateLocation(Location location);
+
+ /**
+ * For a newly created KahaDBMetaData instance this method is called to allow
+ * the instance to create all of it's internal indices and other state data.
+ *
+ * @param tx
+ * the Transaction instance under which the operation is executed.
+ *
+ * @throws IOException if an error occurs while creating the meta data structures.
+ */
+ void initialize(Transaction tx) throws IOException;
+
+ /**
+ * Instructs this object to load its internal data structures from the KahaDB PageFile
+ * and prepare itself for use.
+ *
+ * @param tx
+ * the Transaction instance under which the operation is executed.
+ *
+ * @throws IOException if an error occurs while creating the meta data structures.
+ */
+ void load(Transaction tx) throws IOException;
+
+ /**
+ * Reads the serialized for of this object from the KadaDB PageFile and prepares it
+ * for use. This method does not need to perform a full load of the meta data structures
+ * only read in the information necessary to load them from the PageFile on a call to the
+ * load method.
+ *
+ * @param in
+ * the DataInput instance used to read this objects serialized form.
+ *
+ * @throws IOException if an error occurs while reading the serialized form.
+ */
+ void read(DataInput in) throws IOException;
+
+ /**
+ * Writes the object into a serialized form which can be read back in again using the
+ * read method.
+ *
+ * @param out
+ * the DataOutput instance to use to write the current state to a serialized form.
+ *
+ * @throws IOException if an error occurs while serializing this instance.
+ */
+ void write(DataOutput out) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index d8b986e..8ca8ca4 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -31,6 +31,7 @@ import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.PersistenceAdapterView;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -39,7 +40,14 @@ import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.*;
+import org.apache.activemq.store.JournaledStore;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.SharedFileLocker;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionIdTransformer;
+import org.apache.activemq.store.TransactionIdTransformerAware;
+import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
@@ -642,4 +650,9 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
getStore().setTransactionIdTransformer(transactionIdTransformer);
}
+
+ @Override
+ public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+ return this.letter.createJobSchedulerStore();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 60c0738..975cd05 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
@@ -55,7 +56,14 @@ import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.*;
+import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.ListenableFuture;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionIdTransformer;
+import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
@@ -66,6 +74,7 @@ import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper;
@@ -172,6 +181,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public int getMaxAsyncJobs() {
return this.maxAsyncJobs;
}
+
/**
* @param maxAsyncJobs
* the maxAsyncJobs to set
@@ -426,6 +436,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
+ @Override
public void updateMessage(Message message) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
@@ -472,7 +483,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
indexLock.writeLock().lock();
try {
location = findMessageLocation(key, dest);
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
if (location == null) {
@@ -492,19 +503,17 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public Integer execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count
- // of
- // messages in the destination.
+ // of messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
int rc = 0;
- for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
- .hasNext();) {
+ for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
iterator.next();
rc++;
}
return rc;
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
} finally {
@@ -525,7 +534,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return sd.locationIndex.isEmpty(tx);
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
}
@@ -552,12 +561,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
}
-
@Override
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
indexLock.writeLock().lock();
@@ -583,7 +591,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
sd.orderIndex.stoppedIterating();
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
}
@@ -628,7 +636,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
});
} catch (Exception e) {
LOG.error("Failed to reset batching",e);
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
}
@@ -641,8 +649,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
lockAsyncJobQueue();
// Hopefully one day the page file supports concurrent read
- // operations... but for now we must
- // externally synchronize...
+ // operations... but for now we must externally synchronize...
indexLock.writeLock().lock();
try {
@@ -725,8 +732,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
- MessageId messageId, MessageAck ack)
- throws IOException {
+ MessageId messageId, MessageAck ack) throws IOException {
String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
if (isConcurrentStoreAndDispatchTopics()) {
AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
@@ -810,7 +816,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
@@ -836,7 +842,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
.getSubscriptionInfo().newInput()));
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
}
@@ -859,7 +865,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return (int) getStoredMessageCount(tx, sd, subscriptionKey);
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
}
@@ -890,7 +896,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
sd.orderIndex.resetCursorPosition();
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
}
@@ -943,7 +949,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
});
- }finally {
+ } finally {
indexLock.writeLock().unlock();
}
}
@@ -1358,7 +1364,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
LOG.warn("Failed to aquire lock", e);
}
}
-
}
@Override
@@ -1422,7 +1427,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
if (runnable instanceof StoreTask) {
((StoreTask)runnable).releaseLocks();
}
-
}
}
+
+ @Override
+ public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+ return new JobSchedulerStoreImpl();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index d10c4eb..eca83e8 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -16,12 +16,44 @@
*/
package org.apache.activemq.store.kahadb;
-import org.apache.activemq.broker.*;
-import org.apache.activemq.command.*;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.Lockable;
+import org.apache.activemq.broker.LockableServiceSupport;
+import org.apache.activemq.broker.Locker;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.filter.AnyDestination;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.filter.DestinationMapEntry;
-import org.apache.activemq.store.*;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.SharedFileLocker;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionIdTransformer;
+import org.apache.activemq.store.TransactionIdTransformerAware;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
@@ -30,13 +62,6 @@ import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.transaction.xa.Xid;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.*;
-
/**
* An implementation of {@link org.apache.activemq.store.PersistenceAdapter} that supports
* distribution of destinations across multiple kahaDB persistence adapters
@@ -50,6 +75,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
final class DelegateDestinationMap extends DestinationMap {
+ @Override
public void setEntries(List<DestinationMapEntry> entries) {
super.setEntries(entries);
}
@@ -252,7 +278,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
}
if (adapter instanceof PersistenceAdapter) {
adapter.removeQueueMessageStore(destination);
- removeMessageStore((PersistenceAdapter)adapter, destination);
+ removeMessageStore(adapter, destination);
destinationMap.removeAll(destination);
}
}
@@ -267,7 +293,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
}
if (adapter instanceof PersistenceAdapter) {
adapter.removeTopicMessageStore(destination);
- removeMessageStore((PersistenceAdapter)adapter, destination);
+ removeMessageStore(adapter, destination);
destinationMap.removeAll(destination);
}
}
@@ -453,6 +479,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
}
}
+ @Override
public BrokerService getBrokerService() {
return brokerService;
}
@@ -503,4 +530,9 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
locker.configure(this);
return locker;
}
+
+ @Override
+ public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+ return new JobSchedulerStoreImpl();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index c7ece83..8840a1d 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -31,16 +31,24 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.store.*;
+import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.ListenableFuture;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.ProxyMessageStore;
+import org.apache.activemq.store.ProxyTopicMessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionRecoveryListener;
+import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
-import org.apache.activemq.util.IOHelper;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -186,6 +194,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
return inflightTransactions.remove(txid);
}
+ @Override
public void prepare(TransactionId txid) throws IOException {
Tx tx = getTx(txid);
for (TransactionStore store : tx.getStores()) {
@@ -193,6 +202,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
}
+ @Override
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
throws IOException {
@@ -247,6 +257,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
return location;
}
+ @Override
public void rollback(TransactionId txid) throws IOException {
Tx tx = removeTx(txid);
if (tx != null) {
@@ -256,6 +267,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
}
+ @Override
public void start() throws Exception {
journal = new Journal() {
@Override
@@ -289,6 +301,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
}
+ @Override
public void stop() throws Exception {
journal.close();
journal = null;
@@ -334,6 +347,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
+ @Override
public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 66ae496..45e35c6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -22,12 +22,13 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
@@ -51,31 +52,35 @@ import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
+import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
-import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware {
private final WireFormat wireFormat = new OpenWireFormat();
private BrokerService brokerService;
+ @Override
public void setBrokerName(String brokerName) {
}
+ @Override
public void setUsageManager(SystemUsage usageManager) {
}
+ @Override
public TransactionStore createTransactionStore() throws IOException {
return new TransactionStore(){
-
+
+ @Override
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
if (preCommit != null) {
preCommit.run();
@@ -85,18 +90,21 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
postCommit.run();
}
}
+ @Override
public void prepare(TransactionId txid) throws IOException {
- processPrepare(txid);
+ processPrepare(txid);
}
+ @Override
public void rollback(TransactionId txid) throws IOException {
- processRollback(txid);
+ processRollback(txid);
}
+ @Override
public void recover(TransactionRecoveryListener listener) throws IOException {
for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
XATransactionId xid = (XATransactionId)entry.getKey();
ArrayList<Message> messageList = new ArrayList<Message>();
ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
-
+
for (Operation op : entry.getValue()) {
if( op.getClass() == AddOpperation.class ) {
AddOpperation addOp = (AddOpperation)op;
@@ -108,7 +116,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
ackList.add(ack);
}
}
-
+
Message[] addedMessages = new Message[messageList.size()];
MessageAck[] acks = new MessageAck[ackList.size()];
messageList.toArray(addedMessages);
@@ -116,8 +124,10 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
listener.recover(xid, addedMessages, acks);
}
}
+ @Override
public void start() throws Exception {
}
+ @Override
public void stop() throws Exception {
}
};
@@ -136,13 +146,15 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
return destination;
}
+ @Override
public void addMessage(ConnectionContext context, Message message) throws IOException {
KahaAddMessageCommand command = new KahaAddMessageCommand();
command.setDestination(dest);
command.setMessageId(message.getMessageId().toProducerKey());
processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
}
-
+
+ @Override
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
@@ -150,20 +162,23 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
processRemove(command, ack.getTransactionId());
}
+ @Override
public void removeAllMessages(ConnectionContext context) throws IOException {
KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
command.setDestination(dest);
process(command);
}
+ @Override
public Message getMessage(MessageId identity) throws IOException {
final String key = identity.toProducerKey();
-
+
// Hopefully one day the page file supports concurrent read operations... but for now we must
// externally synchronize...
ByteSequence data;
synchronized(indexMutex) {
data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){
+ @Override
public ByteSequence execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
Long sequence = sd.messageIdIndex.get(tx, key);
@@ -177,14 +192,16 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
if( data == null ) {
return null;
}
-
+
Message msg = (Message)wireFormat.unmarshal( data );
- return msg;
+ return msg;
}
-
+
+ @Override
public int getMessageCount() throws IOException {
synchronized(indexMutex) {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+ @Override
public Integer execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count of messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
@@ -199,9 +216,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
}
+ @Override
public void recover(final MessageRecoveryListener listener) throws Exception {
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<Exception>(){
+ @Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
@@ -214,10 +233,12 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
long cursorPos=0;
-
+
+ @Override
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<Exception>(){
+ @Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageRecord> entry=null;
@@ -238,20 +259,22 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
}
+ @Override
public void resetBatching() {
cursorPos=0;
}
-
+
@Override
public void setBatch(MessageId identity) throws IOException {
final String key = identity.toProducerKey();
-
+
// Hopefully one day the page file supports concurrent read operations... but for now we must
// externally synchronize...
Long location;
synchronized(indexMutex) {
location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
+ @Override
public Long execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
return sd.messageIdIndex.get(tx, key);
@@ -261,7 +284,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
if( location!=null ) {
cursorPos=location+1;
}
-
+
}
@Override
@@ -273,14 +296,15 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
@Override
public void stop() throws Exception {
}
-
+
}
-
+
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
public KahaDBTopicMessageStore(ActiveMQTopic destination) {
super(destination);
}
-
+
+ @Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
@@ -294,6 +318,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
processRemove(command, null);
}
+ @Override
public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
@@ -305,6 +330,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
process(command);
}
+ @Override
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
@@ -312,11 +338,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
process(command);
}
+ @Override
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-
+
final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>(){
+ @Override
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
@@ -328,16 +356,18 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
});
}
-
+
SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
subscriptions.toArray(rc);
return rc;
}
+ @Override
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized(indexMutex) {
return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
+ @Override
public SubscriptionInfo execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
@@ -349,11 +379,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
});
}
}
-
+
+ @Override
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized(indexMutex) {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
+ @Override
public Integer execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
@@ -362,7 +394,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
return 0;
}
cursorPos += 1;
-
+
int counter = 0;
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
iterator.next();
@@ -371,18 +403,20 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
return counter;
}
});
- }
+ }
}
+ @Override
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<Exception>(){
+ @Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
cursorPos += 1;
-
+
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
Entry<Long, MessageRecord> entry = iterator.next();
listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
@@ -392,10 +426,12 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
}
+ @Override
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<Exception>(){
+ @Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
@@ -403,7 +439,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
cursorPos += 1;
}
-
+
Entry<Long, MessageRecord> entry=null;
int counter = 0;
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
@@ -422,11 +458,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
}
+ @Override
public void resetBatching(String clientId, String subscriptionName) {
try {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>(){
+ @Override
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
sd.subscriptionCursors.remove(subscriptionKey);
@@ -442,11 +480,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
String subscriptionKey(String clientId, String subscriptionName){
return clientId+":"+subscriptionName;
}
-
+
+ @Override
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
return new KahaDBMessageStore(destination);
}
+ @Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
return new KahaDBTopicMessageStore(destination);
}
@@ -457,6 +497,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
*
* @param destination Destination to forget
*/
+ @Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
}
@@ -466,18 +507,22 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
*
* @param destination Destination to forget
*/
+ @Override
public void removeTopicMessageStore(ActiveMQTopic destination) {
}
+ @Override
public void deleteAllMessages() throws IOException {
}
-
-
+
+
+ @Override
public Set<ActiveMQDestination> getDestinations() {
try {
final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>(){
+ @Override
public void execute(Transaction tx) throws IOException {
for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) {
Entry<String, StoredDestination> entry = iterator.next();
@@ -491,11 +536,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
throw new RuntimeException(e);
}
}
-
+
+ @Override
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
-
+
+ @Override
public long size() {
if ( !started.get() ) {
return 0;
@@ -507,32 +554,36 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
}
+ @Override
public void beginTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
+ @Override
public void commitTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
+ @Override
public void rollbackTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
-
+
+ @Override
public void checkpoint(boolean sync) throws IOException {
- }
+ }
///////////////////////////////////////////////////////////////////
// Internal conversion methods.
///////////////////////////////////////////////////////////////////
-
-
+
+
KahaLocation convert(Location location) {
KahaLocation rc = new KahaLocation();
rc.setLogId(location.getDataFileId());
rc.setOffset(location.getOffset());
return rc;
}
-
+
KahaDestination convert(ActiveMQDestination dest) {
KahaDestination rc = new KahaDestination();
rc.setName(dest.getPhysicalName());
@@ -561,7 +612,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
int type = Integer.parseInt(dest.substring(0, p));
String name = dest.substring(p+1);
-
+
switch( KahaDestination.DestinationType.valueOf(type) ) {
case QUEUE:
return new ActiveMQQueue(name);
@@ -571,11 +622,12 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
return new ActiveMQTempQueue(name);
case TEMP_TOPIC:
return new ActiveMQTempTopic(name);
- default:
+ default:
throw new IllegalArgumentException("Not in the valid destination format");
}
}
-
+
+ @Override
public long getLastProducerSequenceId(ProducerId id) {
return -1;
}
@@ -592,4 +644,8 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
super.load();
}
+ @Override
+ public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
index be4f2ff..43fc152 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
@@ -20,11 +20,16 @@ import java.io.IOException;
import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
+import org.apache.activemq.store.kahadb.data.KahaDestroySchedulerCommand;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
+import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
+import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
+import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
@@ -62,6 +67,21 @@ public class Visitor {
public void visit(KahaAckMessageFileMapCommand kahaProducerAuditCommand) throws IOException {
}
+ public void visit(KahaAddScheduledJobCommand kahaAddScheduledJobCommand) throws IOException {
+ }
+
+ public void visit(KahaRescheduleJobCommand KahaRescheduleJobCommand) throws IOException {
+ }
+
+ public void visit(KahaRemoveScheduledJobCommand kahaRemoveScheduledJobCommand) throws IOException {
+ }
+
+ public void visit(KahaRemoveScheduledJobsCommand kahaRemoveScheduledJobsCommand) throws IOException {
+ }
+
+ public void visit(KahaDestroySchedulerCommand KahaDestroySchedulerCommand) throws IOException {
+ }
+
public void visit(KahaUpdateMessageCommand kahaUpdateMessageCommand) throws IOException {
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
index 86b9fa3..217bc1f 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
@@ -25,8 +25,8 @@ public class JobImpl implements Job {
private final JobLocation jobLocation;
private final byte[] payload;
- protected JobImpl(JobLocation location,ByteSequence bs) {
- this.jobLocation=location;
+ protected JobImpl(JobLocation location, ByteSequence bs) {
+ this.jobLocation = location;
this.payload = new byte[bs.getLength()];
System.arraycopy(bs.getData(), bs.getOffset(), this.payload, 0, bs.getLength());
}
@@ -38,22 +38,22 @@ public class JobImpl implements Job {
@Override
public byte[] getPayload() {
- return this.payload;
+ return this.payload;
}
@Override
public long getPeriod() {
- return this.jobLocation.getPeriod();
+ return this.jobLocation.getPeriod();
}
@Override
public int getRepeat() {
- return this.jobLocation.getRepeat();
+ return this.jobLocation.getRepeat();
}
@Override
public long getStart() {
- return this.jobLocation.getStartTime();
+ return this.jobLocation.getStartTime();
}
@Override
@@ -76,4 +76,13 @@ public class JobImpl implements Job {
return JobSupport.getDateTime(getStart());
}
+ @Override
+ public int getExecutionCount() {
+ return this.jobLocation.getRescheduledCount();
+ }
+
+ @Override
+ public String toString() {
+ return "Job: " + getJobId();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/fc244f48/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
index 13cf376..cb66145 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
@@ -36,6 +36,8 @@ class JobLocation {
private long period;
private String cronEntry;
private final Location location;
+ private int rescheduledCount;
+ private Location lastUpdate;
public JobLocation(Location location) {
this.location = location;
@@ -52,8 +54,12 @@ class JobLocation {
this.delay = in.readLong();
this.nextTime = in.readLong();
this.period = in.readLong();
- this.cronEntry=in.readUTF();
+ this.cronEntry = in.readUTF();
this.location.readExternal(in);
+ if (in.readBoolean()) {
+ this.lastUpdate = new Location();
+ this.lastUpdate.readExternal(in);
+ }
}
public void writeExternal(DataOutput out) throws IOException {
@@ -63,11 +69,17 @@ class JobLocation {
out.writeLong(this.delay);
out.writeLong(this.nextTime);
out.writeLong(this.period);
- if (this.cronEntry==null) {
- this.cronEntry="";
+ if (this.cronEntry == null) {
+ this.cronEntry = "";
}
out.writeUTF(this.cronEntry);
this.location.writeExternal(out);
+ if (lastUpdate != null) {
+ out.writeBoolean(true);
+ this.lastUpdate.writeExternal(out);
+ } else {
+ out.writeBoolean(false);
+ }
}
/**
@@ -123,7 +135,8 @@ class JobLocation {
}
/**
- * @param nextTime the nextTime to set
+ * @param nextTime
+ * the nextTime to set
*/
public synchronized void setNextTime(long nextTime) {
this.nextTime = nextTime;
@@ -152,7 +165,8 @@ class JobLocation {
}
/**
- * @param cronEntry the cronEntry to set
+ * @param cronEntry
+ * the cronEntry to set
*/
public synchronized void setCronEntry(String cronEntry) {
this.cronEntry = cronEntry;
@@ -173,7 +187,8 @@ class JobLocation {
}
/**
- * @param delay the delay to set
+ * @param delay
+ * the delay to set
*/
public void setDelay(long delay) {
this.delay = delay;
@@ -186,15 +201,55 @@ class JobLocation {
return this.location;
}
+ /**
+ * @returns the location in the journal of the last update issued for this
+ * Job.
+ */
+ public Location getLastUpdate() {
+ return this.lastUpdate;
+ }
+
+ /**
+ * Sets the location of the last update command written to the journal for
+ * this Job. The update commands set the next execution time for this job.
+ * We need to keep track of only the latest update as it's the only one we
+ * really need to recover the correct state from the journal.
+ *
+ * @param location
+ * The location in the journal of the last update command.
+ */
+ public void setLastUpdate(Location location) {
+ this.lastUpdate = location;
+ }
+
+ /**
+ * @return the number of time this job has been rescheduled.
+ */
+ public int getRescheduledCount() {
+ return rescheduledCount;
+ }
+
+ /**
+ * Sets the number of time this job has been rescheduled. A newly added job will return
+ * zero and increment this value each time a scheduled message is dispatched to its
+ * target destination and the job is rescheduled for another cycle.
+ *
+ * @param executionCount
+ * the new execution count to assign the JobLocation.
+ */
+ public void setRescheduledCount(int rescheduledCount) {
+ this.rescheduledCount = rescheduledCount;
+ }
+
@Override
public String toString() {
- return "Job [id=" + jobId + ", startTime=" + new Date(startTime)
- + ", delay=" + delay + ", period=" + period + ", repeat="
- + repeat + ", nextTime=" + new Date(nextTime) + "]";
+ return "Job [id=" + jobId + ", startTime=" + new Date(startTime) + ", delay=" + delay + ", period=" + period + ", repeat=" + repeat + ", nextTime="
+ + new Date(nextTime) + ", executionCount = " + (rescheduledCount + 1) + "]";
}
static class JobLocationMarshaller extends VariableMarshaller<List<JobLocation>> {
static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller();
+
@Override
public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
List<JobLocation> result = new ArrayList<JobLocation>();
@@ -228,6 +283,7 @@ class JobLocation {
result = prime * result + (int) (period ^ (period >>> 32));
result = prime * result + repeat;
result = prime * result + (int) (startTime ^ (startTime >>> 32));
+ result = prime * result + (rescheduledCount ^ (rescheduledCount >>> 32));
return result;
}
@@ -286,6 +342,9 @@ class JobLocation {
if (startTime != other.startTime) {
return false;
}
+ if (rescheduledCount != other.rescheduledCount) {
+ return false;
+ }
return true;
}