You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/01/13 10:20:24 UTC
svn commit: r898689 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb:
KahaDBPersistenceAdapter.java MessageDatabase.java
Author: rajdavies
Date: Wed Jan 13 09:20:24 2010
New Revision: 898689
URL: http://svn.apache.org/viewvc?rev=898689&view=rev
Log:
expose the Journal property archiveDataLogs through KahaDB
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=898689&r1=898688&r2=898689&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Wed Jan 13 09:20:24 2010
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.store.kahadb;
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@@ -28,9 +31,6 @@
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.usage.SystemUsage;
-import java.io.File;
-import java.io.IOException;
-import java.util.Set;
/**
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with some
@@ -40,7 +40,7 @@
* @version $Revision: 1.17 $
*/
public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
- private KahaDBStore letter = new KahaDBStore();
+ private final KahaDBStore letter = new KahaDBStore();
/**
@@ -370,4 +370,19 @@
public void setBrokerService(BrokerService brokerService) {
letter.setBrokerService(brokerService);
}
+
+
+ /**
+ * @return the archiveDataLogs
+ */
+ public boolean isArchiveDataLogs() {
+ return letter.isArchiveDataLogs();
+ }
+
+ /**
+ * @param archiveDataLogs the archiveDataLogs to set
+ */
+ public void setArchiveDataLogs(boolean archiveDataLogs) {
+ letter.setArchiveDataLogs(archiveDataLogs);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=898689&r1=898688&r2=898689&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Jan 13 09:20:24 2010
@@ -35,7 +35,6 @@
import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.ConnectionId;
@@ -77,7 +76,6 @@
import org.apache.kahadb.util.SequenceSet;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
-import org.springframework.core.enums.LetterCodedLabeledEnum;
public class MessageDatabase implements BrokerServiceAware {
@@ -159,6 +157,7 @@
protected File directory;
protected Thread checkpointThread;
protected boolean enableJournalDiskSyncs=true;
+ protected boolean archiveDataLogs;
long checkpointInterval = 5*1000;
long cleanupInterval = 30*1000;
int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
@@ -173,6 +172,7 @@
private int indexCacheSize = 100;
private boolean checkForCorruptJournalFiles = false;
private boolean checksumJournalFiles = false;
+
public MessageDatabase() {
}
@@ -234,6 +234,7 @@
private void startCheckpoint() {
checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
+ @Override
public void run() {
try {
long lastCleanup = System.currentTimeMillis();
@@ -510,6 +511,7 @@
final ArrayList<Long> matches = new ArrayList<Long>();
sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
+ @Override
protected void matched(Location key, Long value) {
matches.add(value);
}
@@ -1375,6 +1377,7 @@
this.command = command;
}
+ @Override
public void execute(Transaction tx) throws IOException {
upadateIndex(tx, command, location);
}
@@ -1392,6 +1395,7 @@
this.command = command;
}
+ @Override
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
}
@@ -1420,6 +1424,7 @@
manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
+ manager.setArchiveDataLogs(isArchiveDataLogs());
return manager;
}
@@ -1552,4 +1557,18 @@
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
+
+ /**
+ * @return the archiveDataLogs
+ */
+ public boolean isArchiveDataLogs() {
+ return this.archiveDataLogs;
+ }
+
+ /**
+ * @param archiveDataLogs the archiveDataLogs to set
+ */
+ public void setArchiveDataLogs(boolean archiveDataLogs) {
+ this.archiveDataLogs = archiveDataLogs;
+ }
}