You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/25 05:02:26 UTC
[4/5] activemq git commit: Revert
"https://issues.apache.org/jira/browse/AMQ-3758"
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
deleted file mode 100644
index 6003c87..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/AbstractKahaDBStore.java
+++ /dev/null
@@ -1,745 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.store.kahadb;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.activemq.broker.LockableServiceSupport;
-import org.apache.activemq.broker.Locker;
-import org.apache.activemq.store.SharedFileLocker;
-import org.apache.activemq.store.kahadb.data.KahaEntryType;
-import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
-import org.apache.activemq.store.kahadb.disk.journal.Journal;
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.PageFile;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.util.DataByteArrayOutputStream;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.ServiceStopper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractKahaDBStore extends LockableServiceSupport {
-
- static final Logger LOG = LoggerFactory.getLogger(AbstractKahaDBStore.class);
-
- public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
- public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
-
- protected File directory;
- protected PageFile pageFile;
- protected Journal journal;
- protected AtomicLong journalSize = new AtomicLong(0);
- protected boolean failIfDatabaseIsLocked;
- protected long checkpointInterval = 5*1000;
- protected long cleanupInterval = 30*1000;
- protected boolean checkForCorruptJournalFiles = false;
- protected boolean checksumJournalFiles = true;
- protected boolean forceRecoverIndex = false;
- protected int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
- protected int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
- protected boolean archiveCorruptedIndex = false;
- protected boolean enableIndexWriteAsync = false;
- protected boolean enableJournalDiskSyncs = false;
- protected boolean deleteAllJobs = false;
- protected int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
- protected boolean useIndexLFRUEviction = false;
- protected float indexLFUEvictionFactor = 0.2f;
- protected boolean ignoreMissingJournalfiles = false;
- protected int indexCacheSize = 1000;
- protected boolean enableIndexDiskSyncs = true;
- protected boolean enableIndexRecoveryFile = true;
- protected boolean enableIndexPageCaching = true;
- protected boolean archiveDataLogs;
- protected boolean purgeStoreOnStartup;
- protected File directoryArchive;
-
- protected AtomicBoolean opened = new AtomicBoolean();
- protected Thread checkpointThread;
- protected final Object checkpointThreadLock = new Object();
- protected ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
- protected ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
-
- /**
- * @return the name to give this store's PageFile instance.
- */
- protected abstract String getPageFileName();
-
- /**
- * @return the location of the data directory if no set by configuration.
- */
- protected abstract File getDefaultDataDirectory();
-
- /**
- * Loads the store from disk.
- *
- * Based on configuration this method can either load an existing store or it can purge
- * an existing store and start in a clean state.
- *
- * @throws IOException if an error occurs during the load.
- */
- public abstract void load() throws IOException;
-
- /**
- * Unload the state of the Store to disk and shuts down all resources assigned to this
- * KahaDB store implementation.
- *
- * @throws IOException if an error occurs during the store unload.
- */
- public abstract void unload() throws IOException;
-
- @Override
- protected void doStart() throws Exception {
- this.indexLock.writeLock().lock();
- if (getDirectory() == null) {
- setDirectory(getDefaultDataDirectory());
- }
- IOHelper.mkdirs(getDirectory());
- try {
- if (isPurgeStoreOnStartup()) {
- getJournal().start();
- getJournal().delete();
- getJournal().close();
- journal = null;
- getPageFile().delete();
- LOG.info("{} Persistence store purged.", this);
- setPurgeStoreOnStartup(false);
- }
-
- load();
- store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
- } finally {
- this.indexLock.writeLock().unlock();
- }
- }
-
- @Override
- protected void doStop(ServiceStopper stopper) throws Exception {
- unload();
- }
-
- public PageFile getPageFile() {
- if (pageFile == null) {
- pageFile = createPageFile();
- }
- return pageFile;
- }
-
- public Journal getJournal() throws IOException {
- if (journal == null) {
- journal = createJournal();
- }
- return journal;
- }
-
- public File getDirectory() {
- return directory;
- }
-
- public void setDirectory(File directory) {
- this.directory = directory;
- }
-
- public boolean isArchiveCorruptedIndex() {
- return archiveCorruptedIndex;
- }
-
- public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
- this.archiveCorruptedIndex = archiveCorruptedIndex;
- }
-
- public boolean isFailIfDatabaseIsLocked() {
- return failIfDatabaseIsLocked;
- }
-
- public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
- this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
- }
-
- public boolean isCheckForCorruptJournalFiles() {
- return checkForCorruptJournalFiles;
- }
-
- public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
- this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
- }
-
- public long getCheckpointInterval() {
- return checkpointInterval;
- }
-
- public void setCheckpointInterval(long checkpointInterval) {
- this.checkpointInterval = checkpointInterval;
- }
-
- public long getCleanupInterval() {
- return cleanupInterval;
- }
-
- public void setCleanupInterval(long cleanupInterval) {
- this.cleanupInterval = cleanupInterval;
- }
-
- public boolean isChecksumJournalFiles() {
- return checksumJournalFiles;
- }
-
- public void setChecksumJournalFiles(boolean checksumJournalFiles) {
- this.checksumJournalFiles = checksumJournalFiles;
- }
-
- public boolean isForceRecoverIndex() {
- return forceRecoverIndex;
- }
-
- public void setForceRecoverIndex(boolean forceRecoverIndex) {
- this.forceRecoverIndex = forceRecoverIndex;
- }
-
- public int getJournalMaxFileLength() {
- return journalMaxFileLength;
- }
-
- public void setJournalMaxFileLength(int journalMaxFileLength) {
- this.journalMaxFileLength = journalMaxFileLength;
- }
-
- public int getJournalMaxWriteBatchSize() {
- return journalMaxWriteBatchSize;
- }
-
- public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
- this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
- }
-
- public boolean isEnableIndexWriteAsync() {
- return enableIndexWriteAsync;
- }
-
- public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
- this.enableIndexWriteAsync = enableIndexWriteAsync;
- }
-
- public boolean isEnableJournalDiskSyncs() {
- return enableJournalDiskSyncs;
- }
-
- public void setEnableJournalDiskSyncs(boolean syncWrites) {
- this.enableJournalDiskSyncs = syncWrites;
- }
-
- public boolean isDeleteAllJobs() {
- return deleteAllJobs;
- }
-
- public void setDeleteAllJobs(boolean deleteAllJobs) {
- this.deleteAllJobs = deleteAllJobs;
- }
-
- /**
- * @return the archiveDataLogs
- */
- public boolean isArchiveDataLogs() {
- return this.archiveDataLogs;
- }
-
- /**
- * @param archiveDataLogs the archiveDataLogs to set
- */
- public void setArchiveDataLogs(boolean archiveDataLogs) {
- this.archiveDataLogs = archiveDataLogs;
- }
-
- /**
- * @return the directoryArchive
- */
- public File getDirectoryArchive() {
- return this.directoryArchive;
- }
-
- /**
- * @param directoryArchive the directoryArchive to set
- */
- public void setDirectoryArchive(File directoryArchive) {
- this.directoryArchive = directoryArchive;
- }
-
- public int getIndexCacheSize() {
- return indexCacheSize;
- }
-
- public void setIndexCacheSize(int indexCacheSize) {
- this.indexCacheSize = indexCacheSize;
- }
-
- public int getIndexWriteBatchSize() {
- return indexWriteBatchSize;
- }
-
- public void setIndexWriteBatchSize(int indexWriteBatchSize) {
- this.indexWriteBatchSize = indexWriteBatchSize;
- }
-
- public boolean isUseIndexLFRUEviction() {
- return useIndexLFRUEviction;
- }
-
- public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
- this.useIndexLFRUEviction = useIndexLFRUEviction;
- }
-
- public float getIndexLFUEvictionFactor() {
- return indexLFUEvictionFactor;
- }
-
- public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
- this.indexLFUEvictionFactor = indexLFUEvictionFactor;
- }
-
- public boolean isEnableIndexDiskSyncs() {
- return enableIndexDiskSyncs;
- }
-
- public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
- this.enableIndexDiskSyncs = enableIndexDiskSyncs;
- }
-
- public boolean isEnableIndexRecoveryFile() {
- return enableIndexRecoveryFile;
- }
-
- public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
- this.enableIndexRecoveryFile = enableIndexRecoveryFile;
- }
-
- public boolean isEnableIndexPageCaching() {
- return enableIndexPageCaching;
- }
-
- public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
- this.enableIndexPageCaching = enableIndexPageCaching;
- }
-
- public boolean isPurgeStoreOnStartup() {
- return this.purgeStoreOnStartup;
- }
-
- public void setPurgeStoreOnStartup(boolean purge) {
- this.purgeStoreOnStartup = purge;
- }
-
- public boolean isIgnoreMissingJournalfiles() {
- return ignoreMissingJournalfiles;
- }
-
- public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
- this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
- }
-
- public long size() {
- if (!isStarted()) {
- return 0;
- }
- try {
- return journalSize.get() + pageFile.getDiskSize();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Locker createDefaultLocker() throws IOException {
- SharedFileLocker locker = new SharedFileLocker();
- locker.setDirectory(this.getDirectory());
- return locker;
- }
-
- @Override
- public void init() throws Exception {
- }
-
- /**
- * Store a command in the Journal and process to update the Store index.
- *
- * @param command
- * The specific JournalCommand to store and process.
- *
- * @returns the Location where the data was written in the Journal.
- *
- * @throws IOException if an error occurs storing or processing the command.
- */
- public Location store(JournalCommand<?> command) throws IOException {
- return store(command, isEnableIndexDiskSyncs(), null, null, null);
- }
-
- /**
- * Store a command in the Journal and process to update the Store index.
- *
- * @param command
- * The specific JournalCommand to store and process.
- * @param sync
- * Should the store operation be done synchronously. (ignored if completion passed).
- *
- * @returns the Location where the data was written in the Journal.
- *
- * @throws IOException if an error occurs storing or processing the command.
- */
- public Location store(JournalCommand<?> command, boolean sync) throws IOException {
- return store(command, sync, null, null, null);
- }
-
- /**
- * Store a command in the Journal and process to update the Store index.
- *
- * @param command
- * The specific JournalCommand to store and process.
- * @param onJournalStoreComplete
- * The Runnable to call when the Journal write operation completes.
- *
- * @returns the Location where the data was written in the Journal.
- *
- * @throws IOException if an error occurs storing or processing the command.
- */
- public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException {
- return store(command, isEnableIndexDiskSyncs(), null, null, onJournalStoreComplete);
- }
-
- /**
- * Store a command in the Journal and process to update the Store index.
- *
- * @param command
- * The specific JournalCommand to store and process.
- * @param sync
- * Should the store operation be done synchronously. (ignored if completion passed).
- * @param before
- * The Runnable instance to execute before performing the store and process operation.
- * @param after
- * The Runnable instance to execute after performing the store and process operation.
- *
- * @returns the Location where the data was written in the Journal.
- *
- * @throws IOException if an error occurs storing or processing the command.
- */
- public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException {
- return store(command, sync, before, after, null);
- }
-
- /**
- * All updated are are funneled through this method. The updates are converted to a
- * JournalMessage which is logged to the journal and then the data from the JournalMessage
- * is used to update the index just like it would be done during a recovery process.
- *
- * @param command
- * The specific JournalCommand to store and process.
- * @param sync
- * Should the store operation be done synchronously. (ignored if completion passed).
- * @param before
- * The Runnable instance to execute before performing the store and process operation.
- * @param after
- * The Runnable instance to execute after performing the store and process operation.
- * @param onJournalStoreComplete
- * Callback to be run when the journal write operation is complete.
- *
- * @returns the Location where the data was written in the Journal.
- *
- * @throws IOException if an error occurs storing or processing the command.
- */
- public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
- try {
-
- if (before != null) {
- before.run();
- }
-
- ByteSequence sequence = toByteSequence(command);
- Location location;
- checkpointLock.readLock().lock();
- try {
-
- long start = System.currentTimeMillis();
- location = onJournalStoreComplete == null ? journal.write(sequence, sync) :
- journal.write(sequence, onJournalStoreComplete);
- long start2 = System.currentTimeMillis();
-
- process(command, location);
-
- long end = System.currentTimeMillis();
- if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
- LOG.info("Slow KahaDB access: Journal append took: {} ms, Index update took {} ms",
- (start2-start), (end-start2));
- }
- } finally {
- checkpointLock.readLock().unlock();
- }
-
- if (after != null) {
- after.run();
- }
-
- if (checkpointThread != null && !checkpointThread.isAlive()) {
- startCheckpoint();
- }
- return location;
- } catch (IOException ioe) {
- LOG.error("KahaDB failed to store to Journal", ioe);
- if (brokerService != null) {
- brokerService.handleIOException(ioe);
- }
- throw ioe;
- }
- }
-
- /**
- * Loads a previously stored JournalMessage
- *
- * @param location
- * The location of the journal command to read.
- *
- * @return a new un-marshaled JournalCommand instance.
- *
- * @throws IOException if an error occurs reading the stored command.
- */
- protected JournalCommand<?> load(Location location) throws IOException {
- ByteSequence data = journal.read(location);
- DataByteArrayInputStream is = new DataByteArrayInputStream(data);
- byte readByte = is.readByte();
- KahaEntryType type = KahaEntryType.valueOf(readByte);
- if (type == null) {
- try {
- is.close();
- } catch (IOException e) {
- }
- throw new IOException("Could not load journal record. Invalid location: " + location);
- }
- JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
- message.mergeFramed(is);
- return message;
- }
-
- /**
- * Process a stored or recovered JournalCommand instance and update the DB Index with the
- * state changes that this command produces. This can be called either as a new DB operation
- * or as a replay during recovery operations.
- *
- * @param command
- * The JournalCommand to process.
- * @param location
- * The location in the Journal where the command was written or read from.
- */
- protected abstract void process(JournalCommand<?> command, Location location) throws IOException;
-
- /**
- * Perform a checkpoint operation with optional cleanup.
- *
- * Called by the checkpoint background thread periodically to initiate a checkpoint operation
- * and if the cleanup flag is set a cleanup sweep should be done to allow for release of no
- * longer needed journal log files etc.
- *
- * @param cleanup
- * Should the method do a simple checkpoint or also perform a journal cleanup.
- *
- * @throws IOException if an error occurs during the checkpoint operation.
- */
- protected void checkpointUpdate(final boolean cleanup) throws IOException {
- checkpointLock.writeLock().lock();
- try {
- this.indexLock.writeLock().lock();
- try {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- @Override
- public void execute(Transaction tx) throws IOException {
- checkpointUpdate(tx, cleanup);
- }
- });
- } finally {
- this.indexLock.writeLock().unlock();
- }
-
- } finally {
- checkpointLock.writeLock().unlock();
- }
- }
-
- /**
- * Perform the checkpoint update operation. If the cleanup flag is true then the
- * operation should also purge any unused Journal log files.
- *
- * This method must always be called with the checkpoint and index write locks held.
- *
- * @param tx
- * The TX under which to perform the checkpoint update.
- * @param cleanup
- * Should the checkpoint also do unused Journal file cleanup.
- *
- * @throws IOException if an error occurs while performing the checkpoint.
- */
- protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException;
-
- /**
- * Creates a new ByteSequence that represents the marshaled form of the given Journal Command.
- *
- * @param command
- * The Journal Command that should be marshaled to bytes for writing.
- *
- * @return the byte representation of the given journal command.
- *
- * @throws IOException if an error occurs while serializing the command.
- */
- protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
- int size = data.serializedSizeFramed();
- DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
- os.writeByte(data.type().getNumber());
- data.writeFramed(os);
- return os.toByteSequence();
- }
-
- /**
- * Create the PageFile instance and configure it using the configuration options
- * currently set.
- *
- * @return the newly created and configured PageFile instance.
- */
- protected PageFile createPageFile() {
- PageFile index = new PageFile(getDirectory(), getPageFileName());
- index.setEnableWriteThread(isEnableIndexWriteAsync());
- index.setWriteBatchSize(getIndexWriteBatchSize());
- index.setPageCacheSize(getIndexCacheSize());
- index.setUseLFRUEviction(isUseIndexLFRUEviction());
- index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
- index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
- index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
- index.setEnablePageCaching(isEnableIndexPageCaching());
- return index;
- }
-
- /**
- * Create a new Journal instance and configure it using the currently set configuration
- * options. If an archive directory is configured than this method will attempt to create
- * that directory if it does not already exist.
- *
- * @return the newly created an configured Journal instance.
- *
- * @throws IOException if an error occurs while creating the Journal object.
- */
- protected Journal createJournal() throws IOException {
- Journal manager = new Journal();
- manager.setDirectory(getDirectory());
- manager.setMaxFileLength(getJournalMaxFileLength());
- manager.setCheckForCorruptionOnStartup(isCheckForCorruptJournalFiles());
- manager.setChecksum(isChecksumJournalFiles() || isCheckForCorruptJournalFiles());
- manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
- manager.setArchiveDataLogs(isArchiveDataLogs());
- manager.setSizeAccumulator(journalSize);
- manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
- if (getDirectoryArchive() != null) {
- IOHelper.mkdirs(getDirectoryArchive());
- manager.setDirectoryArchive(getDirectoryArchive());
- }
- return manager;
- }
-
- /**
- * Starts the checkpoint Thread instance if not already running and not disabled
- * by configuration.
- */
- protected void startCheckpoint() {
- if (checkpointInterval == 0 && cleanupInterval == 0) {
- LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
- return;
- }
- synchronized (checkpointThreadLock) {
- boolean start = false;
- if (checkpointThread == null) {
- start = true;
- } else if (!checkpointThread.isAlive()) {
- start = true;
- LOG.info("KahaDB: Recovering checkpoint thread after death");
- }
- if (start) {
- checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
- @Override
- public void run() {
- try {
- long lastCleanup = System.currentTimeMillis();
- long lastCheckpoint = System.currentTimeMillis();
- // Sleep for a short time so we can periodically check
- // to see if we need to exit this thread.
- long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
- while (opened.get()) {
- Thread.sleep(sleepTime);
- long now = System.currentTimeMillis();
- if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
- checkpointCleanup(true);
- lastCleanup = now;
- lastCheckpoint = now;
- } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
- checkpointCleanup(false);
- lastCheckpoint = now;
- }
- }
- } catch (InterruptedException e) {
- // Looks like someone really wants us to exit this thread...
- } catch (IOException ioe) {
- LOG.error("Checkpoint failed", ioe);
- brokerService.handleIOException(ioe);
- }
- }
- };
-
- checkpointThread.setDaemon(true);
- checkpointThread.start();
- }
- }
- }
-
- /**
- * Called from the worker thread to start a checkpoint.
- *
- * This method ensure that the store is in an opened state and optionaly logs information
- * related to slow store access times.
- *
- * @param cleanup
- * Should a cleanup of the journal occur during the checkpoint operation.
- *
- * @throws IOException if an error occurs during the checkpoint operation.
- */
- protected void checkpointCleanup(final boolean cleanup) throws IOException {
- long start;
- this.indexLock.writeLock().lock();
- try {
- start = System.currentTimeMillis();
- if (!opened.get()) {
- return;
- }
- } finally {
- this.indexLock.writeLock().unlock();
- }
- checkpointUpdate(cleanup);
- long end = System.currentTimeMillis();
- if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
- LOG.info("Slow KahaDB access: cleanup took {}", (end - start));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java
deleted file mode 100644
index defb238..0000000
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBMetaData.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.store.kahadb;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.Page;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
-
-/**
- * Interface for the store meta data used to hold the index value and other needed
- * information to manage a KahaDB store instance.
- */
-public interface KahaDBMetaData<T> {
-
- /**
- * Indicates that this meta data instance has been opened and is active.
- */
- public static final int OPEN_STATE = 2;
-
- /**
- * Indicates that this meta data instance has been closed and is no longer active.
- */
- public static final int CLOSED_STATE = 1;
-
- /**
- * Gets the Page in the store PageFile where the KahaDBMetaData instance is stored.
- *
- * @return the Page to use to start access the KahaDBMetaData instance.
- */
- Page<T> getPage();
-
- /**
- * Sets the Page instance used to load and store the KahaDBMetaData instance.
- *
- * @param page
- * the new Page value to use.
- */
- void setPage(Page<T> page);
-
- /**
- * Gets the state flag of this meta data instance.
- *
- * @return the current state value for this instance.
- */
- int getState();
-
- /**
- * Sets the current value of the state flag.
- *
- * @param value
- * the new value to assign to the state flag.
- */
- void setState(int value);
-
- /**
- * Returns the Journal Location value that indicates that last recorded update
- * that was successfully performed for this KahaDB store implementation.
- *
- * @return the location of the last successful update location.
- */
- Location getLastUpdateLocation();
-
- /**
- * Updates the value of the last successful update.
- *
- * @param location
- * the new value to assign the last update location field.
- */
- void setLastUpdateLocation(Location location);
-
- /**
- * For a newly created KahaDBMetaData instance this method is called to allow
- * the instance to create all of it's internal indices and other state data.
- *
- * @param tx
- * the Transaction instance under which the operation is executed.
- *
- * @throws IOException if an error occurs while creating the meta data structures.
- */
- void initialize(Transaction tx) throws IOException;
-
- /**
- * Instructs this object to load its internal data structures from the KahaDB PageFile
- * and prepare itself for use.
- *
- * @param tx
- * the Transaction instance under which the operation is executed.
- *
- * @throws IOException if an error occurs while creating the meta data structures.
- */
- void load(Transaction tx) throws IOException;
-
- /**
- * Reads the serialized for of this object from the KadaDB PageFile and prepares it
- * for use. This method does not need to perform a full load of the meta data structures
- * only read in the information necessary to load them from the PageFile on a call to the
- * load method.
- *
- * @param in
- * the DataInput instance used to read this objects serialized form.
- *
- * @throws IOException if an error occurs while reading the serialized form.
- */
- void read(DataInput in) throws IOException;
-
- /**
- * Writes the object into a serialized form which can be read back in again using the
- * read method.
- *
- * @param out
- * the DataOutput instance to use to write the current state to a serialized form.
- *
- * @throws IOException if an error occurs while serializing this instance.
- */
- void write(DataOutput out) throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index 9b83a0e..e199d68 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -31,7 +31,6 @@ import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.PersistenceAdapterView;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
@@ -40,14 +39,7 @@ import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.JournaledStore;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.SharedFileLocker;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionIdTransformer;
-import org.apache.activemq.store.TransactionIdTransformerAware;
-import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.*;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
@@ -667,9 +659,4 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
getStore().setTransactionIdTransformer(transactionIdTransformer);
}
-
- @Override
- public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
- return this.letter.createJobSchedulerStore();
- }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 975cd05..60c0738 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -42,7 +42,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
@@ -56,14 +55,7 @@ import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.AbstractMessageStore;
-import org.apache.activemq.store.ListenableFuture;
-import org.apache.activemq.store.MessageRecoveryListener;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionIdTransformer;
-import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.*;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
@@ -74,7 +66,6 @@ import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper;
@@ -181,7 +172,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public int getMaxAsyncJobs() {
return this.maxAsyncJobs;
}
-
/**
* @param maxAsyncJobs
* the maxAsyncJobs to set
@@ -436,7 +426,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
- @Override
public void updateMessage(Message message) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter());
@@ -483,7 +472,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
indexLock.writeLock().lock();
try {
location = findMessageLocation(key, dest);
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
if (location == null) {
@@ -503,17 +492,19 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public Integer execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count
- // of messages in the destination.
+ // of
+ // messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
int rc = 0;
- for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
+ for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator
+ .hasNext();) {
iterator.next();
rc++;
}
return rc;
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
} finally {
@@ -534,7 +525,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return sd.locationIndex.isEmpty(tx);
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
}
@@ -561,11 +552,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
}
+
@Override
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
indexLock.writeLock().lock();
@@ -591,7 +583,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
sd.orderIndex.stoppedIterating();
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
}
@@ -636,7 +628,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
});
} catch (Exception e) {
LOG.error("Failed to reset batching",e);
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
}
@@ -649,7 +641,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
lockAsyncJobQueue();
// Hopefully one day the page file supports concurrent read
- // operations... but for now we must externally synchronize...
+ // operations... but for now we must
+ // externally synchronize...
indexLock.writeLock().lock();
try {
@@ -732,7 +725,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
@Override
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
- MessageId messageId, MessageAck ack) throws IOException {
+ MessageId messageId, MessageAck ack)
+ throws IOException {
String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString();
if (isConcurrentStoreAndDispatchTopics()) {
AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
@@ -816,7 +810,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
@@ -842,7 +836,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
.getSubscriptionInfo().newInput()));
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
}
@@ -865,7 +859,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
return (int) getStoredMessageCount(tx, sd, subscriptionKey);
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
}
@@ -896,7 +890,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
sd.orderIndex.resetCursorPosition();
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
}
@@ -949,7 +943,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
}
}
});
- } finally {
+ }finally {
indexLock.writeLock().unlock();
}
}
@@ -1364,6 +1358,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
LOG.warn("Failed to aquire lock", e);
}
}
+
}
@Override
@@ -1427,11 +1422,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
if (runnable instanceof StoreTask) {
((StoreTask)runnable).releaseLocks();
}
- }
- }
- @Override
- public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
- return new JobSchedulerStoreImpl();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index eca83e8..d10c4eb 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -16,44 +16,12 @@
*/
package org.apache.activemq.store.kahadb;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.transaction.xa.Xid;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerServiceAware;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.Lockable;
-import org.apache.activemq.broker.LockableServiceSupport;
-import org.apache.activemq.broker.Locker;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.broker.*;
+import org.apache.activemq.command.*;
import org.apache.activemq.filter.AnyDestination;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.filter.DestinationMapEntry;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.SharedFileLocker;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionIdTransformer;
-import org.apache.activemq.store.TransactionIdTransformerAware;
-import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.store.*;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
@@ -62,6 +30,13 @@ import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.*;
+
/**
* An implementation of {@link org.apache.activemq.store.PersistenceAdapter} that supports
* distribution of destinations across multiple kahaDB persistence adapters
@@ -75,7 +50,6 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
final int LOCAL_FORMAT_ID_MAGIC = Integer.valueOf(System.getProperty("org.apache.activemq.store.kahadb.MultiKahaDBTransactionStore.localXaFormatId", "61616"));
final class DelegateDestinationMap extends DestinationMap {
- @Override
public void setEntries(List<DestinationMapEntry> entries) {
super.setEntries(entries);
}
@@ -278,7 +252,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
}
if (adapter instanceof PersistenceAdapter) {
adapter.removeQueueMessageStore(destination);
- removeMessageStore(adapter, destination);
+ removeMessageStore((PersistenceAdapter)adapter, destination);
destinationMap.removeAll(destination);
}
}
@@ -293,7 +267,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
}
if (adapter instanceof PersistenceAdapter) {
adapter.removeTopicMessageStore(destination);
- removeMessageStore(adapter, destination);
+ removeMessageStore((PersistenceAdapter)adapter, destination);
destinationMap.removeAll(destination);
}
}
@@ -479,7 +453,6 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
}
}
- @Override
public BrokerService getBrokerService() {
return brokerService;
}
@@ -530,9 +503,4 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
locker.configure(this);
return locker;
}
-
- @Override
- public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
- return new JobSchedulerStoreImpl();
- }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index 8840a1d..c7ece83 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -31,24 +31,16 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.store.AbstractMessageStore;
-import org.apache.activemq.store.ListenableFuture;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.ProxyMessageStore;
-import org.apache.activemq.store.ProxyTopicMessageStore;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionRecoveryListener;
-import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.*;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
+import org.apache.activemq.util.IOHelper;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
-import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -194,7 +186,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
return inflightTransactions.remove(txid);
}
- @Override
public void prepare(TransactionId txid) throws IOException {
Tx tx = getTx(txid);
for (TransactionStore store : tx.getStores()) {
@@ -202,7 +193,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
}
- @Override
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
throws IOException {
@@ -257,7 +247,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
return location;
}
- @Override
public void rollback(TransactionId txid) throws IOException {
Tx tx = removeTx(txid);
if (tx != null) {
@@ -267,7 +256,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
}
- @Override
public void start() throws Exception {
journal = new Journal() {
@Override
@@ -301,7 +289,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
}
- @Override
public void stop() throws Exception {
journal.close();
journal = null;
@@ -347,7 +334,6 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
- @Override
public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
index 45e35c6..66ae496 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
@@ -22,13 +22,12 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
+import java.util.Map.Entry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
@@ -52,35 +51,31 @@ import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
-import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.store.kahadb.data.KahaLocation;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
-import org.apache.activemq.store.kahadb.disk.journal.Location;
-import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
public class TempKahaDBStore extends TempMessageDatabase implements PersistenceAdapter, BrokerServiceAware {
private final WireFormat wireFormat = new OpenWireFormat();
private BrokerService brokerService;
- @Override
public void setBrokerName(String brokerName) {
}
- @Override
public void setUsageManager(SystemUsage usageManager) {
}
- @Override
public TransactionStore createTransactionStore() throws IOException {
return new TransactionStore(){
-
- @Override
+
public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException {
if (preCommit != null) {
preCommit.run();
@@ -90,21 +85,18 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
postCommit.run();
}
}
- @Override
public void prepare(TransactionId txid) throws IOException {
- processPrepare(txid);
+ processPrepare(txid);
}
- @Override
public void rollback(TransactionId txid) throws IOException {
- processRollback(txid);
+ processRollback(txid);
}
- @Override
public void recover(TransactionRecoveryListener listener) throws IOException {
for (Map.Entry<TransactionId, ArrayList<Operation>> entry : preparedTransactions.entrySet()) {
XATransactionId xid = (XATransactionId)entry.getKey();
ArrayList<Message> messageList = new ArrayList<Message>();
ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
-
+
for (Operation op : entry.getValue()) {
if( op.getClass() == AddOpperation.class ) {
AddOpperation addOp = (AddOpperation)op;
@@ -116,7 +108,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
ackList.add(ack);
}
}
-
+
Message[] addedMessages = new Message[messageList.size()];
MessageAck[] acks = new MessageAck[ackList.size()];
messageList.toArray(addedMessages);
@@ -124,10 +116,8 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
listener.recover(xid, addedMessages, acks);
}
}
- @Override
public void start() throws Exception {
}
- @Override
public void stop() throws Exception {
}
};
@@ -146,15 +136,13 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
return destination;
}
- @Override
public void addMessage(ConnectionContext context, Message message) throws IOException {
KahaAddMessageCommand command = new KahaAddMessageCommand();
command.setDestination(dest);
command.setMessageId(message.getMessageId().toProducerKey());
processAdd(command, message.getTransactionId(), wireFormat.marshal(message));
}
-
- @Override
+
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
@@ -162,23 +150,20 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
processRemove(command, ack.getTransactionId());
}
- @Override
public void removeAllMessages(ConnectionContext context) throws IOException {
KahaRemoveDestinationCommand command = new KahaRemoveDestinationCommand();
command.setDestination(dest);
process(command);
}
- @Override
public Message getMessage(MessageId identity) throws IOException {
final String key = identity.toProducerKey();
-
+
// Hopefully one day the page file supports concurrent read operations... but for now we must
// externally synchronize...
ByteSequence data;
synchronized(indexMutex) {
data = pageFile.tx().execute(new Transaction.CallableClosure<ByteSequence, IOException>(){
- @Override
public ByteSequence execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
Long sequence = sd.messageIdIndex.get(tx, key);
@@ -192,16 +177,14 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
if( data == null ) {
return null;
}
-
+
Message msg = (Message)wireFormat.unmarshal( data );
- return msg;
+ return msg;
}
-
- @Override
+
public int getMessageCount() throws IOException {
synchronized(indexMutex) {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
- @Override
public Integer execute(Transaction tx) throws IOException {
// Iterate through all index entries to get a count of messages in the destination.
StoredDestination sd = getStoredDestination(dest, tx);
@@ -216,11 +199,9 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
}
- @Override
public void recover(final MessageRecoveryListener listener) throws Exception {
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<Exception>(){
- @Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
@@ -233,12 +214,10 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
long cursorPos=0;
-
- @Override
+
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<Exception>(){
- @Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageRecord> entry=null;
@@ -259,22 +238,20 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
}
- @Override
public void resetBatching() {
cursorPos=0;
}
-
+
@Override
public void setBatch(MessageId identity) throws IOException {
final String key = identity.toProducerKey();
-
+
// Hopefully one day the page file supports concurrent read operations... but for now we must
// externally synchronize...
Long location;
synchronized(indexMutex) {
location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>(){
- @Override
public Long execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
return sd.messageIdIndex.get(tx, key);
@@ -284,7 +261,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
if( location!=null ) {
cursorPos=location+1;
}
-
+
}
@Override
@@ -296,15 +273,14 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
@Override
public void stop() throws Exception {
}
-
+
}
-
+
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
public KahaDBTopicMessageStore(ActiveMQTopic destination) {
super(destination);
}
-
- @Override
+
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
MessageId messageId, MessageAck ack) throws IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
@@ -318,7 +294,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
processRemove(command, null);
}
- @Override
public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
String subscriptionKey = subscriptionKey(subscriptionInfo.getClientId(), subscriptionInfo.getSubscriptionName());
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
@@ -330,7 +305,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
process(command);
}
- @Override
public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
KahaSubscriptionCommand command = new KahaSubscriptionCommand();
command.setDestination(dest);
@@ -338,13 +312,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
process(command);
}
- @Override
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
-
+
final ArrayList<SubscriptionInfo> subscriptions = new ArrayList<SubscriptionInfo>();
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>(){
- @Override
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions.iterator(tx); iterator.hasNext();) {
@@ -356,18 +328,16 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
});
}
-
+
SubscriptionInfo[]rc=new SubscriptionInfo[subscriptions.size()];
subscriptions.toArray(rc);
return rc;
}
- @Override
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized(indexMutex) {
return pageFile.tx().execute(new Transaction.CallableClosure<SubscriptionInfo, IOException>(){
- @Override
public SubscriptionInfo execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey);
@@ -379,13 +349,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
});
}
}
-
- @Override
+
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized(indexMutex) {
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
- @Override
public Integer execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
@@ -394,7 +362,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
return 0;
}
cursorPos += 1;
-
+
int counter = 0;
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
iterator.next();
@@ -403,20 +371,18 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
return counter;
}
});
- }
+ }
}
- @Override
public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<Exception>(){
- @Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
cursorPos += 1;
-
+
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
Entry<Long, MessageRecord> entry = iterator.next();
listener.recoverMessage( (Message) wireFormat.unmarshal(entry.getValue().data ) );
@@ -426,12 +392,10 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
}
- @Override
public void recoverNextMessages(String clientId, String subscriptionName, final int maxReturned, final MessageRecoveryListener listener) throws Exception {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<Exception>(){
- @Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
@@ -439,7 +403,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
cursorPos += 1;
}
-
+
Entry<Long, MessageRecord> entry=null;
int counter = 0;
for (Iterator<Entry<Long, MessageRecord>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
@@ -458,13 +422,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
}
- @Override
public void resetBatching(String clientId, String subscriptionName) {
try {
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>(){
- @Override
public void execute(Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(dest, tx);
sd.subscriptionCursors.remove(subscriptionKey);
@@ -480,13 +442,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
String subscriptionKey(String clientId, String subscriptionName){
return clientId+":"+subscriptionName;
}
-
- @Override
+
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
return new KahaDBMessageStore(destination);
}
- @Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
return new KahaDBTopicMessageStore(destination);
}
@@ -497,7 +457,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
*
* @param destination Destination to forget
*/
- @Override
public void removeQueueMessageStore(ActiveMQQueue destination) {
}
@@ -507,22 +466,18 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
*
* @param destination Destination to forget
*/
- @Override
public void removeTopicMessageStore(ActiveMQTopic destination) {
}
- @Override
public void deleteAllMessages() throws IOException {
}
-
-
- @Override
+
+
public Set<ActiveMQDestination> getDestinations() {
try {
final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
synchronized(indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>(){
- @Override
public void execute(Transaction tx) throws IOException {
for (Iterator<Entry<String, StoredDestination>> iterator = destinations.iterator(tx); iterator.hasNext();) {
Entry<String, StoredDestination> entry = iterator.next();
@@ -536,13 +491,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
throw new RuntimeException(e);
}
}
-
- @Override
+
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
-
- @Override
+
public long size() {
if ( !started.get() ) {
return 0;
@@ -554,36 +507,32 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
}
- @Override
public void beginTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
- @Override
public void commitTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
- @Override
public void rollbackTransaction(ConnectionContext context) throws IOException {
throw new IOException("Not yet implemented.");
}
-
- @Override
+
public void checkpoint(boolean sync) throws IOException {
- }
+ }
///////////////////////////////////////////////////////////////////
// Internal conversion methods.
///////////////////////////////////////////////////////////////////
+
-
-
+
KahaLocation convert(Location location) {
KahaLocation rc = new KahaLocation();
rc.setLogId(location.getDataFileId());
rc.setOffset(location.getOffset());
return rc;
}
-
+
KahaDestination convert(ActiveMQDestination dest) {
KahaDestination rc = new KahaDestination();
rc.setName(dest.getPhysicalName());
@@ -612,7 +561,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
int type = Integer.parseInt(dest.substring(0, p));
String name = dest.substring(p+1);
-
+
switch( KahaDestination.DestinationType.valueOf(type) ) {
case QUEUE:
return new ActiveMQQueue(name);
@@ -622,12 +571,11 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
return new ActiveMQTempQueue(name);
case TEMP_TOPIC:
return new ActiveMQTempTopic(name);
- default:
+ default:
throw new IllegalArgumentException("Not in the valid destination format");
}
}
-
- @Override
+
public long getLastProducerSequenceId(ProducerId id) {
return -1;
}
@@ -644,8 +592,4 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
}
super.load();
}
- @Override
- public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
- throw new UnsupportedOperationException();
- }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
index 43fc152..be4f2ff 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/Visitor.java
@@ -20,16 +20,11 @@ import java.io.IOException;
import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
-import org.apache.activemq.store.kahadb.data.KahaDestroySchedulerCommand;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
-import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
@@ -67,21 +62,6 @@ public class Visitor {
public void visit(KahaAckMessageFileMapCommand kahaProducerAuditCommand) throws IOException {
}
- public void visit(KahaAddScheduledJobCommand kahaAddScheduledJobCommand) throws IOException {
- }
-
- public void visit(KahaRescheduleJobCommand KahaRescheduleJobCommand) throws IOException {
- }
-
- public void visit(KahaRemoveScheduledJobCommand kahaRemoveScheduledJobCommand) throws IOException {
- }
-
- public void visit(KahaRemoveScheduledJobsCommand kahaRemoveScheduledJobsCommand) throws IOException {
- }
-
- public void visit(KahaDestroySchedulerCommand KahaDestroySchedulerCommand) throws IOException {
- }
-
public void visit(KahaUpdateMessageCommand kahaUpdateMessageCommand) throws IOException {
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
index 217bc1f..86b9fa3 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java
@@ -25,8 +25,8 @@ public class JobImpl implements Job {
private final JobLocation jobLocation;
private final byte[] payload;
- protected JobImpl(JobLocation location, ByteSequence bs) {
- this.jobLocation = location;
+ protected JobImpl(JobLocation location,ByteSequence bs) {
+ this.jobLocation=location;
this.payload = new byte[bs.getLength()];
System.arraycopy(bs.getData(), bs.getOffset(), this.payload, 0, bs.getLength());
}
@@ -38,22 +38,22 @@ public class JobImpl implements Job {
@Override
public byte[] getPayload() {
- return this.payload;
+ return this.payload;
}
@Override
public long getPeriod() {
- return this.jobLocation.getPeriod();
+ return this.jobLocation.getPeriod();
}
@Override
public int getRepeat() {
- return this.jobLocation.getRepeat();
+ return this.jobLocation.getRepeat();
}
@Override
public long getStart() {
- return this.jobLocation.getStartTime();
+ return this.jobLocation.getStartTime();
}
@Override
@@ -76,13 +76,4 @@ public class JobImpl implements Job {
return JobSupport.getDateTime(getStart());
}
- @Override
- public int getExecutionCount() {
- return this.jobLocation.getRescheduledCount();
- }
-
- @Override
- public String toString() {
- return "Job: " + getJobId();
- }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3424e04f/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
index cb66145..13cf376 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java
@@ -36,8 +36,6 @@ class JobLocation {
private long period;
private String cronEntry;
private final Location location;
- private int rescheduledCount;
- private Location lastUpdate;
public JobLocation(Location location) {
this.location = location;
@@ -54,12 +52,8 @@ class JobLocation {
this.delay = in.readLong();
this.nextTime = in.readLong();
this.period = in.readLong();
- this.cronEntry = in.readUTF();
+ this.cronEntry=in.readUTF();
this.location.readExternal(in);
- if (in.readBoolean()) {
- this.lastUpdate = new Location();
- this.lastUpdate.readExternal(in);
- }
}
public void writeExternal(DataOutput out) throws IOException {
@@ -69,17 +63,11 @@ class JobLocation {
out.writeLong(this.delay);
out.writeLong(this.nextTime);
out.writeLong(this.period);
- if (this.cronEntry == null) {
- this.cronEntry = "";
+ if (this.cronEntry==null) {
+ this.cronEntry="";
}
out.writeUTF(this.cronEntry);
this.location.writeExternal(out);
- if (lastUpdate != null) {
- out.writeBoolean(true);
- this.lastUpdate.writeExternal(out);
- } else {
- out.writeBoolean(false);
- }
}
/**
@@ -135,8 +123,7 @@ class JobLocation {
}
/**
- * @param nextTime
- * the nextTime to set
+ * @param nextTime the nextTime to set
*/
public synchronized void setNextTime(long nextTime) {
this.nextTime = nextTime;
@@ -165,8 +152,7 @@ class JobLocation {
}
/**
- * @param cronEntry
- * the cronEntry to set
+ * @param cronEntry the cronEntry to set
*/
public synchronized void setCronEntry(String cronEntry) {
this.cronEntry = cronEntry;
@@ -187,8 +173,7 @@ class JobLocation {
}
/**
- * @param delay
- * the delay to set
+ * @param delay the delay to set
*/
public void setDelay(long delay) {
this.delay = delay;
@@ -201,55 +186,15 @@ class JobLocation {
return this.location;
}
- /**
- * @returns the location in the journal of the last update issued for this
- * Job.
- */
- public Location getLastUpdate() {
- return this.lastUpdate;
- }
-
- /**
- * Sets the location of the last update command written to the journal for
- * this Job. The update commands set the next execution time for this job.
- * We need to keep track of only the latest update as it's the only one we
- * really need to recover the correct state from the journal.
- *
- * @param location
- * The location in the journal of the last update command.
- */
- public void setLastUpdate(Location location) {
- this.lastUpdate = location;
- }
-
- /**
- * @return the number of time this job has been rescheduled.
- */
- public int getRescheduledCount() {
- return rescheduledCount;
- }
-
- /**
- * Sets the number of time this job has been rescheduled. A newly added job will return
- * zero and increment this value each time a scheduled message is dispatched to its
- * target destination and the job is rescheduled for another cycle.
- *
- * @param executionCount
- * the new execution count to assign the JobLocation.
- */
- public void setRescheduledCount(int rescheduledCount) {
- this.rescheduledCount = rescheduledCount;
- }
-
@Override
public String toString() {
- return "Job [id=" + jobId + ", startTime=" + new Date(startTime) + ", delay=" + delay + ", period=" + period + ", repeat=" + repeat + ", nextTime="
- + new Date(nextTime) + ", executionCount = " + (rescheduledCount + 1) + "]";
+ return "Job [id=" + jobId + ", startTime=" + new Date(startTime)
+ + ", delay=" + delay + ", period=" + period + ", repeat="
+ + repeat + ", nextTime=" + new Date(nextTime) + "]";
}
static class JobLocationMarshaller extends VariableMarshaller<List<JobLocation>> {
static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller();
-
@Override
public List<JobLocation> readPayload(DataInput dataIn) throws IOException {
List<JobLocation> result = new ArrayList<JobLocation>();
@@ -283,7 +228,6 @@ class JobLocation {
result = prime * result + (int) (period ^ (period >>> 32));
result = prime * result + repeat;
result = prime * result + (int) (startTime ^ (startTime >>> 32));
- result = prime * result + (rescheduledCount ^ (rescheduledCount >>> 32));
return result;
}
@@ -342,9 +286,6 @@ class JobLocation {
if (startTime != other.startTime) {
return false;
}
- if (rescheduledCount != other.rescheduledCount) {
- return false;
- }
return true;
}