You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/09/06 19:12:18 UTC
svn commit: r692685 - in
/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb:
index/BTreeIndex.java index/BTreeNode.java journal/Journal.java
page/Page.java store/KahaDBPersistenceAdaptor.java store/MessageDatabase.java
Author: chirino
Date: Sat Sep 6 10:12:18 2008
New Revision: 692685
URL: http://svn.apache.org/viewvc?rev=692685&view=rev
Log:
- You can now selectively visit BTree nodes using the BTreeVisitor
- The message store now periodically checkpoints and cleans up unused journal files.
- Fixed: removing the last item in a BTree node did not result in a page write, so on restart the last item would still be there.
Modified:
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java?rev=692685&r1=692684&r2=692685&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java Sat Sep 6 10:12:18 2008
@@ -225,6 +225,10 @@
synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, Key initialKey) throws IOException {
return root.iterator(tx, initialKey);
}
+
+ synchronized public void visit(Transaction tx, BTreeVisitor<Key, Value> visitor) throws IOException {
+ root.visit(tx, visitor);
+ }
synchronized Value getFirst(Transaction tx) throws IOException {
return root.getFirst(tx);
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=692685&r1=692684&r2=692685&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java Sat Sep 6 10:12:18 2008
@@ -31,7 +31,6 @@
import org.apache.kahadb.index.BTreeIndex.Prefixer;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.page.Transaction.PageOverflowIOException;
/**
@@ -39,7 +38,6 @@
* one Page of a PageFile.
*/
public final class BTreeNode<Key,Value> {
- private static final Log LOG = LogFactory.getLog(BTreeNode.class);
// The index that this node is part of.
private final BTreeIndex<Key,Value> index;
@@ -285,13 +283,10 @@
Value oldValue = values[idx];
setLeafData(arrayDelete(keys, idx), arrayDelete(values, idx));
- if( keys.length!=0 ) {
- index.storeNode(tx, this, true);
+ if( keys.length==0 && parent!=null) {
+ tx.free(getPage());
} else {
- // If this leaf is empty and is not the root node..
- if( parent!=null ) {
- tx.free(getPage());
- }
+ index.storeNode(tx, this, true);
}
return oldValue;
@@ -505,6 +500,30 @@
}
}
+ public void visit(Transaction tx, BTreeVisitor<Key, Value> visitor) throws IOException {
+ if (visitor == null) {
+ throw new IllegalArgumentException("Visitor cannot be null");
+ }
+ if( isBranch() ) {
+ for(int i=0; i < this.children.length; i++) {
+ Key key1 = null;
+ if( i!=0 ) {
+ key1 = keys[i-1];
+ }
+ Key key2 = null;
+ if( i!=this.children.length-1 ) {
+ key1 = keys[i];
+ }
+ if( visitor.isInterestedInKeysBetween(key1, key2) ) {
+ BTreeNode<Key, Value> child = getChild(tx, i);
+ child.visit(tx, visitor);
+ }
+ }
+ } else {
+ visitor.visit(keys, values);
+ }
+ }
+
public Value getFirst(Transaction tx) throws IOException {
BTreeNode<Key, Value> node = this;
while( node .isBranch() ) {
@@ -716,6 +735,11 @@
public void setNext(long next) {
this.next = next;
}
+
+ @Override
+ public String toString() {
+ return "[BTreeNode "+(isBranch()?"branch":"leaf")+": "+Arrays.asList(keys)+"]";
+ }
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=692685&r1=692684&r2=692685&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Sat Sep 6 10:12:18 2008
@@ -403,36 +403,21 @@
}
}
- public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Set<Integer> inProgress) throws IOException {
- Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
- unUsed.removeAll(inUse);
- unUsed.removeAll(inProgress);
-
- List<DataFile> purgeList = new ArrayList<DataFile>();
- for (Integer key : unUsed) {
- DataFile dataFile = fileMap.get(key);
- purgeList.add(dataFile);
- }
- for (DataFile dataFile : purgeList) {
- if (dataFile != dataFiles.getTail()) {
- forceRemoveDataFile(dataFile);
- }
- }
- }
-
public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, Integer lastFile) throws IOException {
Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
unUsed.removeAll(inUse);
- List<DataFile> purgeList = new ArrayList<DataFile>();
for (Integer key : unUsed) {
- // Only add files less than the lastFile..
- if (key.intValue() < lastFile.intValue()) {
- DataFile dataFile = fileMap.get(key);
- purgeList.add(dataFile);
+ // Don't remove files that come after the lastFile
+ if (lastFile !=null && key >= lastFile ) {
+ continue;
+ }
+ DataFile dataFile = fileMap.get(key);
+
+ // Can't remove the last file either.
+ if( dataFile == dataFiles.getTail() ) {
+ continue;
}
- }
- for (DataFile dataFile : purgeList) {
forceRemoveDataFile(dataFile);
}
}
@@ -489,7 +474,7 @@
}
public String toString() {
- return "DataManager:(" + filePrefix + ")";
+ return directory.toString();
}
public synchronized Location getMark() throws IllegalStateException {
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java?rev=692685&r1=692684&r2=692685&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java Sat Sep 6 10:12:18 2008
@@ -102,10 +102,6 @@
next = is.readLong();
}
- public String toString() {
- return "Page:" + getPageId();
- }
-
public long getPageId() {
return pageId;
}
@@ -130,5 +126,9 @@
return next;
}
+ public String toString() {
+ return "[Page:" + getPageId()+", type: "+type+"]";
+ }
+
}
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java?rev=692685&r1=692684&r2=692685&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java Sat Sep 6 10:12:18 2008
@@ -79,24 +79,12 @@
public class KahaDBPersistenceAdaptor extends MessageDatabase implements PersistenceAdapter {
private WireFormat wireFormat = new OpenWireFormat();
- private AtomicBoolean started = new AtomicBoolean();
public void setBrokerName(String brokerName) {
}
public void setUsageManager(SystemUsage usageManager) {
}
- public void start() throws Exception {
- if ( started.compareAndSet(false,true) ) {
- load();
- }
- }
- public void stop() throws Exception {
- if ( started.compareAndSet(true,false) ) {
- unload();
- }
- }
-
public TransactionStore createTransactionStore() throws IOException {
return new TransactionStore(){
@@ -164,6 +152,7 @@
command.setMessage(ByteString.copyFrom(packet.getData(), packet.getOffset(), packet.getLength()));
store(command, message.isResponseRequired());
+
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=692685&r1=692684&r2=692685&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Sat Sep 6 10:12:18 2008
@@ -16,6 +16,8 @@
*/
package org.apache.kahadb.store;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
@@ -29,6 +31,7 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.TreeMap;
import java.util.Map.Entry;
@@ -42,11 +45,14 @@
import org.apache.kahadb.Marshaller;
import org.apache.kahadb.StringMarshaller;
import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.index.BTreeNode;
+import org.apache.kahadb.index.BTreeVisitor;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.page.Transaction.PageOverflowIOException;
import org.apache.kahadb.store.data.KahaAddMessageCommand;
import org.apache.kahadb.store.data.KahaCommitCommand;
import org.apache.kahadb.store.data.KahaDestination;
@@ -63,35 +69,56 @@
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.Sequence;
+import org.apache.kahadb.util.SequenceSet;
public class MessageDatabase {
-
private static final Log LOG = LogFactory.getLog(MessageDatabase.class);
private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
public static final int CLOSED_STATE = 1;
public static final int OPEN_STATE = 2;
-
+
protected class Metadata {
protected Page<Metadata> page;
protected int state;
protected BTreeIndex<String, StoredDestination> destinations;
protected Location lastUpdate;
protected Location firstInProgressTransactionLocation;
-
+
public void read(DataInput is) throws IOException {
state = is.readInt();
destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
- lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
- firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
+ if (is.readBoolean()) {
+ lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
+ } else {
+ lastUpdate = null;
+ }
+ if (is.readBoolean()) {
+ firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
+ } else {
+ firstInProgressTransactionLocation = null;
+ }
}
public void write(DataOutput os) throws IOException {
os.writeInt(state);
os.writeLong(destinations.getPageId());
- LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
- LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
+
+ if (lastUpdate != null) {
+ os.writeBoolean(true);
+ LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
+ } else {
+ os.writeBoolean(false);
+ }
+
+ if (firstInProgressTransactionLocation != null) {
+ os.writeBoolean(true);
+ LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
+ } else {
+ os.writeBoolean(false);
+ }
}
}
@@ -111,25 +138,38 @@
}
}
-
protected PageFile pageFile;
- protected Journal asyncDataManager;
- protected Metadata metadata = new Metadata();
+ protected Journal asyncDataManager;
+ protected Metadata metadata = new Metadata();
- protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
+ protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
- protected boolean failIfJournalIsLocked;
+ protected boolean failIfJournalIsLocked;
- protected boolean deleteAllMessages;
- protected File directory;
+ protected boolean deleteAllMessages;
+ protected File directory;
protected boolean recovering;
- protected Thread checkpointThread;
+ protected Thread checkpointThread;
+ protected AtomicBoolean started = new AtomicBoolean();
public MessageDatabase() {
}
+ public void start() throws Exception {
+ if (started.compareAndSet(false, true)) {
+ load();
+ }
+ }
+
+ public void stop() throws Exception {
+ if (started.compareAndSet(true, false)) {
+ unload();
+ }
+ }
+
public void load() throws IOException {
+ recovering=true;
if (asyncDataManager == null) {
asyncDataManager = createAsyncDataManager();
}
@@ -160,61 +200,115 @@
store(new KahaTraceCommand().setMessage("DELETED " + new Date()));
- LOG.info("Journal deleted: ");
+ LOG.info("Persistence store purged.");
deleteAllMessages = false;
}
- pageFile.load();
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- if (tx.getPageCount() == 0) {
- // First time this is created.. Initialize the metadata
- Page<Metadata> page = tx.allocate();
- assert page.getPageId() == 0;
- page.set(metadata);
- metadata.page = page;
- metadata.state = CLOSED_STATE;
- metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
- metadata.lastUpdate = new Location(0,0);
- metadata.firstInProgressTransactionLocation = new Location(0,0);
-
- tx.store(metadata.page, metadataMarshaller, true);
- } else {
- Page<Metadata> page = tx.load(0, metadataMarshaller);
- metadata = page.get();
- metadata.page = page;
- }
- metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
- metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
- metadata.destinations.load();
- }
- });
+ synchronized (indexMutex) {
+ pageFile.load();
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ if (tx.getPageCount() == 0) {
+ // First time this is created.. Initialize the metadata
+ Page<Metadata> page = tx.allocate();
+ assert page.getPageId() == 0;
+ page.set(metadata);
+ metadata.page = page;
+ metadata.state = CLOSED_STATE;
+ metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
+
+ tx.store(metadata.page, metadataMarshaller, true);
+
+ store(new KahaTraceCommand().setMessage("CREATED " + new Date()));
+ } else {
+ Page<Metadata> page = tx.load(0, metadataMarshaller);
+ metadata = page.get();
+ metadata.page = page;
+ }
+ metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
+ metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
+ metadata.destinations.load();
+ }
+ });
+
+ // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
+ // Perhaps we should just keep an index of file
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
+ Entry<String, StoredDestination> entry = iterator.next();
+ StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
+ storedDestinations.put(entry.getKey(), sd);
+ }
+ }
+ });
+
+ // Replay the the journal to get the indexes up to date with the
+ // latest
+ // updates.
+ recover();
+ }
+ recovering=false;
- // Replay the the journal to get the indexes up to date with the latest
- // updates.
- recover();
-
checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
public void run() {
- doCheckpoint();
+ try {
+ long start = System.currentTimeMillis();
+ while (started.get()) {
+ Thread.sleep(500);
+ long now = System.currentTimeMillis();
+ if( now - start >= 1000*1000 ) {
+ checkpoint();
+ start = now;
+ }
+ }
+ } catch (InterruptedException e) {
+ // Looks like someone really wants us to exit this thread...
+ }
}
};
+ checkpointThread.start();
}
-
- public void unload() throws IOException {
- metadata.destinations.unload();
- metadata.state = CLOSED_STATE;
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- tx.store(metadata.page, metadataMarshaller, true);
- }
- });
- metadata = new Metadata();
- pageFile.unload();
+
+ public void unload() throws IOException, InterruptedException {
+ checkpointThread.join();
+
+ synchronized (indexMutex) {
+
+ metadata.state = CLOSED_STATE;
+ metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
+
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ tx.store(metadata.page, metadataMarshaller, true);
+ }
+ });
+
+ metadata.destinations.unload();
+ pageFile.unload();
+ metadata = new Metadata();
+ }
+ store(new KahaTraceCommand().setMessage("CLEAN SHUTDOWN " + new Date()));
asyncDataManager.close();
}
+ /**
+ * @return
+ */
+ private Location getFirstInProgressTxLocation() {
+ Location l = null;
+ if (!inflightTransactions.isEmpty()) {
+ l = inflightTransactions.values().iterator().next().get(0).getLocation();
+ }
+ if (!preparedTransactions.isEmpty()) {
+ Location t = preparedTransactions.values().iterator().next().get(0).getLocation();
+ if (l==null || t.compareTo(l) <= 0) {
+ l = t;
+ }
+ }
+ return l;
+ }
/**
* Move all the messages that were in the journal into long term storage. We
@@ -226,37 +320,73 @@
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, IOException {
+
+ long start = System.currentTimeMillis();
Location pos = null;
+
+ // We need to recover the transactions..
+ if (metadata.firstInProgressTransactionLocation != null) {
+ pos = metadata.firstInProgressTransactionLocation;
+ }
+
+ // Perhaps there were no transactions...
+ if( pos==null && metadata.lastUpdate!=null) {
+ // Start replay at the record after the last one recorded in the index file.
+ pos = asyncDataManager.getNextLocation(metadata.lastUpdate);
+ // No journal records need to be recovered.
+ if( pos == null ) {
+ return;
+ }
+ }
+
+ // Do we need to start from the begining?
+ if (pos == null) {
+ // This loads the first position.
+ pos = asyncDataManager.getNextLocation(null);
+ }
+
int redoCounter = 0;
- LOG.info("Journal Recovery Started from: " + asyncDataManager);
- long start = System.currentTimeMillis();
- // While we have records in the journal.
- while ((pos = asyncDataManager.getNextLocation(pos)) != null) {
+ LOG.info("Journal Recovery Started from: " + asyncDataManager + " at " + pos.getDataFileId() + ":" + pos.getOffset());
+
+ while (pos != null) {
JournalCommand message = load(pos);
process(message, pos);
redoCounter++;
+ pos = asyncDataManager.getNextLocation(pos);
}
+
Location location = store(new KahaTraceCommand().setMessage("RECOVERED " + new Date()), true);
- asyncDataManager.setMark(location, true);
long end = System.currentTimeMillis();
LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
}
- private void doCheckpoint() {
+ private void checkpoint() {
+ try {
+ synchronized (indexMutex) {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ checkpointUpdate(tx);
+ }
+ });
+ pageFile.checkpoint();
+ }
+ store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
+ } catch (IOException e) {
+ }
}
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
// Methods call by the broker to update and query the store.
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
public Location store(JournalCommand data) throws IOException {
return store(data, false);
}
-
+
/**
- * All updated are are funneled through this method. The updates a converted to
- * a JournalMessage which is logged to the journal and then the data from
- * the JournalMessage is used to update the index just like it would be done durring
- * a recovery process.
+ * All updated are are funneled through this method. The updates a converted
+ * to a JournalMessage which is logged to the journal and then the data from
+ * the JournalMessage is used to update the index just like it would be done
+ * durring a recovery process.
*/
public Location store(JournalCommand data, boolean sync) throws IOException {
int size = data.serializedSize();
@@ -265,11 +395,12 @@
data.writeTo(os);
Location location = asyncDataManager.write(os.getByteSequence(), sync);
process(data, location);
- metadata.lastUpdate = location;
+ if( !recovering ) {
+ metadata.lastUpdate = location;
+ }
return location;
}
-
/**
* Loads a previously stored JournalMessage
*
@@ -285,13 +416,13 @@
message.mergeFrom(is);
return message;
}
-
- ///////////////////////////////////////////////////////////////////
+
+ // /////////////////////////////////////////////////////////////////
// Journaled record processing methods. Once the record is journaled,
// these methods handle applying the index updates. These may be called
// from the recovery method too so they need to be idempotent
- ///////////////////////////////////////////////////////////////////
-
+ // /////////////////////////////////////////////////////////////////
+
private void process(JournalCommand data, final Location location) throws IOException {
data.visit(new Visitor() {
@Override
@@ -318,12 +449,12 @@
public void visit(KahaRollbackCommand command) throws IOException {
process(command, location);
}
-
+
@Override
public void visit(KahaRemoveDestinationCommand command) throws IOException {
process(command, location);
}
-
+
@Override
public void visit(KahaSubscriptionCommand command) throws IOException {
process(command, location);
@@ -333,10 +464,12 @@
private void process(final KahaAddMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
- ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
- inflightTx.add(new AddOpperation(command, location));
+ synchronized (indexMutex) {
+ ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
+ inflightTx.add(new AddOpperation(command, location));
+ }
} else {
- synchronized(indexMutex) {
+ synchronized (indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
upadateIndex(tx, command, location);
@@ -348,10 +481,12 @@
protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
- ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
- inflightTx.add(new RemoveOpperation(command, location));
+ synchronized (indexMutex) {
+ ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
+ inflightTx.add(new RemoveOpperation(command, location));
+ }
} else {
- synchronized(indexMutex) {
+ synchronized (indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
@@ -361,9 +496,9 @@
}
}
-
+
protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
- synchronized(indexMutex) {
+ synchronized (indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
@@ -371,9 +506,9 @@
});
}
}
-
+
protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
- synchronized(indexMutex) {
+ synchronized (indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
@@ -381,19 +516,19 @@
});
}
}
-
+
protected void process(KahaCommitCommand command, Location location) throws IOException {
TransactionId key = key(command.getTransactionInfo());
- ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
- if (inflightTx == null) {
- inflightTx = preparedTransactions.remove(key);
- }
- if( inflightTx == null ) {
- return;
- }
-
- final ArrayList<Operation> messagingTx = inflightTx;
- synchronized(indexMutex) {
+ synchronized (indexMutex) {
+ ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
+ if (inflightTx == null) {
+ inflightTx = preparedTransactions.remove(key);
+ }
+ if (inflightTx == null) {
+ return;
+ }
+
+ final ArrayList<Operation> messagingTx = inflightTx;
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
for (Operation op : messagingTx) {
@@ -403,37 +538,42 @@
});
}
}
-
+
protected void process(KahaPrepareCommand command, Location location) {
- TransactionId key = key(command.getTransactionInfo());
- ArrayList<Operation> tx = inflightTransactions.remove(key);
- if (tx != null) {
- preparedTransactions.put(key, tx);
+ synchronized (indexMutex) {
+ TransactionId key = key(command.getTransactionInfo());
+ ArrayList<Operation> tx = inflightTransactions.remove(key);
+ if (tx != null) {
+ preparedTransactions.put(key, tx);
+ }
}
}
protected void process(KahaRollbackCommand command, Location location) {
- TransactionId key = key(command.getTransactionInfo());
- ArrayList<Operation> tx = inflightTransactions.remove(key);
- if (tx == null) {
- preparedTransactions.remove(key);
+ synchronized (indexMutex) {
+ TransactionId key = key(command.getTransactionInfo());
+ ArrayList<Operation> tx = inflightTransactions.remove(key);
+ if (tx == null) {
+ preparedTransactions.remove(key);
+ }
}
}
-
- ///////////////////////////////////////////////////////////////////
+
+ // /////////////////////////////////////////////////////////////////
// These methods do the actual index updates.
- ///////////////////////////////////////////////////////////////////
-
+ // /////////////////////////////////////////////////////////////////
+
protected final Object indexMutex = new Object();
-
+
private void upadateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
- // Skip adding the message to the index if this is a topic and there are no subscriptions.
- if( sd.subscriptions!=null && sd.ackLocations.isEmpty() ) {
+ // Skip adding the message to the index if this is a topic and there are
+ // no subscriptions.
+ if (sd.subscriptions != null && sd.ackLocations.isEmpty()) {
return;
}
-
+
// Add the message.
sd.orderIndex.put(tx, location, command.getMessageId());
sd.messageIdIndex.put(tx, command.getMessageId(), location);
@@ -441,32 +581,32 @@
private void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
- if( !command.hasSubscriptionKey() ) {
+ if (!command.hasSubscriptionKey()) {
// In the queue case we just remove the message from the index..
Location messageLocation = sd.messageIdIndex.remove(tx, command.getMessageId());
- if( messageLocation!=null ) {
+ if (messageLocation != null) {
sd.orderIndex.remove(tx, messageLocation);
}
} else {
- // In the topic case we need remove the message once it's been acked by all the subs
+ // In the topic case we need remove the message once it's been acked
+ // by all the subs
Location messageLocation = sd.messageIdIndex.get(tx, command.getMessageId());
-
+
// Make sure it's a valid message id...
- if( messageLocation!=null ) {
+ if (messageLocation != null) {
String subscriptionKey = command.getSubscriptionKey();
Location prev = sd.subscriptionAcks.put(tx, subscriptionKey, messageLocation);
-
+
// The following method handles deleting un-referenced messages.
removeAckLocation(tx, sd, subscriptionKey, prev);
-
+
// Add it to the new location set.
addAckLocation(sd, messageLocation, subscriptionKey);
}
-
+
}
- metadata.lastUpdate = ackLocation;
}
-
+
private void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
sd.orderIndex.clear(tx);
@@ -476,7 +616,7 @@
tx.free(sd.orderIndex.getPageId());
tx.free(sd.messageIdIndex.getPageId());
- if( sd.subscriptions!=null ) {
+ if (sd.subscriptions != null) {
sd.subscriptions.clear(tx);
sd.subscriptionAcks.clear(tx);
sd.subscriptions.unload();
@@ -484,26 +624,26 @@
tx.free(sd.subscriptions.getPageId());
tx.free(sd.subscriptionAcks.getPageId());
}
-
+
String key = key(command.getDestination());
storedDestinations.remove(key);
metadata.destinations.remove(tx, key);
}
-
+
private void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
-
+
// If set then we are creating it.. otherwise we are destroying the sub
- if( command.hasSubscriptionInfo() ) {
+ if (command.hasSubscriptionInfo()) {
String subscriptionKey = command.getSubscriptionKey();
sd.subscriptions.put(tx, subscriptionKey, command);
Location ackLocation;
- if( command.getRetroactive() ) {
- ackLocation = new Location(0,0);
+ if (command.getRetroactive()) {
+ ackLocation = new Location(0, 0);
} else {
ackLocation = location;
}
-
+
sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
addAckLocation(sd, ackLocation, subscriptionKey);
} else {
@@ -515,12 +655,60 @@
}
}
-
+ /**
+ * @param tx
+ * @throws IOException
+ */
+ private void checkpointUpdate(Transaction tx) throws IOException {
+
+ // Find empty journal files to remove.
+ final HashSet<Integer> inUseFiles = new HashSet<Integer>();
+
+ for (StoredDestination sd : storedDestinations.values()) {
+ // Use a visitor to cut down the number of pages that we load
+ sd.orderIndex.visit(tx, new BTreeVisitor<Location, String>() {
+ int last=-1;
+ public boolean isInterestedInKeysBetween(Location first, Location second) {
+ if( second!=null ) {
+ if( last+1 == second.getDataFileId() ) {
+ last++;
+ inUseFiles.add(last);
+ }
+ if( last == second.getDataFileId() ) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public void visit(Location[] keys, String[] values) {
+ for (int i = 0; i < keys.length; i++) {
+ if( last == keys[i].getDataFileId() ) {
+ inUseFiles.add(keys[i].getDataFileId());
+ last = keys[i].getDataFileId();
+ }
+ }
+
+ }
+ });
+ }
+
+ metadata.state = OPEN_STATE;
+ metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
+ tx.store(metadata.page, metadataMarshaller, true);
+
+ Location l = metadata.lastUpdate;
+ if( metadata.firstInProgressTransactionLocation!=null ) {
+ l = metadata.firstInProgressTransactionLocation;
+ }
+ asyncDataManager.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId());
+ }
+
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
// StoredDestination related implementation methods.
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
private final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
@@ -541,7 +729,7 @@
HashMap<String, Location> subscriptionCursors;
TreeMap<Location, HashSet<String>> ackLocations;
}
-
+
protected class StoredDestinationMarshaller implements Marshaller<StoredDestination> {
public Class<StoredDestination> getType() {
return StoredDestination.class;
@@ -551,8 +739,8 @@
StoredDestination value = new StoredDestination();
value.orderIndex = new BTreeIndex<Location, String>(pageFile, dataIn.readLong());
value.messageIdIndex = new BTreeIndex<String, Location>(pageFile, dataIn.readLong());
-
- if( dataIn.readBoolean() ) {
+
+ if (dataIn.readBoolean()) {
value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
value.subscriptionAcks = new BTreeIndex<String, Location>(pageFile, dataIn.readLong());
}
@@ -562,7 +750,7 @@
public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
dataOut.writeLong(value.orderIndex.getPageId());
dataOut.writeLong(value.messageIdIndex.getPageId());
- if( value.subscriptions !=null ) {
+ if (value.subscriptions != null) {
dataOut.writeBoolean(true);
dataOut.writeLong(value.subscriptions.getPageId());
dataOut.writeLong(value.subscriptionAcks.getPageId());
@@ -571,10 +759,10 @@
}
}
}
-
+
static class LocationMarshaller implements Marshaller<Location> {
- final static LocationMarshaller INSTANCE = new LocationMarshaller();
-
+ final static LocationMarshaller INSTANCE = new LocationMarshaller();
+
public Class<Location> getType() {
return Location.class;
}
@@ -591,22 +779,26 @@
dataOut.writeInt(object.getOffset());
}
}
-
+
static class KahaSubscriptionCommandMarshaller implements Marshaller<KahaSubscriptionCommand> {
- final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
-
+ final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
+
+ // Largest payload whose length still fits the two-byte length prefix.
+ private static final int MAX_PAYLOAD_LENGTH = 0xFFFF;
+
public Class<KahaSubscriptionCommand> getType() {
return KahaSubscriptionCommand.class;
}
+ /**
+ * Reads a two-byte length prefix followed by that many bytes and merges
+ * them into a new command.
+ */
public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
- rc.mergeFrom((InputStream)dataIn);
+ // Read the prefix unsigned: readShort() would turn lengths above 32767 negative.
+ byte[] ba = new byte[dataIn.readUnsignedShort()];
+ dataIn.readFully(ba);
+ rc.mergeFrom(ba);
return rc;
}
+ /**
+ * Writes the command as a two-byte length prefix followed by its bytes.
+ *
+ * @throws IOException if the command is too large for the length prefix
+ */
public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
- object.writeTo((OutputStream)dataOut);
+ byte[] ba = object.toByteArray();
+ // writeShort() keeps only the low 16 bits, so fail instead of storing a wrong length.
+ if (ba.length > MAX_PAYLOAD_LENGTH) {
+ throw new IOException("Subscription command too large to marshal: " + ba.length + " bytes");
+ }
+ dataOut.writeShort(ba.length);
+ dataOut.write(ba);
}
}
@@ -614,57 +806,71 @@
String key = key(destination);
StoredDestination rc = storedDestinations.get(key);
if (rc == null) {
- // Try to load the existing indexes..
- rc = metadata.destinations.get(tx, key);
- if( rc ==null ) {
- // Brand new destination.. allocate indexes for it.
- rc = new StoredDestination();
- rc.orderIndex = new BTreeIndex<Location, String>(pageFile, tx.allocate());
- rc.messageIdIndex = new BTreeIndex<String, Location>(pageFile, tx.allocate());
-
- if( destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC ) {
- rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
- rc.subscriptionAcks = new BTreeIndex<String, Location>(pageFile, tx.allocate());
- }
+ boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
+ rc = loadStoredDestination(tx, key, topic);
+ // Cache it. We may want to remove/unload destinations from the
+ // cache that are not used for a while
+ // to reduce memory usage.
+ storedDestinations.put(key, rc);
+ }
+ return rc;
+ }
+
+ /**
+ * Loads the StoredDestination registered under key in metadata.destinations,
+ * or creates one when no entry exists: pages are allocated for the order
+ * and message id indexes (plus the subscription and subscription ack
+ * indexes when topic is true) and the new entry is put into
+ * metadata.destinations. In both cases the marshallers are then set and
+ * every index is loaded. For a topic the in-memory ackLocations and
+ * subscriptionCursors maps are created and ackLocations is filled from the
+ * entries of subscriptionAcks.
+ *
+ * This method does not add the result to the storedDestinations cache.
+ *
+ * NOTE(review): for an entry that already exists, the topic branch
+ * dereferences rc.subscriptions and rc.subscriptionAcks; this assumes the
+ * stored entry was written with those indexes whenever topic is true -
+ * confirm against the callers.
+ *
+ * @param tx transaction used for the index lookups, page allocation and
+ * the metadata update
+ * @param key key of the destination inside metadata.destinations
+ * @param topic true when the subscription indexes and ack tracking are
+ * needed as well
+ * @return the loaded or newly created destination, never null
+ * @throws IOException propagated from the page file / index operations
+ */
+ private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
+ // Try to load the existing indexes for this key.
+ StoredDestination rc = metadata.destinations.get(tx, key);
+ if (rc == null) {
+ // Brand new destination: allocate index pages for it and register it in the metadata.
+ rc = new StoredDestination();
+ rc.orderIndex = new BTreeIndex<Location, String>(pageFile, tx.allocate());
+ rc.messageIdIndex = new BTreeIndex<String, Location>(pageFile, tx.allocate());
+
+ if (topic) {
+ rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
+ rc.subscriptionAcks = new BTreeIndex<String, Location>(pageFile, tx.allocate());
}
-
- // Configure the marshalers and load.
- rc.orderIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
- rc.orderIndex.setValueMarshaller(StringMarshaller.INSTANCE);
- rc.orderIndex.load();
-
- rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
- rc.messageIdIndex.setValueMarshaller(LocationMarshaller.INSTANCE);
- rc.messageIdIndex.load();
-
- // If it was a topic...
- if( rc.subscriptions!=null ) {
-
- rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
- rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
- rc.subscriptions.load();
-
- rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
- rc.subscriptionAcks.setValueMarshaller(LocationMarshaller.INSTANCE);
- rc.subscriptionAcks.load();
-
- rc.ackLocations = new TreeMap<Location, HashSet<String>>();
- rc.subscriptionCursors = new HashMap<String, Location>();
-
- for (Iterator<Entry<String, Location>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
- Entry<String,Location> entry = iterator.next();
- addAckLocation(rc, entry.getValue(), entry.getKey());
- }
+ metadata.destinations.put(tx, key, rc);
+ }
+
+ // Configure the marshallers of each index, then load it.
+ rc.orderIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
+ rc.orderIndex.setValueMarshaller(StringMarshaller.INSTANCE);
+ rc.orderIndex.load();
+
+ rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
+ rc.messageIdIndex.setValueMarshaller(LocationMarshaller.INSTANCE);
+ rc.messageIdIndex.load();
+
+ // Topics also get their subscription indexes loaded and the in-memory ack tracking rebuilt.
+ if (topic) {
+
+ rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
+ rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
+ rc.subscriptions.load();
+
+ rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
+ rc.subscriptionAcks.setValueMarshaller(LocationMarshaller.INSTANCE);
+ rc.subscriptionAcks.load();
+ rc.ackLocations = new TreeMap<Location, HashSet<String>>();
+ rc.subscriptionCursors = new HashMap<String, Location>();
+
+ for (Iterator<Entry<String, Location>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
+ Entry<String, Location> entry = iterator.next();
+ addAckLocation(rc, entry.getValue(), entry.getKey());
}
-
- // Cache it. We may want to remove/unload destinations from the cache that are not used for a while
- // to reduce memory usage.
- storedDestinations.put(key, rc);
+
}
return rc;
}
-
+
/**
* @param sd
* @param messageLocation
@@ -672,14 +878,13 @@
*/
private void addAckLocation(StoredDestination sd, Location messageLocation, String subscriptionKey) {
HashSet<String> hs = sd.ackLocations.get(messageLocation);
- if( hs == null ) {
+ if (hs == null) {
hs = new HashSet<String>();
sd.ackLocations.put(messageLocation, hs);
}
hs.add(subscriptionKey);
}
-
-
+
/**
* @param tx
* @param sd
@@ -689,37 +894,37 @@
*/
private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Location location) throws IOException {
// Remove the sub from the previous location set..
- if( location!=null ) {
+ if (location != null) {
HashSet<String> hs = sd.ackLocations.get(location);
- if(hs!=null) {
+ if (hs != null) {
hs.remove(subscriptionKey);
- if( hs.isEmpty() ) {
+ if (hs.isEmpty()) {
HashSet<String> firstSet = sd.ackLocations.values().iterator().next();
sd.ackLocations.remove(location);
-
- // Did we just empty out the first set in the
- // ordered list of ack locations? Then it's time to
+
+ // Did we just empty out the first set in the
+ // ordered list of ack locations? Then it's time to
// delete some messages.
- if( hs==firstSet ) {
+ if (hs == firstSet) {
-
// Find all the entries that need to get deleted.
ArrayList<Entry<Location, String>> deletes = new ArrayList<Entry<Location, String>>();
for (Iterator<Entry<Location, String>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
Entry<Location, String> entry = iterator.next();
- while( entry.getKey().compareTo(location) <= 0 ) {
- // We don't do the actually delete while we are iterating the BTree since
+ while (entry.getKey().compareTo(location) <= 0) {
+ // We don't do the actually delete while we are
+ // iterating the BTree since
// iterating would fail.
deletes.add(entry);
}
}
-
+
// Do the actual deletes.
for (Entry<Location, String> entry : deletes) {
sd.messageIdIndex.remove(tx, entry.getValue());
sd.orderIndex.remove(tx, entry.getKey());
}
-
+
}
}
}
@@ -727,15 +932,14 @@
}
private String key(KahaDestination destination) {
- return destination.getType().getNumber()+":"+destination.getName();
+ return destination.getType().getNumber() + ":" + destination.getName();
}
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
// Transaction related implementation methods.
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
-
private ArrayList<Operation> getInflightTx(KahaTransactionInfo info, Location location) {
TransactionId key = key(info);
@@ -766,15 +970,21 @@
abstract class Operation {
final Location location;
+
public Operation(Location location) {
this.location = location;
}
+
+ public Location getLocation() {
+ return location;
+ }
+
abstract public void execute(Transaction tx) throws IOException;
}
-
+
class AddOpperation extends Operation {
final KahaAddMessageCommand command;
-
+
public AddOpperation(KahaAddMessageCommand command, Location location) {
super(location);
this.command = command;
@@ -783,15 +993,15 @@
public void execute(Transaction tx) throws IOException {
upadateIndex(tx, command, location);
}
-
+
public KahaAddMessageCommand getCommand() {
return command;
}
}
-
+
class RemoveOpperation extends Operation {
final KahaRemoveMessageCommand command;
-
+
public RemoveOpperation(KahaRemoveMessageCommand command, Location location) {
super(location);
this.command = command;
@@ -805,13 +1015,11 @@
return command;
}
}
-
-
- ///////////////////////////////////////////////////////////////////
+
+ // /////////////////////////////////////////////////////////////////
// Initialization related implementation methods.
- ///////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////////////////////////
-
private PageFile createPageFile() {
PageFile pf = new PageFile(directory, "database");
return pf;
@@ -841,5 +1049,4 @@
this.deleteAllMessages = deleteAllMessages;
}
-
}