You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/25 23:11:21 UTC
svn commit: r758450 [2/2] - in /activemq/sandbox/activemq-flow/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/openwire/
main/java/org/apache/activemq/broker/protocol/
main/java/org/apache/activemq/broker/stomp/ main/jav...
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Wed Mar 25 22:11:15 2009
@@ -16,73 +16,36 @@
*/
package org.apache.activemq.broker.store.kahadb;
-import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Set;
import java.util.SortedSet;
-import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.kahadb.Operation.AddOpperation;
-import org.apache.activemq.broker.store.kahadb.Operation.RemoveOpperation;
-import org.apache.activemq.broker.store.kahadb.StoredDBState.DBStateMarshaller;
-import org.apache.activemq.broker.store.kahadb.StoredDestinationState.StoredDestinationMarshaller;
-import org.apache.activemq.broker.store.kahadb.data.KahaAddMessageCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaCommitCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaDestination;
-import org.apache.activemq.broker.store.kahadb.data.KahaEntryType;
-import org.apache.activemq.broker.store.kahadb.data.KahaLocalTransactionId;
-import org.apache.activemq.broker.store.kahadb.data.KahaLocation;
-import org.apache.activemq.broker.store.kahadb.data.KahaPrepareCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaRemoveDestinationCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaRemoveMessageCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaRollbackCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaSubscriptionCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaTraceCommand;
-import org.apache.activemq.broker.store.kahadb.data.KahaTransactionInfo;
-import org.apache.activemq.broker.store.kahadb.data.KahaXATransactionId;
-import org.apache.activemq.broker.store.kahadb.data.KahaDestination.DestinationType;
-import org.apache.activemq.broker.store.kahadb.data.KahaDestination.KahaDestinationBean;
-import org.apache.activemq.broker.store.kahadb.data.KahaEntryType.KahaEntryTypeCreatable;
-import org.apache.activemq.broker.store.kahadb.data.KahaLocalTransactionId.KahaLocalTransactionIdBean;
-import org.apache.activemq.broker.store.kahadb.data.KahaLocation.KahaLocationBean;
-import org.apache.activemq.broker.store.kahadb.data.KahaTraceCommand.KahaTraceCommandBean;
-import org.apache.activemq.broker.store.kahadb.data.KahaTransactionInfo.KahaTransactionInfoBean;
-import org.apache.activemq.broker.store.kahadb.data.KahaXATransactionId.KahaXATransactionIdBean;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTempQueue;
-import org.apache.activemq.command.ActiveMQTempTopic;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
+import org.apache.activemq.broker.store.kahadb.Data.QueueAdd;
+import org.apache.activemq.broker.store.kahadb.Data.QueueRemoveMessage;
+import org.apache.activemq.broker.store.kahadb.Data.Trace;
+import org.apache.activemq.broker.store.kahadb.Data.Type;
+import org.apache.activemq.broker.store.kahadb.Data.MessageAdd.MessageAddBean;
+import org.apache.activemq.broker.store.kahadb.Data.QueueAdd.QueueAddBean;
+import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage.QueueAddMessageBean;
+import org.apache.activemq.broker.store.kahadb.Data.QueueRemove.QueueRemoveBean;
+import org.apache.activemq.broker.store.kahadb.Data.QueueRemoveMessage.QueueRemoveMessageBean;
+import org.apache.activemq.broker.store.kahadb.Data.Type.TypeCreatable;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.protobuf.MessageBuffer;
import org.apache.activemq.protobuf.PBMessage;
-import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.TransactionStore;
-import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.index.BTreeVisitor;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
@@ -93,8 +56,6 @@
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.LockFile;
-import org.apache.kahadb.util.LongMarshaller;
-import org.apache.kahadb.util.StringMarshaller;
public class KahaDBStore implements Store {
@@ -106,11 +67,9 @@
protected PageFile pageFile;
protected Journal journal;
- protected StoredDBState dbstate = new StoredDBState(this);
- protected DBStateMarshaller dbstateMarshaller = new DBStateMarshaller(this);
+ protected StoredDBState dbstate = new StoredDBState();
protected boolean failIfDatabaseIsLocked;
-
protected boolean deleteAllMessages;
protected File directory;
protected Thread checkpointThread;
@@ -124,192 +83,16 @@
protected AtomicBoolean started = new AtomicBoolean();
protected AtomicBoolean opened = new AtomicBoolean();
private LockFile lockFile;
- WireFormat wireFormat = new OpenWireFormat();
-
- public TransactionStore createTransactionStore() throws IOException {
- return new KahaDBTransactionStore(this);
- }
-
- String subscriptionKey(String clientId, String subscriptionName){
- return clientId+":"+subscriptionName;
- }
-
- public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
- return new KahaDBMessageStore(this, destination);
- }
-
- public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
- return new KahaDBTopicMessageStore(this, destination);
- }
-
- /**
- * Cleanup method to remove any state associated with the given destination.
- * This method does not stop the message store (it might not be cached).
- *
- * @param destination Destination to forget
- */
- public void removeQueueMessageStore(ActiveMQQueue destination) {
- }
-
- /**
- * Cleanup method to remove any state associated with the given destination
- * This method does not stop the message store (it might not be cached).
- *
- * @param destination Destination to forget
- */
- public void removeTopicMessageStore(ActiveMQTopic destination) {
- }
-
- public void deleteAllMessages() throws IOException {
- deleteAllMessages=true;
- }
-
- public Set<ActiveMQDestination> getDestinations() {
- try {
- final HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
- synchronized(indexMutex) {
- pageFile.tx().execute(new Transaction.Closure<IOException>(){
- public void execute(Transaction tx) throws IOException {
- for (Iterator<Entry<String, StoredDestinationState>> iterator = dbstate.destinations.iterator(tx); iterator.hasNext();) {
- Entry<String, StoredDestinationState> entry = iterator.next();
- rc.add(convert(entry.getKey()));
- }
- }
- });
- }
- return rc;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public long getLastMessageBrokerSequenceId() throws IOException {
- return 0;
- }
-
- public long size() {
- if ( !started.get() ) {
- return 0;
- }
- try {
- return journal.getDiskSize() + pageFile.getDiskSize();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public void beginTransaction() throws IOException {
- throw new IOException("Not yet implemented.");
- }
- public void commitTransaction() throws IOException {
- throw new IOException("Not yet implemented.");
- }
- public void rollbackTransaction() throws IOException {
- throw new IOException("Not yet implemented.");
- }
-
- public void checkpoint(boolean sync) throws IOException {
- checkpointCleanup(false);
- }
-
- ///////////////////////////////////////////////////////////////////
- // Internal helper methods.
- ///////////////////////////////////////////////////////////////////
+ private Location nextRecoveryPosition;
+ private Location lastRecoveryPosition;
- /**
- * @param location
- * @return
- * @throws IOException
- */
- Message loadMessage(Location location) throws IOException {
- KahaAddMessageCommand addMessage = (KahaAddMessageCommand)load(location);
- Message msg = (Message)wireFormat.unmarshal( new DataInputStream(addMessage.getMessage().newInput()) );
- return msg;
- }
+ protected final Object indexMutex = new Object();
+ private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
+ private final HashMap<AsciiBuffer, StoredDestinationState> storedDestinations = new HashMap<AsciiBuffer, StoredDestinationState>();
///////////////////////////////////////////////////////////////////
- // Internal conversion methods.
+ // Lifecycle methods
///////////////////////////////////////////////////////////////////
-
- KahaTransactionInfo createTransactionInfo(TransactionId txid) {
- if( txid ==null ) {
- return null;
- }
- KahaTransactionInfoBean rc = new KahaTransactionInfoBean();
-
- // Link it up to the previous record that was part of the transaction.
- ArrayList<Operation> tx = inflightTransactions.get(txid);
- if( tx!=null ) {
- rc.setPreviousEntry(convert(tx.get(tx.size()-1).location));
- }
-
- if( txid.isLocalTransaction() ) {
- LocalTransactionId t = (LocalTransactionId)txid;
- KahaLocalTransactionIdBean kahaTxId = new KahaLocalTransactionIdBean();
- kahaTxId.setConnectionId(t.getConnectionId().getValue());
- kahaTxId.setTransacitonId(t.getValue());
- rc.setLocalTransacitonId(kahaTxId);
- } else {
- XATransactionId t = (XATransactionId)txid;
- KahaXATransactionIdBean kahaTxId = new KahaXATransactionIdBean();
- kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
- kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
- kahaTxId.setFormatId(t.getFormatId());
- rc.setXaTransacitonId(kahaTxId);
- }
- return rc;
- }
-
- KahaLocation convert(Location location) {
- KahaLocationBean rc = new KahaLocationBean();
- rc.setLogId(location.getDataFileId());
- rc.setOffset(location.getOffset());
- return rc;
- }
-
- KahaDestination convert(ActiveMQDestination dest) {
- KahaDestinationBean rc = new KahaDestinationBean();
- rc.setName(dest.getPhysicalName());
- switch( dest.getDestinationType() ) {
- case ActiveMQDestination.QUEUE_TYPE:
- rc.setType(DestinationType.QUEUE);
- return rc;
- case ActiveMQDestination.TOPIC_TYPE:
- rc.setType(DestinationType.TOPIC);
- return rc;
- case ActiveMQDestination.TEMP_QUEUE_TYPE:
- rc.setType(DestinationType.TEMP_QUEUE);
- return rc;
- case ActiveMQDestination.TEMP_TOPIC_TYPE:
- rc.setType(DestinationType.TEMP_TOPIC);
- return rc;
- default:
- return null;
- }
- }
-
- ActiveMQDestination convert(String dest) {
- int p = dest.indexOf(":");
- if( p<0 ) {
- throw new IllegalArgumentException("Not in the valid destination format");
- }
- int type = Integer.parseInt(dest.substring(0, p));
- String name = dest.substring(p+1);
-
- switch( KahaDestination.DestinationType.valueOf(type) ) {
- case QUEUE:
- return new ActiveMQQueue(name);
- case TOPIC:
- return new ActiveMQTopic(name);
- case TEMP_QUEUE:
- return new ActiveMQTempQueue(name);
- case TEMP_TOPIC:
- return new ActiveMQTempTopic(name);
- default:
- throw new IllegalArgumentException("Not in the valid destination format");
- }
- }
-
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
load();
@@ -329,35 +112,24 @@
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
if (pageFile.getPageCount() == 0) {
- // First time this is created.. Initialize the metadata
- Page<StoredDBState> page = tx.allocate();
- assert page.getPageId() == 0;
- page.set(dbstate);
- dbstate.page = page;
- dbstate.state = CLOSED_STATE;
- dbstate.destinations = new BTreeIndex<String, StoredDestinationState>(pageFile, tx.allocate().getPageId());
-
- tx.store(dbstate.page, dbstateMarshaller, true);
+ dbstate.allocate(tx);
} else {
- Page<StoredDBState> page = tx.load(0, dbstateMarshaller);
+ Page<StoredDBState> page = tx.load(0, StoredDBState.MARSHALLER);
dbstate = page.get();
dbstate.page = page;
}
- dbstate.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
- dbstate.destinations.setValueMarshaller(new StoredDestinationMarshaller(KahaDBStore.this));
- dbstate.destinations.load(tx);
+ dbstate.load(tx);
}
});
pageFile.flush();
-
- // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
- // Perhaps we should just keep an index of file
+
+ // Keep a cache of the StoredDestinations
storedDestinations.clear();
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- for (Iterator<Entry<String, StoredDestinationState>> iterator = dbstate.destinations.iterator(tx); iterator.hasNext();) {
- Entry<String, StoredDestinationState> entry = iterator.next();
- StoredDestinationState sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
+ for (Iterator<Entry<AsciiBuffer, StoredDestinationState>> iterator = dbstate.destinations.iterator(tx); iterator.hasNext();) {
+ Entry<AsciiBuffer, StoredDestinationState> entry = iterator.next();
+ StoredDestinationState sd = loadStoredDestination(tx, entry.getKey());
storedDestinations.put(entry.getKey(), sd);
}
}
@@ -365,6 +137,20 @@
}
}
+
+ private StoredDestinationState loadStoredDestination(Transaction tx, AsciiBuffer key) throws IOException {
+ // Try to load the existing indexes..
+ StoredDestinationState rc = dbstate.destinations.get(tx, key);
+ if (rc == null) {
+ // Brand new destination.. allocate indexes for it.
+ rc = new StoredDestinationState();
+ rc.allocate(tx);
+ dbstate.destinations.put(tx, key, rc);
+ }
+ rc.load(tx);
+ return rc;
+ }
+
/**
* @throws IOException
*/
@@ -434,25 +220,23 @@
pageFile.unload();
pageFile.delete();
- dbstate = new StoredDBState(this);
+ dbstate = new StoredDBState();
LOG.info("Persistence store purged.");
deleteAllMessages = false;
loadPageFile();
}
- store(new KahaTraceCommandBean().setMessage("LOADED " + new Date()));
-
+ store( new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())));
}
}
-
public void close() throws IOException, InterruptedException {
if( opened.compareAndSet(true, false)) {
synchronized (indexMutex) {
pageFile.unload();
- dbstate = new StoredDBState(this);
+ dbstate = new StoredDBState();
}
journal.close();
checkpointThread.join();
@@ -465,11 +249,9 @@
synchronized (indexMutex) {
if( pageFile.isLoaded() ) {
dbstate.state = CLOSED_STATE;
- dbstate.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
-
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- tx.store(dbstate.page, dbstateMarshaller, true);
+ tx.store(dbstate.page, StoredDBState.MARSHALLER, true);
}
});
close();
@@ -477,22 +259,9 @@
}
}
- /**
- * @return
- */
- private Location getFirstInProgressTxLocation() {
- Location l = null;
- if (!inflightTransactions.isEmpty()) {
- l = inflightTransactions.values().iterator().next().get(0).getLocation();
- }
- if (!preparedTransactions.isEmpty()) {
- Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
- if (l==null || t.compareTo(l) <= 0) {
- l = t;
- }
- }
- return l;
- }
+ ///////////////////////////////////////////////////////////////////
+ // Recovery methods
+ ///////////////////////////////////////////////////////////////////
/**
* Move all the messages that were in the journal into long term storage. We
@@ -511,9 +280,16 @@
if( recoveryPosition!=null ) {
int redoCounter = 0;
while (recoveryPosition != null) {
- KahaEntryTypeCreatable message = load(recoveryPosition);
+ final TypeCreatable message = load(recoveryPosition);
+ final Location location = lastRecoveryPosition;
dbstate.lastUpdate = recoveryPosition;
- process(message, recoveryPosition);
+
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ updateIndex(tx, message.toType(), (MessageBuffer)message, location);
+ }
+ });
+
redoCounter++;
recoveryPosition = journal.getNextLocation(recoveryPosition);
}
@@ -530,34 +306,61 @@
}
}
+ public void incrementalRecover() throws IOException {
+ synchronized (indexMutex) {
+ if( nextRecoveryPosition == null ) {
+ if( lastRecoveryPosition==null ) {
+ nextRecoveryPosition = getRecoveryPosition();
+ } else {
+ nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+ }
+ }
+ while (nextRecoveryPosition != null) {
+ lastRecoveryPosition = nextRecoveryPosition;
+ dbstate.lastUpdate = lastRecoveryPosition;
+ final TypeCreatable message = load(lastRecoveryPosition);
+ final Location location = lastRecoveryPosition;
+
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ updateIndex(tx, message.toType(), (MessageBuffer)message, location);
+ }
+ });
+
+ nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+ }
+ }
+ }
+
protected void recoverIndex(Transaction tx) throws IOException {
long start = System.currentTimeMillis();
// It is possible index updates got applied before the journal updates..
// in that case we need to remove references to messages that are not in the journal
final Location lastAppendLocation = journal.getLastAppendLocation();
long undoCounter=0;
-
- // Go through all the destinations to see if they have messages past the lastAppendLocation
- for (StoredDestinationState sd : storedDestinations.values()) {
-
- final ArrayList<Long> matches = new ArrayList<Long>();
- // Find all the Locations that are >= than the last Append Location.
- sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
- @Override
- protected void matched(Location key, Long value) {
- matches.add(value);
- }
- });
-
-
- for (Long sequenceId : matches) {
- MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
- sd.locationIndex.remove(tx, keys.location);
- sd.messageIdIndex.remove(tx, keys.messageId);
- undoCounter++;
- // TODO: do we need to modify the ack positions for the pub sub case?
- }
- }
+
+// TODO
+// // Go through all the destinations to see if they have messages past the lastAppendLocation
+// for (StoredDestinationState sd : storedDestinations.values()) {
+//
+// final ArrayList<Long> matches = new ArrayList<Long>();
+// // Find all the Locations that are >= than the last Append Location.
+// sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
+// @Override
+// protected void matched(Location key, Long value) {
+// matches.add(value);
+// }
+// });
+//
+//
+// for (Long sequenceId : matches) {
+// MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+// sd.locationIndex.remove(tx, keys.location);
+// sd.messageIdIndex.remove(tx, keys.messageId);
+// undoCounter++;
+// // TODO: do we need to modify the ack positions for the pub sub case?
+// }
+// }
long end = System.currentTimeMillis();
if( undoCounter > 0 ) {
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
@@ -565,28 +368,6 @@
LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds.");
}
}
-
- private Location nextRecoveryPosition;
- private Location lastRecoveryPosition;
-
- public void incrementalRecover() throws IOException {
- synchronized (indexMutex) {
- if( nextRecoveryPosition == null ) {
- if( lastRecoveryPosition==null ) {
- nextRecoveryPosition = getRecoveryPosition();
- } else {
- nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
- }
- }
- while (nextRecoveryPosition != null) {
- lastRecoveryPosition = nextRecoveryPosition;
- dbstate.lastUpdate = lastRecoveryPosition;
- KahaEntryTypeCreatable message = load(lastRecoveryPosition);
- process(message, lastRecoveryPosition);
- nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
- }
- }
- }
public Location getLastUpdatePosition() throws IOException {
return dbstate.lastUpdate;
@@ -594,12 +375,6 @@
private Location getRecoveryPosition() throws IOException {
- // If we need to recover the transactions..
- if (dbstate.firstInProgressTransactionLocation != null) {
- return dbstate.firstInProgressTransactionLocation;
- }
-
- // Perhaps there were no transactions...
if( dbstate.lastUpdate!=null) {
// Start replay at the record after the last one recorded in the index file.
return journal.getNextLocation(dbstate.lastUpdate);
@@ -642,602 +417,370 @@
closure.execute();
}
}
+
+ /**
+ * @param tx
+ * @throws IOException
+ */
+ private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
+
+ LOG.debug("Checkpoint started.");
+
+ dbstate.state = OPEN_STATE;
+ tx.store(dbstate.page, StoredDBState.MARSHALLER, true);
+ pageFile.flush();
+
+ if( cleanup ) {
+
+ final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(journal.getFileMap().keySet());
+
+ // Don't GC files under replication
+ if( journalFilesBeingReplicated!=null ) {
+ gcCandidateSet.removeAll(journalFilesBeingReplicated);
+ }
+
+ // Don't GC files after the first in progress tx
+ Location firstTxLocation = dbstate.lastUpdate;
+
+ if( firstTxLocation!=null ) {
+ while( !gcCandidateSet.isEmpty() ) {
+ Integer last = gcCandidateSet.last();
+ if( last >= firstTxLocation.getDataFileId() ) {
+ gcCandidateSet.remove(last);
+ } else {
+ break;
+ }
+ }
+ }
+
+ // Go through all the destinations to see if any of them can remove GC candidates.
+ for (StoredDestinationState sd : storedDestinations.values()) {
+ if( gcCandidateSet.isEmpty() ) {
+ break;
+ }
+
+ // Use a visitor to cut down the number of pages that we load
+ dbstate.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
+ int last=-1;
+ public boolean isInterestedInKeysBetween(Location first, Location second) {
+ if( first==null ) {
+ SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
+ if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+ subset.remove(second.getDataFileId());
+ }
+ return !subset.isEmpty();
+ } else if( second==null ) {
+ SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
+ if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+ subset.remove(first.getDataFileId());
+ }
+ return !subset.isEmpty();
+ } else {
+ SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
+ if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+ subset.remove(first.getDataFileId());
+ }
+ if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+ subset.remove(second.getDataFileId());
+ }
+ return !subset.isEmpty();
+ }
+ }
+
+ public void visit(List<Location> keys, List<Long> values) {
+ for (Location l : keys) {
+ int fileId = l.getDataFileId();
+ if( last != fileId ) {
+ gcCandidateSet.remove(fileId);
+ last = fileId;
+ }
+ }
+ }
+
+ });
+ }
+
+ if( !gcCandidateSet.isEmpty() ) {
+ LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
+ journal.removeDataFiles(gcCandidateSet);
+ }
+ }
+
+ LOG.debug("Checkpoint done.");
+ }
+
+ public HashSet<Integer> getJournalFilesBeingReplicated() {
+ return journalFilesBeingReplicated;
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Store interface
+ ///////////////////////////////////////////////////////////////////
+ long messageSequence;
- // /////////////////////////////////////////////////////////////////
- // Methods call by the broker to update and query the store.
- // /////////////////////////////////////////////////////////////////
- public Location store(KahaEntryTypeCreatable data) throws IOException {
+ public Location store(TypeCreatable data) throws IOException {
return store(data, false);
}
/**
* All updates are funneled through this method. The updates are converted
- * to a JournalMessage which is logged to the journal and then the data from
- * the JournalMessage is used to update the index just like it would be done
- * durring a recovery process.
+ * to a PBMessage which is logged to the journal and then the data from
+ * the PBMessage is used to update the index just like it would be done
+ * during a recovery process.
+ * @throws IOException
*/
@SuppressWarnings("unchecked")
- public Location store(KahaEntryTypeCreatable data, boolean sync) throws IOException {
- MessageBuffer message = ((PBMessage) data).freeze();
+ public Location store(final TypeCreatable data, boolean sync) throws IOException {
+ final MessageBuffer message = ((PBMessage) data).freeze();
int size = message.serializedSizeUnframed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
- os.writeByte(data.toKahaEntryType().getNumber());
+ os.writeByte(data.toType().getNumber());
message.writeUnframed(os);
long start = System.currentTimeMillis();
- Location location = journal.write(os.toByteSequence(), sync);
+ final Location location = journal.write(os.toByteSequence(), sync);
long start2 = System.currentTimeMillis();
- process(data, location);
- long end = System.currentTimeMillis();
- if( end-start > 100 ) {
- LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
- }
+
+ synchronized (indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ updateIndex(tx, data.toType(), message, location);
+ }
+ });
+ }
+
+ long end = System.currentTimeMillis();
+ if( end-start > 100 ) {
+ LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
+ }
synchronized (indexMutex) {
- dbstate.lastUpdate = location;
+ dbstate.lastUpdate = location;
}
return location;
}
-
+
/**
- * Loads a previously stored JournalMessage
+ * Loads a previously stored PBMessage
*
* @param location
* @return
* @throws IOException
*/
@SuppressWarnings("unchecked")
- public KahaEntryTypeCreatable load(Location location) throws IOException {
+ public TypeCreatable load(Location location) throws IOException {
ByteSequence data = journal.read(location);
DataByteArrayInputStream is = new DataByteArrayInputStream(data);
byte readByte = is.readByte();
- KahaEntryType type = KahaEntryType.valueOf(readByte);
+ Type type = Type.valueOf(readByte);
if( type == null ) {
throw new IOException("Could not load journal record. Invalid location: "+location);
}
MessageBuffer message = type.parseUnframed(new Buffer(data.data, data.offset+1, data.length-1));
- return (KahaEntryTypeCreatable)message;
+ return (TypeCreatable)message;
}
- // /////////////////////////////////////////////////////////////////
- // Journaled record processing methods. Once the record is journaled,
- // these methods handle applying the index updates. These may be called
- // from the recovery method too so they need to be idempotent
- // /////////////////////////////////////////////////////////////////
-
- private void process(KahaEntryTypeCreatable data, final Location location) throws IOException {
- switch(data.toKahaEntryType()) {
- case KAHA_ADD_MESSAGE_COMMAND:
- process((KahaAddMessageCommand)data, location);
- return;
- case KAHA_COMMIT_COMMAND:
- process((KahaCommitCommand)data, location);
- return;
- case KAHA_PREPARE_COMMAND:
- process((KahaPrepareCommand)data, location);
- return;
- case KAHA_REMOVE_DESTINATION_COMMAND:
- process((KahaRemoveDestinationCommand)data, location);
- return;
- case KAHA_REMOVE_MESSAGE_COMMAND:
- process((KahaRemoveMessageCommand)data, location);
+ @SuppressWarnings("unchecked")
+ public void updateIndex(Transaction tx, Type type, MessageBuffer message, Location location) {
+ switch (type) {
+ case MESSAGE_ADD:
+ messageAdd(tx, (MessageAdd)message, location);
return;
- case KAHA_ROLLBACK_COMMAND:
- process((KahaRollbackCommand)data, location);
+ case QUEUE_ADD:
+ queueAdd(tx, (QueueAdd)message, location);
return;
- case KAHA_SUBSCRIPTION_COMMAND:
- process((KahaSubscriptionCommand)data, location);
+ case QUEUE_ADD_MESSAGE:
+ queueAddMessage(tx, (QueueAdd)message, location);
return;
- case KAHA_TRACE_COMMAND:
- process((KahaTraceCommand)data, location);
+ case QUEUE_REMOVE_MESSAGE:
+ queueRemoveMessage(tx, (QueueRemoveMessage)message, location);
return;
+ case TRANSACTION_BEGIN:
+ case TRANSACTION_ADD_MESSAGE:
+ case TRANSACTION_REMOVE_MESSAGE:
+ case TRANSACTION_COMMIT:
+ case TRANSACTION_ROLLBACK:
+ case MAP_ADD:
+ case MAP_REMOVE:
+ case MAP_ENTRY_PUT:
+ case MAP_ENTRY_REMOVE:
+ case STREAM_OPEN:
+ case STREAM_WRITE:
+ case STREAM_CLOSE:
+ case STREAM_REMOVE:
+ throw new UnsupportedOperationException();
}
}
- private void process(final KahaAddMessageCommand command, final Location location) throws IOException {
- if (command.hasTransactionInfo()) {
- synchronized (indexMutex) {
- ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
- inflightTx.add(new AddOpperation(this, command, location));
- }
- } else {
- synchronized (indexMutex) {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- upadateIndex(tx, command, location);
- }
- });
- }
- }
+ private void messageAdd(Transaction tx, MessageAdd message, Location location) {
+ }
+ private void queueAdd(Transaction tx, QueueAdd message, Location location) {
+ }
+ private void queueAddMessage(Transaction tx, QueueAdd message, Location location) {
+ }
+ private void queueRemoveMessage(Transaction tx, QueueRemoveMessage message, Location location) {
}
- protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
- if (command.hasTransactionInfo()) {
- synchronized (indexMutex) {
- ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
- inflightTx.add(new RemoveOpperation(this, command, location));
- }
- } else {
- synchronized (indexMutex) {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- updateIndex(tx, command, location);
- }
- });
+ class KahaDBSession implements Session {
+
+ ///////////////////////////////////////////////////////////////
+ // Message related methods.
+ ///////////////////////////////////////////////////////////////
+ public Long messageAdd(MessageRecord message) {
+ try {
+ Long id = dbstate.nextMessageId++;
+ MessageAddBean bean = new MessageAddBean();
+ bean.setBuffer(message.getBuffer());
+ bean.setEncoding(message.getEncoding());
+ bean.setMessageId(message.getMessageId());
+ bean.setMessageKey(id);
+ bean.setStreamKey(message.getStreamKey());
+ store(bean);
+ return id;
+ } catch (IOException e) {
+ throw new FatalStoreException(e);
}
}
-
- }
-
- protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
- synchronized (indexMutex) {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- updateIndex(tx, command, location);
- }
- });
+ public Long messageGetKey(AsciiBuffer messageId) {
+ return null;
}
- }
-
- protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
- synchronized (indexMutex) {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- updateIndex(tx, command, location);
- }
- });
+ public MessageRecord messageGetRecord(Long key) {
+ return null;
}
- }
- protected void process(KahaCommitCommand command, Location location) throws IOException {
- TransactionId key = key(command.getTransactionInfo());
- synchronized (indexMutex) {
- ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
- if (inflightTx == null) {
- inflightTx = preparedTransactions.remove(key);
- }
- if (inflightTx == null) {
- return;
+ ///////////////////////////////////////////////////////////////
+ // Queue related methods.
+ ///////////////////////////////////////////////////////////////
+ public void queueAdd(AsciiBuffer queueName) {
+ try {
+ store(new QueueAddBean().setQueueName(queueName));
+ } catch (IOException e) {
+ throw new FatalStoreException(e);
}
-
- final ArrayList<Operation> messagingTx = inflightTx;
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- for (Operation op : messagingTx) {
- op.execute(tx);
- }
- }
- });
}
- }
-
- protected void process(KahaPrepareCommand command, Location location) {
- synchronized (indexMutex) {
- TransactionId key = key(command.getTransactionInfo());
- ArrayList<Operation> tx = inflightTransactions.remove(key);
- if (tx != null) {
- preparedTransactions.put(key, tx);
+ public boolean queueRemove(AsciiBuffer queueName) {
+ try {
+ store(new QueueRemoveBean().setQueueName(queueName));
+ return false;
+ } catch (IOException e) {
+ throw new FatalStoreException(e);
}
}
- }
-
- protected void process(KahaRollbackCommand command, Location location) {
- synchronized (indexMutex) {
- TransactionId key = key(command.getTransactionInfo());
- ArrayList<Operation> tx = inflightTransactions.remove(key);
- if (tx == null) {
- preparedTransactions.remove(key);
+ public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
+ return null;
+ }
+ public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
+ try {
+ Long queueKey = 1L;
+ QueueAddMessageBean bean = new QueueAddMessageBean();
+ bean.setQueueName(queueName);
+ bean.setAttachment(record.getAttachment());
+ bean.setMessageKey(record.getMessageKey());
+ bean.setQueueKey(queueKey);
+ store(bean);
+ return queueKey;
+ } catch (IOException e) {
+ throw new FatalStoreException(e);
}
}
- }
-
- // /////////////////////////////////////////////////////////////////
- // These methods do the actual index updates.
- // /////////////////////////////////////////////////////////////////
-
- protected final Object indexMutex = new Object();
- private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
-
- void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
- StoredDestinationState sd = getStoredDestination(command.getDestination(), tx);
+ public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException {
+ try {
+ QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
+ bean.setQueueKey(queueKey);
+ bean.setQueueName(queueName);
+ store(bean);
+ } catch (IOException e) {
+ throw new FatalStoreException(e);
+ }
- // Skip adding the message to the index if this is a topic and there are
- // no subscriptions.
- if (sd.subscriptions != null && sd.ackPositions.isEmpty()) {
- return;
}
-
- // Add the message.
- long id = sd.nextMessageId++;
- Long previous = sd.locationIndex.put(tx, location, id);
- if( previous == null ) {
- sd.messageIdIndex.put(tx, command.getMessageId(), id);
- sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
- } else {
- // restore the previous value.. Looks like this was a redo of a previously
- // added message. We don't want to assing it a new id as the other indexes would
- // be wrong..
- sd.locationIndex.put(tx, location, previous);
+ public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
+ return null;
}
- }
-
- void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
- StoredDestinationState sd = getStoredDestination(command.getDestination(), tx);
- if (!command.hasSubscriptionKey()) {
-
- // In the queue case we just remove the message from the index..
- Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
- if (sequenceId != null) {
- MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
- sd.locationIndex.remove(tx, keys.location);
- }
- } else {
- // In the topic case we need remove the message once it's been acked
- // by all the subs
- Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
-
- // Make sure it's a valid message id...
- if (sequence != null) {
- String subscriptionKey = command.getSubscriptionKey();
- Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, sequence);
-
- // The following method handles deleting un-referenced messages.
- removeAckLocation(tx, sd, subscriptionKey, prev);
-
- // Add it to the new location set.
- addAckLocation(sd, sequence, subscriptionKey);
- }
-
- }
- }
-
- private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
- StoredDestinationState sd = getStoredDestination(command.getDestination(), tx);
- sd.orderIndex.clear(tx);
- sd.orderIndex.unload(tx);
- tx.free(sd.orderIndex.getPageId());
- sd.locationIndex.clear(tx);
- sd.locationIndex.unload(tx);
- tx.free(sd.locationIndex.getPageId());
-
- sd.messageIdIndex.clear(tx);
- sd.messageIdIndex.unload(tx);
- tx.free(sd.messageIdIndex.getPageId());
-
- if (sd.subscriptions != null) {
- sd.subscriptions.clear(tx);
- sd.subscriptions.unload(tx);
- tx.free(sd.subscriptions.getPageId());
-
- sd.subscriptionAcks.clear(tx);
- sd.subscriptionAcks.unload(tx);
- tx.free(sd.subscriptionAcks.getPageId());
- }
-
- String key = key(command.getDestination());
- storedDestinations.remove(key);
- dbstate.destinations.remove(tx, key);
- }
-
- private void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
- StoredDestinationState sd = getStoredDestination(command.getDestination(), tx);
-
- // If set then we are creating it.. otherwise we are destroying the sub
- if (command.hasSubscriptionInfo()) {
- String subscriptionKey = command.getSubscriptionKey();
- sd.subscriptions.put(tx, subscriptionKey, command);
- long ackLocation=-1;
- if (!command.getRetroactive()) {
- ackLocation = sd.nextMessageId-1;
- }
-
- sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
- addAckLocation(sd, ackLocation, subscriptionKey);
- } else {
- // delete the sub...
- String subscriptionKey = command.getSubscriptionKey();
- sd.subscriptions.remove(tx, subscriptionKey);
- Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
- if( prev!=null ) {
- removeAckLocation(tx, sd, subscriptionKey, prev);
- }
+ ///////////////////////////////////////////////////////////////
+ // Map related methods.
+ ///////////////////////////////////////////////////////////////
+ public boolean mapAdd(AsciiBuffer map) {
+ return false;
}
-
- }
-
- /**
- * @param tx
- * @throws IOException
- */
- private void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
-
- LOG.debug("Checkpoint started.");
-
- dbstate.state = OPEN_STATE;
- dbstate.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
- tx.store(dbstate.page, dbstateMarshaller, true);
- pageFile.flush();
-
- if( cleanup ) {
-
- final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(journal.getFileMap().keySet());
-
- // Don't GC files under replication
- if( journalFilesBeingReplicated!=null ) {
- gcCandidateSet.removeAll(journalFilesBeingReplicated);
- }
-
- // Don't GC files after the first in progress tx
- Location firstTxLocation = dbstate.lastUpdate;
- if( dbstate.firstInProgressTransactionLocation!=null ) {
- firstTxLocation = dbstate.firstInProgressTransactionLocation;
- }
-
- if( firstTxLocation!=null ) {
- while( !gcCandidateSet.isEmpty() ) {
- Integer last = gcCandidateSet.last();
- if( last >= firstTxLocation.getDataFileId() ) {
- gcCandidateSet.remove(last);
- } else {
- break;
- }
- }
- }
-
- // Go through all the destinations to see if any of them can remove GC candidates.
- for (StoredDestinationState sd : storedDestinations.values()) {
- if( gcCandidateSet.isEmpty() ) {
- break;
- }
-
- // Use a visitor to cut down the number of pages that we load
- sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
- int last=-1;
- public boolean isInterestedInKeysBetween(Location first, Location second) {
- if( first==null ) {
- SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
- if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
- subset.remove(second.getDataFileId());
- }
- return !subset.isEmpty();
- } else if( second==null ) {
- SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
- if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
- subset.remove(first.getDataFileId());
- }
- return !subset.isEmpty();
- } else {
- SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
- if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
- subset.remove(first.getDataFileId());
- }
- if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
- subset.remove(second.getDataFileId());
- }
- return !subset.isEmpty();
- }
- }
-
- public void visit(List<Location> keys, List<Long> values) {
- for (Location l : keys) {
- int fileId = l.getDataFileId();
- if( last != fileId ) {
- gcCandidateSet.remove(fileId);
- last = fileId;
- }
- }
- }
-
- });
- }
-
- if( !gcCandidateSet.isEmpty() ) {
- LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
- journal.removeDataFiles(gcCandidateSet);
- }
+ public boolean mapRemove(AsciiBuffer map) {
+ return false;
}
-
- LOG.debug("Checkpoint done.");
- }
-
- public HashSet<Integer> getJournalFilesBeingReplicated() {
- return journalFilesBeingReplicated;
- }
-
- // /////////////////////////////////////////////////////////////////
- // StoredDestination related implementation methods.
- // /////////////////////////////////////////////////////////////////
-
-
- private final HashMap<String, StoredDestinationState> storedDestinations = new HashMap<String, StoredDestinationState>();
-
- protected StoredDestinationState getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
- String key = key(destination);
- StoredDestinationState rc = storedDestinations.get(key);
- if (rc == null) {
- boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
- rc = loadStoredDestination(tx, key, topic);
- // Cache it. We may want to remove/unload destinations from the
- // cache that are not used for a while
- // to reduce memory usage.
- storedDestinations.put(key, rc);
+ public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
+ return null;
}
- return rc;
- }
-
- /**
- * @param tx
- * @param key
- * @param topic
- * @return
- * @throws IOException
- */
- private StoredDestinationState loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
- // Try to load the existing indexes..
- StoredDestinationState rc = dbstate.destinations.get(tx, key);
- if (rc == null) {
- // Brand new destination.. allocate indexes for it.
- rc = new StoredDestinationState();
- rc.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
- rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
- rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
-
- if (topic) {
- rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
- rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
- }
- dbstate.destinations.put(tx, key, rc);
+ public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
+ return null;
}
-
- // Configure the marshalers and load.
- rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
- rc.orderIndex.setValueMarshaller(MessageKeys.MARSHALLER);
- rc.orderIndex.load(tx);
-
- // Figure out the next key using the last entry in the destination.
- Entry<Long, MessageKeys> lastEntry = rc.orderIndex.getLast(tx);
- if( lastEntry!=null ) {
- rc.nextMessageId = lastEntry.getKey()+1;
- }
-
- rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
- rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
- rc.locationIndex.load(tx);
-
- rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
- rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
- rc.messageIdIndex.load(tx);
-
- // If it was a topic...
- if (topic) {
-
- rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
- rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
- rc.subscriptions.load(tx);
-
- rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
- rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
- rc.subscriptionAcks.load(tx);
-
- rc.ackPositions = new TreeMap<Long, HashSet<String>>();
- rc.subscriptionCursors = new HashMap<String, Long>();
-
- for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
- Entry<String, Long> entry = iterator.next();
- addAckLocation(rc, entry.getValue(), entry.getKey());
- }
-
+ public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
+ return null;
}
- return rc;
- }
-
- /**
- * @param sd
- * @param messageSequence
- * @param subscriptionKey
- */
- private void addAckLocation(StoredDestinationState sd, Long messageSequence, String subscriptionKey) {
- HashSet<String> hs = sd.ackPositions.get(messageSequence);
- if (hs == null) {
- hs = new HashSet<String>();
- sd.ackPositions.put(messageSequence, hs);
+ public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
+ return null;
+ }
+ public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException {
+ return null;
}
- hs.add(subscriptionKey);
- }
- /**
- * @param tx
- * @param sd
- * @param subscriptionKey
- * @param sequenceId
- * @throws IOException
- */
- private void removeAckLocation(Transaction tx, StoredDestinationState sd, String subscriptionKey, Long sequenceId) throws IOException {
- // Remove the sub from the previous location set..
- if (sequenceId != null) {
- HashSet<String> hs = sd.ackPositions.get(sequenceId);
- if (hs != null) {
- hs.remove(subscriptionKey);
- if (hs.isEmpty()) {
- HashSet<String> firstSet = sd.ackPositions.values().iterator().next();
- sd.ackPositions.remove(sequenceId);
-
- // Did we just empty out the first set in the
- // ordered list of ack locations? Then it's time to
- // delete some messages.
- if (hs == firstSet) {
-
- // Find all the entries that need to get deleted.
- ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
- for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
- Entry<Long, MessageKeys> entry = iterator.next();
- if (entry.getKey().compareTo(sequenceId) <= 0) {
- // We don't do the actually delete while we are
- // iterating the BTree since
- // iterating would fail.
- deletes.add(entry);
- }
- }
+ ///////////////////////////////////////////////////////////////
+ // Stream related methods.
+ ///////////////////////////////////////////////////////////////
+ public Long streamOpen() {
+ return null;
+ }
+ public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException {
+ }
+ public void streamClose(Long streamKey) throws KeyNotFoundException {
+ }
+ public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException {
+ return null;
+ }
+ public boolean streamRemove(Long streamKey) {
+ return false;
+ }
- // Do the actual deletes.
- for (Entry<Long, MessageKeys> entry : deletes) {
- sd.locationIndex.remove(tx, entry.getValue().location);
- sd.messageIdIndex.remove(tx,entry.getValue().messageId);
- sd.orderIndex.remove(tx,entry.getKey());
- }
- }
- }
- }
+ ///////////////////////////////////////////////////////////////
+ // Transaction related methods.
+ ///////////////////////////////////////////////////////////////
+ public void transactionAdd(Buffer txid) {
+ }
+ public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException {
+ }
+ public void transactionCommit(Buffer txid) throws KeyNotFoundException {
+ }
+ public Iterator<Buffer> transactionList(Buffer first, int max) {
+ return null;
+ }
+ public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException {
}
+ public void transactionRollback(Buffer txid) throws KeyNotFoundException {
+ }
+
}
-
- private String key(KahaDestination destination) {
- return destination.getType().getNumber() + ":" + destination.getName();
+
+ public <R, T extends Exception> R execute(final Callback<R, T> callback, final Runnable onFlush) throws T {
+ KahaDBSession session = new KahaDBSession();
+ R rc = callback.execute(session);
+ return rc;
}
- // /////////////////////////////////////////////////////////////////
- // Transaction related implementation methods.
- // /////////////////////////////////////////////////////////////////
- protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
- protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
-
- private ArrayList<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
- TransactionId key = key(info);
- ArrayList<Operation> tx = inflightTransactions.get(key);
- if (tx == null) {
- tx = new ArrayList<Operation>();
- inflightTransactions.put(key, tx);
- }
- return tx;
- }
-
- private TransactionId key(KahaTransactionInfo transactionInfo) {
- if (transactionInfo.hasLocalTransacitonId()) {
- KahaLocalTransactionId tx = transactionInfo.getLocalTransacitonId();
- LocalTransactionId rc = new LocalTransactionId();
- rc.setConnectionId(new ConnectionId(tx.getConnectionId()));
- rc.setValue(tx.getTransacitonId());
- return rc;
- } else {
- KahaXATransactionId tx = transactionInfo.getXaTransacitonId();
- XATransactionId rc = new XATransactionId();
- rc.setBranchQualifier(tx.getBranchQualifier().toByteArray());
- rc.setGlobalTransactionId(tx.getGlobalTransactionId().toByteArray());
- rc.setFormatId(tx.getFormatId());
- return rc;
- }
+ public void flush() {
}
+
+ ///////////////////////////////////////////////////////////////////
+ // IoC Properties.
+ ///////////////////////////////////////////////////////////////////
- private PageFile createPageFile() {
+ protected PageFile createPageFile() {
PageFile index = new PageFile(directory, "db");
index.setEnableWriteThread(isEnableIndexWriteAsync());
index.setWriteBatchSize(getIndexWriteBatchSize());
return index;
}
- private Journal createJournal() {
+ protected Journal createJournal() {
Journal manager = new Journal();
manager.setDirectory(directory);
manager.setMaxFileLength(getJournalMaxFileLength());
@@ -1312,15 +855,15 @@
if (pageFile == null) {
pageFile = createPageFile();
}
- return pageFile;
- }
+ return pageFile;
+ }
- public Journal getJournal() {
+ public Journal getJournal() {
if (journal == null) {
journal = createJournal();
}
- return journal;
- }
+ return journal;
+ }
public boolean isFailIfDatabaseIsLocked() {
return failIfDatabaseIsLocked;
@@ -1330,118 +873,4 @@
this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
}
- ///////////////////////////////////////////////////////////////////
- // Store interface
- ///////////////////////////////////////////////////////////////////
- class KahaDBSession implements Session {
-
- public void commit() {
- }
-
- ///////////////////////////////////////////////////////////////
- // Message related methods.
- ///////////////////////////////////////////////////////////////
- public Long messageAdd(MessageRecord message) {
- return null;
- }
- public Long messageGetKey(AsciiBuffer messageId) {
- return null;
- }
- public MessageRecord messageGetRecord(Long key) {
- return null;
- }
-
- ///////////////////////////////////////////////////////////////
- // Queue related methods.
- ///////////////////////////////////////////////////////////////
- public void queueAdd(AsciiBuffer queueName) {
- }
- public boolean queueRemove(AsciiBuffer queueName) {
- return false;
- }
- public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
- return null;
- }
- public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
- return null;
- }
- public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException {
- }
- public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
- return null;
- }
-
-
- ///////////////////////////////////////////////////////////////
- // Map related methods.
- ///////////////////////////////////////////////////////////////
- public boolean mapAdd(AsciiBuffer map) {
- return false;
- }
- public boolean mapRemove(AsciiBuffer map) {
- return false;
- }
- public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max) {
- return null;
- }
- public Buffer mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) throws KeyNotFoundException {
- return null;
- }
- public Buffer mapEntryGet(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
- return null;
- }
- public Buffer mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
- return null;
- }
- public Iterator<AsciiBuffer> mapEntryListKeys(AsciiBuffer map, AsciiBuffer first, int max) throws KeyNotFoundException {
- return null;
- }
-
- ///////////////////////////////////////////////////////////////
- // Stream related methods.
- ///////////////////////////////////////////////////////////////
- public Long streamOpen() {
- return null;
- }
- public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException {
- }
- public void streamClose(Long streamKey) throws KeyNotFoundException {
- }
- public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException {
- return null;
- }
- public boolean streamRemove(Long streamKey) {
- return false;
- }
-
- ///////////////////////////////////////////////////////////////
- // Transaction related methods.
- ///////////////////////////////////////////////////////////////
- public void transactionAdd(Buffer txid) {
- }
- public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException {
- }
- public void transactionCommit(Buffer txid) throws KeyNotFoundException {
- }
- public Iterator<Buffer> transactionList(Buffer first, int max) {
- return null;
- }
- public void transactionRemoveMessage(Buffer txid, AsciiBuffer queueName, Long messageKey) throws KeyNotFoundException {
- }
- public void transactionRollback(Buffer txid) throws KeyNotFoundException {
- }
-
- }
- public <R, T extends Exception> R execute(Callback<R, T> callback, Runnable onFlush) throws T {
- KahaDBSession session = new KahaDBSession();
- R rc = callback.execute(session);
- session.commit();
- if( onFlush!=null ) {
- onFlush.run();
- }
- return rc;
- }
-
- public void flush() {
- }
}
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java?rev=758450&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java Wed Mar 25 22:11:15 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.util.Marshaller;
+
+/**
+ * Shared {@link Marshaller} instances for {@link Location}, {@link AsciiBuffer} and
+ * {@link Buffer} values.
+ */
+public class Marshallers {
+
+ /** Largest length that the unsigned 16-bit length prefix can carry. */
+ private static final int MAX_LENGTH = 0xFFFF;
+
+ /**
+ * Marshals a {@link Location} as two ints: the data file id, then the offset.
+ */
+ public final static Marshaller<Location> LOCATION_MARSHALLER = new Marshaller<Location>() {
+
+ public Class<Location> getType() {
+ return Location.class;
+ }
+
+ public Location readPayload(DataInput dataIn) throws IOException {
+ Location rc = new Location();
+ rc.setDataFileId(dataIn.readInt());
+ rc.setOffset(dataIn.readInt());
+ return rc;
+ }
+
+ public void writePayload(Location object, DataOutput dataOut) throws IOException {
+ dataOut.writeInt(object.getDataFileId());
+ dataOut.writeInt(object.getOffset());
+ }
+ };
+
+ /**
+ * Marshals an {@link AsciiBuffer} as an unsigned 16-bit length followed by its bytes.
+ */
+ public final static Marshaller<AsciiBuffer> ASCII_BUFFER_MARSHALLER = new Marshaller<AsciiBuffer>() {
+
+ public Class<AsciiBuffer> getType() {
+ return AsciiBuffer.class;
+ }
+
+ public AsciiBuffer readPayload(DataInput dataIn) throws IOException {
+ // writeShort() keeps the low 16 bits, so the length must be read back unsigned;
+ // readShort() would turn lengths above 32767 into a negative array size.
+ byte data[] = new byte[dataIn.readUnsignedShort()];
+ dataIn.readFully(data);
+ return new AsciiBuffer(data);
+ }
+
+ public void writePayload(AsciiBuffer object, DataOutput dataOut) throws IOException {
+ checkLength(object.length);
+ dataOut.writeShort(object.length);
+ dataOut.write(object.data, object.offset, object.length);
+ }
+ };
+
+ /**
+ * Marshals a {@link Buffer} as an unsigned 16-bit length followed by its bytes.
+ */
+ public final static Marshaller<Buffer> BUFFER_MARSHALLER = new Marshaller<Buffer>() {
+
+ public Class<Buffer> getType() {
+ return Buffer.class;
+ }
+
+ public Buffer readPayload(DataInput dataIn) throws IOException {
+ // Read the length unsigned for the same reason as ASCII_BUFFER_MARSHALLER.
+ byte data[] = new byte[dataIn.readUnsignedShort()];
+ dataIn.readFully(data);
+ return new Buffer(data);
+ }
+
+ public void writePayload(Buffer object, DataOutput dataOut) throws IOException {
+ checkLength(object.length);
+ dataOut.writeShort(object.length);
+ dataOut.write(object.data, object.offset, object.length);
+ }
+ };
+
+ /**
+ * Rejects a payload whose length does not fit in the 16-bit length prefix, since
+ * writing it would truncate the prefix and corrupt everything that follows.
+ *
+ * @param length number of payload bytes about to be written
+ * @throws IOException if the length exceeds {@link #MAX_LENGTH}
+ */
+ private static void checkLength(int length) throws IOException {
+ if (length > MAX_LENGTH) {
+ throw new IOException("Cannot marshal " + length + " bytes; the limit is " + MAX_LENGTH);
+ }
+ }
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java Wed Mar 25 22:11:15 2009
@@ -20,16 +20,17 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.util.Marshaller;
public class MessageKeys {
public static final MessageKeysMarshaller MARSHALLER = new MessageKeysMarshaller();
- final String messageId;
+ final AsciiBuffer messageId;
final Location location;
- public MessageKeys(String messageId, Location location) {
+ public MessageKeys(AsciiBuffer messageId, Location location) {
this.messageId=messageId;
this.location=location;
}
@@ -46,12 +47,14 @@
}
public MessageKeys readPayload(DataInput dataIn) throws IOException {
- return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
+ AsciiBuffer messageId = Marshallers.ASCII_BUFFER_MARSHALLER.readPayload(dataIn); // consumes the 2-byte length AND the id bytes, leaving the stream positioned at the Location
+ return new MessageKeys(messageId, Marshallers.LOCATION_MARSHALLER.readPayload(dataIn));
}
public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
- dataOut.writeUTF(object.messageId);
- LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
+ dataOut.writeShort(object.messageId.length); // 2-byte length prefix for the message id
+ dataOut.write(object.messageId.data, object.messageId.offset, object.messageId.length); // raw id bytes; same layout Marshallers.ASCII_BUFFER_MARSHALLER writes
+ Marshallers.LOCATION_MARSHALLER.writePayload(object.location, dataOut); // the journal location follows the id
}
}
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java Wed Mar 25 22:11:15 2009
@@ -20,78 +20,76 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
+import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.Marshaller;
public class StoredDBState {
- protected final KahaDBStore store;
protected Page<StoredDBState> page;
protected int state;
- protected BTreeIndex<String, StoredDestinationState> destinations;
+ protected BTreeIndex<AsciiBuffer, StoredDestinationState> destinations;
protected Location lastUpdate;
- protected Location firstInProgressTransactionLocation;
-
- public StoredDBState(KahaDBStore store) {
- this.store = store;
- }
+ // We index the messages 3 ways: by sequence id, by journal location, and by message id.
+ long nextMessageId;
+ protected BTreeIndex<Long, MessageKeys> orderIndex;
+ protected BTreeIndex<Location, Long> locationIndex;
+ protected BTreeIndex<AsciiBuffer, Long> messageIdIndex;
- public void read(DataInput is) throws IOException {
- state = is.readInt();
- destinations = new BTreeIndex<String, StoredDestinationState>(store.pageFile, is.readLong());
- if (is.readBoolean()) {
- lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
- } else {
- lastUpdate = null;
- }
- if (is.readBoolean()) {
- firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
- } else {
- firstInProgressTransactionLocation = null;
- }
- }
public void write(DataOutput os) throws IOException {
- os.writeInt(state);
- os.writeLong(destinations.getPageId());
-
- if (lastUpdate != null) {
- os.writeBoolean(true);
- LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
- } else {
- os.writeBoolean(false);
- }
- if (firstInProgressTransactionLocation != null) {
- os.writeBoolean(true);
- LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
- } else {
- os.writeBoolean(false);
- }
}
+ public final static DBStateMarshaller MARSHALLER = new DBStateMarshaller();
static public class DBStateMarshaller implements Marshaller<StoredDBState> {
- private final KahaDBStore store;
-
- public DBStateMarshaller(KahaDBStore store) {
- this.store = store;
- }
-
public Class<StoredDBState> getType() {
return StoredDBState.class;
}
- public StoredDBState readPayload(DataInput dataIn) throws IOException {
- StoredDBState rc = new StoredDBState(this.store);
- rc.read(dataIn);
+ public StoredDBState readPayload(DataInput is) throws IOException { // Reads fields in the order writePayload emits them: state, destinations page id, optional lastUpdate.
+ StoredDBState rc = new StoredDBState();
+ rc.state = is.readInt();
+ rc.destinations = new BTreeIndex<AsciiBuffer, StoredDestinationState>(is.readLong()); // only the page id is restored here; load(tx) later sets the page file and marshallers
+ if (is.readBoolean()) { // presence flag written ahead of lastUpdate
+ rc.lastUpdate = Marshallers.LOCATION_MARSHALLER.readPayload(is);
+ } else {
+ rc.lastUpdate = null;
+ }
return rc;
}
- public void writePayload(StoredDBState object, DataOutput dataOut) throws IOException {
- object.write(dataOut);
+ public void writePayload(StoredDBState object, DataOutput os) throws IOException { // Writes state, the destinations index page id, then an optional lastUpdate.
+ os.writeInt(object.state);
+ os.writeLong(object.destinations.getPageId());
+ if (object.lastUpdate != null) {
+ os.writeBoolean(true); // presence flag so readPayload knows a Location follows
+ Marshallers.LOCATION_MARSHALLER.writePayload(object.lastUpdate, os);
+ } else {
+ os.writeBoolean(false);
+ }
}
}
+
+ public void allocate(Transaction tx) throws IOException { // Places this state on a newly allocated page, creates the destinations index and stores the page.
+ // First time this is created.. Initialize a new pagefile.
+ page = tx.allocate();
+ assert page.getPageId() == 0; // the state record is expected to land on page 0
+ page.set(this);
+
+ state = KahaDBStore.CLOSED_STATE;
+ destinations = new BTreeIndex<AsciiBuffer, StoredDestinationState>(tx.getPageFile(), tx.allocate().getPageId()); // the index is rooted at a second, separately allocated page
+ tx.store(page, MARSHALLER, true);
+ }
+
+ public void load(Transaction tx) throws IOException { // Configures the destinations index (page file and marshallers) and then loads it.
+ destinations.setPageFile(tx.getPageFile());
+ destinations.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER); // keys are AsciiBuffer
+ destinations.setValueMarshaller(StoredDestinationState.MARSHALLER); // values are StoredDestinationState
+ destinations.load(tx);
+ }
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java Wed Mar 25 22:11:15 2009
@@ -19,62 +19,51 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.TreeMap;
+import java.util.Map.Entry;
-import org.apache.activemq.broker.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.kahadb.index.BTreeIndex;
-import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.LongMarshaller;
import org.apache.kahadb.util.Marshaller;
public class StoredDestinationState {
- long nextMessageId;
- BTreeIndex<Long, MessageKeys> orderIndex;
- BTreeIndex<Location, Long> locationIndex;
- BTreeIndex<String, Long> messageIdIndex;
-
- // These bits are only set for Topics
- BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
- BTreeIndex<String, Long> subscriptionAcks;
- HashMap<String, Long> subscriptionCursors;
- TreeMap<Long, HashSet<String>> ackPositions;
- public static class StoredDestinationMarshaller implements Marshaller<StoredDestinationState> {
- private final KahaDBStore store;
+ long nextMessageId;
+ BTreeIndex<Long, Long> orderIndex;
- public StoredDestinationMarshaller(KahaDBStore store) {
- this.store = store;
+ public void allocate(Transaction tx) throws IOException { // Creates orderIndex on a page newly allocated from tx.
+ orderIndex = new BTreeIndex<Long, Long>(tx.allocate());
+ }
+
+ public void load(Transaction tx) throws IOException { // Configures and loads orderIndex, then derives nextMessageId from its last entry.
+ orderIndex.setPageFile(tx.getPageFile());
+ orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+ orderIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+ orderIndex.load(tx);
+
+ // Figure out the next key using the last entry in the destination.
+ Entry<Long, Long> lastEntry = orderIndex.getLast(tx);
+ if( lastEntry!=null ) { // when there is no last entry, nextMessageId is left unchanged
+ nextMessageId = lastEntry.getKey()+1;
}
+ }
+ public final static StoredDestinationMarshaller MARSHALLER = new StoredDestinationMarshaller();
+ public static class StoredDestinationMarshaller implements Marshaller<StoredDestinationState> {
+
public Class<StoredDestinationState> getType() {
return StoredDestinationState.class;
}
public StoredDestinationState readPayload(DataInput dataIn) throws IOException {
StoredDestinationState value = new StoredDestinationState();
- value.orderIndex = new BTreeIndex<Long, MessageKeys>(store.pageFile, dataIn.readLong());
- value.locationIndex = new BTreeIndex<Location, Long>(store.pageFile, dataIn.readLong());
- value.messageIdIndex = new BTreeIndex<String, Long>(store.pageFile, dataIn.readLong());
-
- if (dataIn.readBoolean()) {
- value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(store.pageFile, dataIn.readLong());
- value.subscriptionAcks = new BTreeIndex<String, Long>(store.pageFile, dataIn.readLong());
- }
+ value.orderIndex = new BTreeIndex<Long, Long>(dataIn.readLong()); // only the index page id is persisted; load(tx) sets the page file and marshallers
return value;
}
public void writePayload(StoredDestinationState value, DataOutput dataOut) throws IOException {
dataOut.writeLong(value.orderIndex.getPageId());
- dataOut.writeLong(value.locationIndex.getPageId());
- dataOut.writeLong(value.messageIdIndex.getPageId());
- if (value.subscriptions != null) {
- dataOut.writeBoolean(true);
- dataOut.writeLong(value.subscriptions.getPageId());
- dataOut.writeLong(value.subscriptionAcks.getPageId());
- } else {
- dataOut.writeBoolean(false);
- }
}
}
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Wed Mar 25 22:11:15 2009
@@ -23,8 +23,6 @@
import java.util.TreeMap;
import org.apache.activemq.broker.store.Store;
-import org.apache.activemq.broker.store.Store.Session.KeyNotFoundException;
-import org.apache.activemq.broker.store.Store.Session.QueueRecord;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.util.ByteArrayOutputStream;
Modified: activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto (original)
+++ activemq/sandbox/activemq-flow/src/main/proto/kahadb-data.proto Wed Mar 25 22:11:15 2009
@@ -21,25 +21,32 @@
enum Type {
//| option java_create_message="true";
- MESSAGE_ADD = 1;
- QUEUE_ADD = 2;
- QUEUE_ADD_MESSAGE = 3;
- QUEUE_REMOVE_MESSAGE = 4;
- TRANSACTION_BEGIN = 5;
- TRANSACTION_ADD_MESSAGE = 6;
- TRANSACTION_REMOVE_MESSAGE = 7;
- TRANSACTION_COMMIT = 8;
- TRANSACTION_ROLLBACK = 9;
- MAP_ADD = 10;
- MAP_REMOVE = 11;
- MAP_ENTRY_PUT = 12;
- MAP_ENTRY_REMOVE = 13;
- STREAM_OPEN = 14;
- STREAM_WRITE = 15;
- STREAM_CLOSE = 16;
- STREAM_REMOVE = 17;
+ MESSAGE_ADD = 0;
+ QUEUE_ADD = 10; // 10-13: queue operations
+ QUEUE_REMOVE = 11;
+ QUEUE_ADD_MESSAGE = 12;
+ QUEUE_REMOVE_MESSAGE = 13;
+ TRANSACTION_BEGIN = 20; // 20-24: transaction operations
+ TRANSACTION_ADD_MESSAGE = 21;
+ TRANSACTION_REMOVE_MESSAGE = 22;
+ TRANSACTION_COMMIT = 23;
+ TRANSACTION_ROLLBACK = 24;
+ MAP_ADD = 30; // 30-33: map operations
+ MAP_REMOVE = 31;
+ MAP_ENTRY_PUT = 32;
+ MAP_ENTRY_REMOVE = 33;
+ STREAM_OPEN = 40; // 40-43: stream operations
+ STREAM_WRITE = 41;
+ STREAM_CLOSE = 42;
+ STREAM_REMOVE = 43;
+
+ TRACE = 100; // presumably the type code for the Trace message below -- confirm
}
+message Trace {
+ optional bytes message = 2 [java_override_type = "AsciiBuffer"]; // bytes on the wire; presumably surfaced as AsciiBuffer in generated Java via java_override_type -- confirm
+}
+
///////////////////////////////////////////////////////////////
// Message related operations.
///////////////////////////////////////////////////////////////
Modified: activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=758450&r1=758449&r2=758450&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Wed Mar 25 22:11:15 2009
@@ -21,9 +21,9 @@
import junit.framework.TestCase;
+import org.apache.activemq.broker.store.Store.MessageRecord;
import org.apache.activemq.broker.store.Store.Session;
import org.apache.activemq.broker.store.Store.VoidCallback;
-import org.apache.activemq.broker.store.Store.Session.MessageRecord;
import org.apache.activemq.protobuf.AsciiBuffer;
public abstract class StoreTestBase extends TestCase {