You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/17 22:00:10 UTC
svn commit: r785769 - in /activemq/sandbox/activemq-flow:
activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/
activemq-store/src/test/java/org/apache/activemq/broker/store/
Author: cmacnaug
Date: Wed Jun 17 20:00:09 2009
New Revision: 785769
URL: http://svn.apache.org/viewvc?rev=785769&view=rev
Log:
Fixing up checkPoint cleanup not preserve in use data files
Modified:
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=785769&r1=785768&r2=785769&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java Wed Jun 17 20:00:09 2009
@@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
+import java.util.TreeSet;
import java.util.Map.Entry;
import org.apache.activemq.broker.store.Store;
@@ -33,6 +34,7 @@
import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
import org.apache.activemq.queue.QueueDescriptor;
import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.LongMarshaller;
@@ -287,5 +289,4 @@
this.size = size;
}
}
-
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=785769&r1=785768&r2=785769&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Wed Jun 17 20:00:09 2009
@@ -193,6 +193,17 @@
}
}
}
+
+ if (deleteAllMessages) {
+ getJournal().start();
+ journal.delete();
+ journal.close();
+ journal = null;
+ getPageFile().delete();
+ rootEntity = new RootEntity();
+ LOG.info("Persistence store purged.");
+ deleteAllMessages = false;
+ }
getJournal().start();
@@ -236,18 +247,7 @@
try {
open();
- if (deleteAllMessages) {
- journal.delete();
-
- pageFile.unload();
- pageFile.delete();
- rootEntity = new RootEntity();
-
- LOG.info("Persistence store purged.");
- deleteAllMessages = false;
-
- loadPageFile();
- }
+
store(new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())), null);
} finally {
indexLock.writeLock().unlock();
@@ -439,8 +439,9 @@
protected void checkpointCleanup(final boolean cleanup) {
try {
- long start = System.currentTimeMillis();
indexLock.writeLock().lock();
+ long start = System.currentTimeMillis();
+
try {
if (!opened.get()) {
return;
@@ -455,7 +456,11 @@
}
long end = System.currentTimeMillis();
if (end - start > 1000) {
- LOG.warn("KahaDB Cleanup took " + (end - start));
+ if (cleanup) {
+ LOG.warn("KahaDB Cleanup took " + (end - start));
+ } else {
+ LOG.warn("KahaDB CheckPoint took " + (end - start));
+ }
}
} catch (IOException e) {
e.printStackTrace();
@@ -503,77 +508,7 @@
gcCandidateSet.removeAll(journalFilesBeingReplicated);
}
- // Don't GC files after the first in progress tx
- Location firstTxLocation = rootEntity.getLastUpdate();
-
- if (firstTxLocation != null) {
- while (!gcCandidateSet.isEmpty()) {
- Integer last = gcCandidateSet.last();
- if (last >= firstTxLocation.getDataFileId()) {
- gcCandidateSet.remove(last);
- } else {
- break;
- }
- }
- }
-
- // // Go through all the destinations to see if any of them can
- // remove GC candidates.
- // for (StoredDestinationState sd : storedDestinations.values()) {
- // if( gcCandidateSet.isEmpty() ) {
- // break;
- // }
- //
- // // Use a visitor to cut down the number of pages that we load
- // dbstate.locationIndex.visit(tx, new BTreeVisitor<Location,
- // Long>() {
- // int last=-1;
- // public boolean isInterestedInKeysBetween(Location first, Location
- // second) {
- // if( first==null ) {
- // SortedSet<Integer> subset =
- // gcCandidateSet.headSet(second.getDataFileId()+1);
- // if( !subset.isEmpty() && subset.last() == second.getDataFileId()
- // ) {
- // subset.remove(second.getDataFileId());
- // }
- // return !subset.isEmpty();
- // } else if( second==null ) {
- // SortedSet<Integer> subset =
- // gcCandidateSet.tailSet(first.getDataFileId());
- // if( !subset.isEmpty() && subset.first() == first.getDataFileId()
- // ) {
- // subset.remove(first.getDataFileId());
- // }
- // return !subset.isEmpty();
- // } else {
- // SortedSet<Integer> subset =
- // gcCandidateSet.subSet(first.getDataFileId(),
- // second.getDataFileId()+1);
- // if( !subset.isEmpty() && subset.first() == first.getDataFileId()
- // ) {
- // subset.remove(first.getDataFileId());
- // }
- // if( !subset.isEmpty() && subset.last() == second.getDataFileId()
- // ) {
- // subset.remove(second.getDataFileId());
- // }
- // return !subset.isEmpty();
- // }
- // }
- //
- // public void visit(List<Location> keys, List<Long> values) {
- // for (Location l : keys) {
- // int fileId = l.getDataFileId();
- // if( last != fileId ) {
- // gcCandidateSet.remove(fileId);
- // last = fileId;
- // }
- // }
- // }
- //
- // });
- // }
+ rootEntity.removeGCCandidates(gcCandidateSet, tx);
if (!gcCandidateSet.isEmpty()) {
if (LOG.isErrorEnabled()) {
@@ -646,7 +581,7 @@
LOG.warn("KahaDB long enqueue time: Journal Add Took: " + (start2 - start) + " ms, Index Update took " + (end - start2) + " ms");
}
return location;
-
+
} finally {
if (tx == null)
indexLock.writeLock().unlock();
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=785769&r1=785768&r2=785769&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Wed Jun 17 20:00:09 2009
@@ -22,7 +22,10 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
+import java.util.SortedSet;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.Map.Entry;
import org.apache.activemq.broker.store.Store;
@@ -32,22 +35,28 @@
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.queue.QueueDescriptor;
import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.index.BTreeVisitor;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.IntegerMarshaller;
import org.apache.kahadb.util.LongMarshaller;
import org.apache.kahadb.util.Marshaller;
import org.apache.kahadb.util.VariableMarshaller;
public class RootEntity {
+ //TODO remove this one performance testing is complete.
+ private static final boolean USE_LOC_INDEX = true;
+
public final static Marshaller<RootEntity> MARSHALLER = new VariableMarshaller<RootEntity>() {
public RootEntity readPayload(DataInput is) throws IOException {
RootEntity rc = new RootEntity();
rc.state = is.readInt();
rc.maxMessageKey = is.readLong();
rc.messageKeyIndex = new BTreeIndex<Long, Location>(is.readLong());
- // rc.locationIndex = new BTreeIndex<Location, Long>(is.readLong());
+ if (USE_LOC_INDEX)
+ rc.locationIndex = new BTreeIndex<Integer, Long>(is.readLong());
rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
rc.messageRefsIndex = new BTreeIndex<Long, Long>(is.readLong());
if (is.readBoolean()) {
@@ -62,7 +71,8 @@
os.writeInt(object.state);
os.writeLong(object.maxMessageKey);
os.writeLong(object.messageKeyIndex.getPageId());
- // os.writeLong(object.locationIndex.getPageId());
+ if (USE_LOC_INDEX)
+ os.writeLong(object.locationIndex.getPageId());
os.writeLong(object.destinationIndex.getPageId());
os.writeLong(object.messageRefsIndex.getPageId());
if (object.lastUpdate != null) {
@@ -86,7 +96,7 @@
// Message Indexes
private long maxMessageKey;
private BTreeIndex<Long, Location> messageKeyIndex;
- // private BTreeIndex<Location, Long> locationIndex;
+ private BTreeIndex<Integer, Long> locationIndex;
private BTreeIndex<Long, Long> messageRefsIndex; // Maps message key to ref
// count:
@@ -107,8 +117,8 @@
state = KahaDBStore.CLOSED_STATE;
messageKeyIndex = new BTreeIndex<Long, Location>(tx.getPageFile(), tx.allocate().getPageId());
- // locationIndex = new BTreeIndex<Location, Long>(tx.getPageFile(),
- // tx.allocate().getPageId());
+ if (USE_LOC_INDEX)
+ locationIndex = new BTreeIndex<Integer, Long>(tx.getPageFile(), tx.allocate().getPageId());
destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(), tx.allocate().getPageId());
messageRefsIndex = new BTreeIndex<Long, Long>(tx.getPageFile(), tx.allocate().getPageId());
@@ -128,10 +138,13 @@
maxMessageKey = last.getKey();
}
}
- // locationIndex.setPageFile(tx.getPageFile());
- // locationIndex.setKeyMarshaller(Marshallers.LOCATION_MARSHALLER);
- // locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
- // locationIndex.load(tx);
+
+ if (USE_LOC_INDEX) {
+ locationIndex.setPageFile(tx.getPageFile());
+ locationIndex.setKeyMarshaller(IntegerMarshaller.INSTANCE);
+ locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
+ locationIndex.load(tx);
+ }
destinationIndex.setPageFile(tx.getPageFile());
destinationIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
@@ -245,15 +258,31 @@
// Message existed.. undo the index update we just did. Chances
// are it's a transaction replay.
messageKeyIndex.put(tx, id, previous);
+ } else {
+ if (USE_LOC_INDEX) {
+ Long refs = locationIndex.get(tx, location.getDataFileId());
+ if (refs == null) {
+ locationIndex.put(tx, location.getDataFileId(), new Long(1));
+ } else {
+ locationIndex.put(tx, location.getDataFileId(), new Long(refs.longValue() + 1));
+ }
+ }
}
}
public void messageRemove(Transaction tx, Long messageKey) throws IOException {
// Location location = messageKeyIndex.remove(tx, messageKey);
- messageKeyIndex.remove(tx, messageKey);
- // if (location != null) {
- // locationIndex.remove(tx, location);
- // }
+ Location location = messageKeyIndex.remove(tx, messageKey);
+ if (USE_LOC_INDEX && location != null) {
+ Long refs = locationIndex.get(tx, location.getDataFileId());
+ if (refs != null) {
+ if (refs.longValue() <= 1) {
+ locationIndex.remove(tx, location.getDataFileId());
+ } else {
+ locationIndex.put(tx, location.getDataFileId(), new Long(refs.longValue() - 1));
+ }
+ }
+ }
}
public Location messageGetLocation(Transaction tx, Long messageKey) {
@@ -436,34 +465,137 @@
//TODO check that none of the locations specified by the indexes
//are past the last update location in the journal. This can happen
//if the index is flushed before the journal.
- //
- //Collection<DestinationEntity> values = destinations.values();
- //for (DestinationEntity de : values) {
- // count +=
- //}
- // Go through all the destinations to see if they have messages past
- // the lastAppendLocation
- //for (StoredDestinationState sd :
- //
- // final ArrayList<Long> matches = new ArrayList<Long>();
- // // Find all the Locations that are >= than the last Append Location.
- // sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location,
- // Long>(lastAppendLocation) {
- // @Override
- // protected void matched(Location key, Long value) {
- // matches.add(value);
- // }
- // });
- //
- //
- // for (Long sequenceId : matches) {
- // MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
- // sd.locationIndex.remove(tx, keys.location);
- // sd.messageIdIndex.remove(tx, keys.messageId);
- // undoCounter++;
- // // TODO: do we need to modify the ack positions for the pub sub case?
- // }
- // }
- return 0;
+ int count = 0;
+
+ //TODO: It might be better to tie the the index update to the journal write
+ //so that we can be sure that all journal entries are on disk prior to
+ //index update.
+
+ //Scan MessageKey Index to find message keys past the last append
+ //location:
+// final ArrayList<Long> matches = new ArrayList<Long>();
+// messageKeyIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
+//
+// @Override
+// protected void matched(Location key, Long value) {
+// matches.add(value);
+// }
+// });
+
+
+// for (Long sequenceId : matches) {
+// MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+// sd.locationIndex.remove(tx, keys.location);
+// sd.messageIdIndex.remove(tx, keys.messageId);
+// count++;
+// }
+
+ // @Override
+ // protected void matched(Location key, Long value) {
+ // matches.add(value);
+ // }
+ // });
+ //
+ //
+ // for (Long sequenceId : matches) {
+ // MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+ // sd.locationIndex.remove(tx, keys.location);
+ // sd.messageIdIndex.remove(tx, keys.messageId);
+ // undoCounter++;
+ // })
+
+ // for (DestinationEntity de : destinations.values()) {
+ // final ArrayList<Long> matches = new ArrayList<Long>();
+ // // Find all the Locations that are >= than the last Append Location.
+ // sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location,
+ // Long>(lastAppendLocation) {
+ // @Override
+ // protected void matched(Location key, Long value) {
+ // matches.add(value);
+ // }
+ // });
+ //
+ //
+ // for (Long sequenceId : matches) {
+ // MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+ // sd.locationIndex.remove(tx, keys.location);
+ // sd.messageIdIndex.remove(tx, keys.messageId);
+ // undoCounter++;
+ // }
+ // }
+ return count;
+ }
+
+ /**
+ * Go through indexes checking to
+ *
+ * @param gcCandidateSet
+ * @throws IOException
+ */
+ final void removeGCCandidates(final TreeSet<Integer> gcCandidateSet, Transaction tx) throws IOException {
+
+ // Don't GC files after the first in progress tx
+ Location firstTxLocation = lastUpdate;
+
+ if (firstTxLocation != null) {
+ while (!gcCandidateSet.isEmpty()) {
+ Integer last = gcCandidateSet.last();
+ if (last >= firstTxLocation.getDataFileId()) {
+ gcCandidateSet.remove(last);
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (gcCandidateSet.isEmpty()) {
+ return;
+ }
+
+ if (!USE_LOC_INDEX) {
+ return;
+ }
+
+ // Go through the location index to see if we can remove gc candidates:
+ // Use a visitor to cut down the number of pages that we load
+ locationIndex.visit(tx, new BTreeVisitor<Integer, Long>() {
+ int last = -1;
+
+ public boolean isInterestedInKeysBetween(Integer first, Integer second) {
+ if (first == null) {
+ SortedSet<Integer> subset = gcCandidateSet.headSet(second + 1);
+ if (!subset.isEmpty() && subset.last().equals(second)) {
+ subset.remove(second);
+ }
+ return !subset.isEmpty();
+ } else if (second == null) {
+ SortedSet<Integer> subset = gcCandidateSet.tailSet(first);
+ if (!subset.isEmpty() && subset.first().equals(first)) {
+ subset.remove(first);
+ }
+ return !subset.isEmpty();
+ } else {
+ SortedSet<Integer> subset = gcCandidateSet.subSet(first, second + 1);
+ if (!subset.isEmpty() && subset.first().equals(first)) {
+ subset.remove(first);
+ }
+ if (!subset.isEmpty() && subset.last().equals(second)) {
+ subset.remove(second);
+ }
+ return !subset.isEmpty();
+ }
+ }
+
+ public void visit(List<Integer> keys, List<Long> values) {
+ for (Integer l : keys) {
+ int fileId = l;
+ if (last != fileId) {
+ gcCandidateSet.remove(fileId);
+ last = fileId;
+ }
+ }
+ }
+ });
+
}
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=785769&r1=785768&r2=785769&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java Wed Jun 17 20:00:09 2009
@@ -38,8 +38,8 @@
import org.apache.activemq.queue.QueueDescriptor;
public abstract class StorePerformanceBase extends TestCase {
-
- private static int PERFORMANCE_SAMPLES = 5;
+
+ private static int PERFORMANCE_SAMPLES = 50;
private static boolean SYNC_TO_DISK = true;
private static final boolean USE_SHARED_WRITER = true;
@@ -56,20 +56,24 @@
abstract protected Store createStore();
private SharedWriter writer = null;
- private Semaphore writePermits = null;
+
+ private Semaphore enqueuePermits;
+ private Semaphore dequeuePermits;
@Override
protected void setUp() throws Exception {
store = createStore();
+ //store.setDeleteAllMessages(false);
store.start();
if (USE_SHARED_WRITER) {
writer = new SharedWriter();
writer.start();
}
-
- writePermits = new Semaphore(1000);
-
+
+ enqueuePermits = new Semaphore(20000000);
+ dequeuePermits = new Semaphore(0);
+
queueId = new QueueDescriptor();
queueId.setQueueName(new AsciiBuffer("test"));
store.execute(new VoidCallback<Exception>() {
@@ -78,6 +82,20 @@
session.queueAdd(queueId);
}
}, null);
+
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ Iterator<Store.QueueQueryResult> qqrs = session.queueList(queueId, 1);
+ assertTrue(qqrs.hasNext());
+ Store.QueueQueryResult qqr = qqrs.next();
+ if(qqr.getSize() > 0)
+ {
+ queueKey.set(qqr.getLastSequence() + 1);
+ System.out.println("Recovered queue: " + qqr.getDescriptor().getQueueName() + " with " + qqr.getCount() + " messages");
+ }
+ }
+ }, null);
}
@Override
@@ -100,8 +118,6 @@
}
}
- private final Object wakeupMutex = new Object();
-
class SharedWriter implements Runnable {
LinkedBlockingQueue<SharedQueueOp> queue = new LinkedBlockingQueue<SharedQueueOp>(1000);
private Thread thread;
@@ -200,8 +216,8 @@
public void stop() throws InterruptedException {
stopped.set(true);
- while (writePermits.hasQueuedThreads()) {
- writePermits.release();
+ while (enqueuePermits.hasQueuedThreads()) {
+ enqueuePermits.release();
}
thread.join();
}
@@ -211,7 +227,7 @@
Buffer buffer = new Buffer(new byte[1024]);
for (long i = 0; !stopped.get(); i++) {
- writePermits.acquireUninterruptibly();
+ enqueuePermits.acquire();
final MessageRecord messageRecord = new MessageRecord();
messageRecord.setKey(store.allocateStoreTracking());
@@ -223,10 +239,6 @@
SharedQueueOp op = new SharedQueueOp() {
public void run() {
rate.increment();
- writePermits.release();
- synchronized (wakeupMutex) {
- wakeupMutex.notify();
- }
}
};
@@ -239,6 +251,7 @@
queueRecord.setQueueKey(queueKey.incrementAndGet());
queueRecord.setSize(messageRecord.getSize());
session.queueAddMessage(queueId, queueRecord);
+ dequeuePermits.release();
}
};
@@ -299,6 +312,7 @@
SharedQueueOp op = new SharedQueueOp() {
public void run() {
rate.increment(records.size());
+ enqueuePermits.release(records.size());
queryWait.release();
}
};
@@ -324,15 +338,7 @@
writer.addOp(op);
}
- //queryWait.acquireUninterruptibly();
- if (records.isEmpty()) {
- // synchronized (wakeupMutex) {
- // try {
- // wakeupMutex.wait(500);
- // } catch (InterruptedException e) {
- // }
- // }
- }
+ dequeuePermits.acquire();
records.clear();
}
} catch (InterruptedException e) {
@@ -346,6 +352,12 @@
}
}
+ public void test1_1_0() throws Exception {
+ startProducers(1);
+ reportRates();
+ }
+
+
public void test1_1_1() throws Exception {
startProducers(1);
startConsumers(1);