You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/16 19:42:21 UTC
svn commit: r785317 - in /activemq/sandbox/activemq-flow:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/test/java/org/apache/activemq/broker/
activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/ ac...
Author: cmacnaug
Date: Tue Jun 16 17:42:21 2009
New Revision: 785317
URL: http://svn.apache.org/viewvc?rev=785317&view=rev
Log:
Updating KahaDBStore to apply journal and index updates on the fly instead of at commit time.
This allows larger chunks of work to be done within a single transaction reducing the need for lots of locking unlocking. Also added locking and transaction controls to the Session interface to that callers can directly operate against a Session to avoid creating one per transaction.
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/BrokerDatabase.java Tue Jun 16 17:42:21 2009
@@ -36,7 +36,6 @@
import org.apache.activemq.broker.store.Store.QueueQueryResult;
import org.apache.activemq.broker.store.Store.QueueRecord;
import org.apache.activemq.broker.store.Store.Session;
-import org.apache.activemq.broker.store.Store.VoidCallback;
import org.apache.activemq.dispatch.DispatcherAware;
import org.apache.activemq.dispatch.IDispatcher;
import org.apache.activemq.flow.AbstractLimitedFlowResource;
@@ -117,7 +116,7 @@
}, databaseFlow, storeLimiter, opQueue);
storeController.useOverFlowQueue(false);
super.onFlowOpened(storeController);
-
+
flushDelayCallback = new Runnable() {
public void run() {
flushDelayCallback();
@@ -171,11 +170,9 @@
}
public Iterator<QueueQueryResult> listQueues(final short type) throws Exception {
- // TODO Auto-generated method stub
return store.execute(new Callback<Iterator<QueueQueryResult>, Exception>() {
public Iterator<QueueQueryResult> execute(Session session) throws Exception {
- // TODO Auto-generated method stub
return session.queueListByType(type, null, Integer.MAX_VALUE);
}
@@ -307,7 +304,7 @@
private final void processOps() {
int count = 0;
-
+ Session session = store.getSession();
while (running.get()) {
final OperationBase firstOp = getNextOp(true);
if (firstOp == null) {
@@ -318,31 +315,34 @@
// The first operation we get, triggers a store transaction.
if (firstOp != null) {
final LinkedList<Operation> processedQueue = new LinkedList<Operation>();
+ boolean locked = false;
try {
Operation op = firstOp;
- // TODO the recursion here leads to a rather large stack,
- // refactor.
while (op != null) {
final Operation toExec = op;
if (toExec.beginExecute()) {
+ if (!locked) {
+ session.acquireLock();
+ locked = true;
+ }
count++;
-
- store.execute(new Store.VoidCallback<Exception>() {
- @Override
- public void run(Session session) throws Exception {
-
- // Try to execute the operation against the
- // session...
- try {
- toExec.execute(session);
- processedQueue.add(toExec);
- } catch (CancellationException ignore) {
- // System.out.println("Cancelled" +
- // toExec);
- }
- }
- }, null);
+ op.execute(session);
+ processedQueue.add(op);
+ /*
+ * store.execute(new Store.VoidCallback<Exception>()
+ * {
+ *
+ * @Override public void run(Session session) throws
+ * Exception {
+ *
+ * // Try to execute the operation against the //
+ * session... try { toExec.execute(session);
+ * processedQueue.add(toExec); } catch
+ * (CancellationException ignore) { //
+ * System.out.println("Cancelled" + // toExec); } }
+ * }, null);
+ */
}
if (count < 1000) {
@@ -356,6 +356,11 @@
// If we procecessed some ops, flush and post process:
if (!processedQueue.isEmpty()) {
+ if (locked) {
+ session.commit();
+ session.releaseLock();
+ locked = false;
+ }
if (DEBUG)
System.out.println("Flushing queue after processing: " + processedQueue.size() + " - " + processedQueue);
// Sync the store:
@@ -393,6 +398,16 @@
IOException ioe = new IOException(e.getMessage());
ioe.initCause(e);
onDatabaseException(ioe);
+ } finally {
+ if (locked) {
+ try {
+ session.releaseLock();
+ } catch (Exception e) {
+ IOException ioe = new IOException(e.getMessage());
+ ioe.initCause(e);
+ onDatabaseException(ioe);
+ }
+ }
}
}
}
@@ -1098,12 +1113,13 @@
return store.allocateStoreTracking();
}
- public IDispatcher getDispatcher() {
- return dispatcher;
- }
- public void setDispatcher(IDispatcher dispatcher) {
- this.dispatcher = dispatcher;
- }
+ public IDispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ public void setDispatcher(IDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
public Store getStore() {
return store;
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerTestBase.java Tue Jun 16 17:42:21 2009
@@ -53,7 +53,7 @@
protected final boolean USE_KAHA_DB = true;
protected final boolean PURGE_STORE = true;
- protected final boolean PERSISTENT = false;
+ protected final boolean PERSISTENT = true;
protected final boolean DURABLE = true;
// Set to put senders and consumers on separate brokers.
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Tue Jun 16 17:42:21 2009
@@ -22,7 +22,6 @@
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -67,12 +66,14 @@
private static final int BEGIN_UNIT_OF_WORK = -1;
private static final int END_UNIT_OF_WORK = -2;
private static final int FLUSH = -3;
+ private static final int CANCEL_UNIT_OF_WORK = -4;
private static final Log LOG = LogFactory.getLog(KahaDBStore.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
- private static final ByteSequence BEGIN_UNIT_OF_WORK_DATA = new ByteSequence(new byte[] { END_UNIT_OF_WORK });
+ private static final ByteSequence BEGIN_UNIT_OF_WORK_DATA = new ByteSequence(new byte[] { BEGIN_UNIT_OF_WORK });
private static final ByteSequence END_UNIT_OF_WORK_DATA = new ByteSequence(new byte[] { END_UNIT_OF_WORK });
+ private static final ByteSequence CANCEL_UNIT_OF_WORK_DATA = new ByteSequence(new byte[] { CANCEL_UNIT_OF_WORK });
private static final ByteSequence FLUSH_DATA = new ByteSequence(new byte[] { FLUSH });
public static final int CLOSED_STATE = 1;
@@ -116,12 +117,9 @@
// /////////////////////////////////////////////////////////////////
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
- try
- {
+ try {
load();
- }
- catch (Exception e)
- {
+ } catch (Exception e) {
LOG.error("Error loading store", e);
}
}
@@ -140,6 +138,10 @@
return trackingGen.incrementAndGet();
}
+ public boolean isTransactional() {
+ return true;
+ }
+
private void loadPageFile() throws IOException {
indexLock.writeLock().lock();
try {
@@ -229,7 +231,7 @@
}
}
- public void load() throws IOException {
+ private void load() throws IOException {
indexLock.writeLock().lock();
try {
open();
@@ -246,7 +248,7 @@
loadPageFile();
}
- store(new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())));
+ store(new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())), null);
} finally {
indexLock.writeLock().unlock();
}
@@ -260,10 +262,11 @@
try {
pageFile.unload();
rootEntity = new RootEntity();
+ journal.close();
} finally {
indexLock.writeLock().unlock();
}
- journal.close();
+
checkpointThread.join();
lockFile.unlock();
lockFile = null;
@@ -333,6 +336,8 @@
redoCounter += uow.size();
uow = null;
}
+ } else if (data.length == 1 && data.data[0] == CANCEL_UNIT_OF_WORK) {
+ uow = null;
} else if (data.length == 1 && data.data[0] == FLUSH) {
} else {
final TypeCreatable message = load(recoveryPosition);
@@ -409,32 +414,8 @@
// in that case we need to removed references to messages that are not
// in the journal
final Location lastAppendLocation = journal.getLastAppendLocation();
- long undoCounter = 0;
+ int undoCounter = rootEntity.recoverIndex(lastAppendLocation, tx);
- // TODO
- // // Go through all the destinations to see if they have messages past
- // the lastAppendLocation
- // for (StoredDestinationState sd : storedDestinations.values()) {
- //
- // final ArrayList<Long> matches = new ArrayList<Long>();
- // // Find all the Locations that are >= than the last Append Location.
- // sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location,
- // Long>(lastAppendLocation) {
- // @Override
- // protected void matched(Location key, Long value) {
- // matches.add(value);
- // }
- // });
- //
- //
- // for (Long sequenceId : matches) {
- // MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
- // sd.locationIndex.remove(tx, keys.location);
- // sd.messageIdIndex.remove(tx, keys.messageId);
- // undoCounter++;
- // // TODO: do we need to modify the ack positions for the pub sub case?
- // }
- // }
long end = System.currentTimeMillis();
if (undoCounter > 0) {
// The rolledback operations are basically in flight journal writes.
@@ -444,10 +425,6 @@
}
}
- public Location getLastUpdatePosition() throws IOException {
- return rootEntity.getLastUpdate();
- }
-
private Location getRecoveryPosition() throws IOException {
if (rootEntity.getLastUpdate() != null) {
@@ -620,8 +597,8 @@
// /////////////////////////////////////////////////////////////////
long messageSequence;
- public Location store(TypeCreatable data) throws IOException {
- return store(data, null);
+ private Location store(TypeCreatable data, Transaction tx) throws IOException {
+ return store(data, null, tx);
}
/**
@@ -633,105 +610,48 @@
* @throws IOException
*/
@SuppressWarnings("unchecked")
- public Location store(final TypeCreatable data, Runnable onFlush) throws IOException {
+ private Location store(final TypeCreatable data, Runnable onFlush, Transaction tx) throws IOException {
final MessageBuffer message = ((PBMessage) data).freeze();
int size = message.serializedSizeUnframed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
os.writeByte(data.toType().getNumber());
message.writeUnframed(os);
- long start = System.currentTimeMillis();
- final Location location;
- synchronized (journal) {
- location = journal.write(os.toByteSequence(), onFlush);
- }
- long start2 = System.currentTimeMillis();
-
- try {
+ //If we aren't in a transaction acquire the index lock
+ if (tx == null) {
indexLock.writeLock().lock();
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- updateIndex(tx, data.toType(), message, location);
- }
- });
- rootEntity.setLastUpdate(location);
-
- } finally {
- indexLock.writeLock().unlock();
- }
-
- long end = System.currentTimeMillis();
- if (end - start > 1000) {
- LOG.warn("KahaDB long enqueue time: Journal Add Took: " + (start2 - start) + " ms, Index Update took " + (end - start2) + " ms");
}
- return location;
- }
-
- public void store(List<TypeCreatable> batch) throws IOException {
- store(batch, null);
- }
- // ArrayList<TypeCreatable>
- /**
- * All updated are are funneled through this method. The updates a converted
- * to a PBMessage which is logged to the journal and then the data from the
- * PBMessage is used to update the index just like it would be done during a
- * recovery process.
- *
- * @throws IOException
- */
- @SuppressWarnings("unchecked")
- public void store(final List<TypeCreatable> batch, Runnable onFlush) throws IOException {
- if (batch.isEmpty()) {
- return;
- }
- if (batch.size() == 1) {
- store(batch.get(0), onFlush);
- return;
- }
-
- final ArrayList<UoWOperation> uow = new ArrayList<UoWOperation>(batch.size());
- for (TypeCreatable bean : batch) {
- final MessageBuffer message = ((PBMessage) bean).freeze();
- int size = message.serializedSizeUnframed();
- DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
- os.writeByte(bean.toType().getNumber());
- message.writeUnframed(os);
- UoWOperation op = new UoWOperation();
- op.bean = bean;
- op.data = os.toByteSequence();
- uow.add(op);
- }
+ try {
- long start = System.currentTimeMillis();
- synchronized (journal) {
- journal.write(BEGIN_UNIT_OF_WORK_DATA, false);
- for (UoWOperation op : uow) {
- op.location = journal.write(op.data, false);
+ long start = System.currentTimeMillis();
+ final Location location;
+ synchronized (journal) {
+ location = journal.write(os.toByteSequence(), onFlush);
}
- journal.write(END_UNIT_OF_WORK_DATA, onFlush);
- }
- long start2 = System.currentTimeMillis();
+ long start2 = System.currentTimeMillis();
- try {
- indexLock.writeLock().lock();
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- for (UoWOperation op : uow) {
- MessageBuffer message = ((PBMessage) op.bean).freeze();
- updateIndex(tx, op.bean.toType(), message, op.location);
- rootEntity.setLastUpdate(op.location);
+ if (tx == null) {
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ updateIndex(tx, data.toType(), message, location);
}
- }
- });
+ });
+ } else {
+ updateIndex(tx, data.toType(), message, location);
+ }
+
+ long end = System.currentTimeMillis();
+ if (end - start > 1000) {
+ LOG.warn("KahaDB long enqueue time: Journal Add Took: " + (start2 - start) + " ms, Index Update took " + (end - start2) + " ms");
+ }
+ return location;
+
} finally {
- indexLock.writeLock().unlock();
+ if (tx == null)
+ indexLock.writeLock().unlock();
}
- long end = System.currentTimeMillis();
- if (end - start > 1000) {
- LOG.warn("KahaDB long enqueue time: Journal Add Took: " + (start2 - start) + " ms, Index Update took " + (end - start2) + " ms");
- }
}
/**
@@ -741,7 +661,7 @@
* @return
* @throws IOException
*/
- public TypeCreatable load(Location location) throws IOException {
+ private TypeCreatable load(Location location) throws IOException {
ByteSequence data = journal.read(location);
return load(location, data);
}
@@ -764,19 +684,19 @@
switch (type) {
case MESSAGE_ADD:
messageAdd(tx, (MessageAdd) command, location);
- return;
+ break;
case QUEUE_ADD:
queueAdd(tx, (QueueAdd) command, location);
- return;
+ break;
case QUEUE_REMOVE:
queueRemove(tx, (QueueRemove) command, location);
- return;
+ break;
case QUEUE_ADD_MESSAGE:
queueAddMessage(tx, (QueueAddMessage) command, location);
- return;
+ break;
case QUEUE_REMOVE_MESSAGE:
queueRemoveMessage(tx, (QueueRemoveMessage) command, location);
- return;
+ break;
case TRANSACTION_BEGIN:
case TRANSACTION_ADD_MESSAGE:
@@ -793,6 +713,7 @@
case STREAM_REMOVE:
throw new UnsupportedOperationException();
}
+ rootEntity.setLastUpdate(location);
}
private void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
@@ -846,20 +767,25 @@
}
class KahaDBSession implements Session {
- ArrayList<TypeCreatable> updates = new ArrayList<TypeCreatable>();
+ TypeCreatable atomicUpdate = null;
+ int updateCount = 0;
private Transaction tx;
private Transaction tx() {
- if (tx == null) {
- indexLock.readLock().lock();
- tx = pageFile.tx();
- }
+ acquireLock();
return tx;
}
- public void close() {
+ public final void commit() {
+ commit(null);
+ }
+
+ public final void rollback() {
try {
+ if (updateCount > 1) {
+ journal.write(CANCEL_UNIT_OF_WORK_DATA, false);
+ }
if (tx != null) {
tx.rollback();
}
@@ -867,30 +793,90 @@
throw new FatalStoreException(e);
} finally {
if (tx != null) {
- indexLock.readLock().unlock();
tx = null;
+ updateCount = 0;
+ atomicUpdate = null;
}
}
}
+ /**
+ * Indicates callers intent to start a transaction.
+ */
+ public final void acquireLock() {
+ if (tx == null) {
+ indexLock.writeLock().lock();
+ tx = pageFile.tx();
+ }
+ }
+
+ public final void releaseLock() {
+ try {
+ if (tx != null) {
+ rollback();
+ }
+ } finally {
+ indexLock.writeLock().unlock();
+ }
+ }
+
public void commit(Runnable onFlush) {
try {
+
+ boolean flush = false;
+ if (atomicUpdate != null) {
+ store(atomicUpdate, onFlush, tx);
+ } else if (updateCount > 1) {
+ journal.write(END_UNIT_OF_WORK_DATA, onFlush);
+ } else {
+ flush = onFlush != null;
+ }
+
if (tx != null) {
tx.commit();
}
+
+ if (flush) {
+ onFlush.run();
+ }
+
} catch (IOException e) {
throw new FatalStoreException(e);
} finally {
- if (tx != null) {
- indexLock.readLock().unlock();
- tx = null;
+ tx = null;
+ updateCount = 0;
+ atomicUpdate = null;
+ }
+ }
+
+ private void storeAtomic() {
+ if (atomicUpdate != null) {
+ try {
+ journal.write(BEGIN_UNIT_OF_WORK_DATA, false);
+ store(atomicUpdate, null, tx);
+ atomicUpdate = null;
+ } catch (IOException ioe) {
+ throw new FatalStoreException(ioe);
}
}
+ }
+ private void addUpdate(TypeCreatable bean) {
try {
- store(updates, onFlush);
- } catch (IOException e) {
- throw new FatalStoreException(e);
+ //As soon as we do more than one update we'll wrap in a unit of
+ //work:
+ if (updateCount == 0) {
+ atomicUpdate = bean;
+ updateCount++;
+ return;
+ }
+ storeAtomic();
+
+ updateCount++;
+ store(bean, null, tx);
+
+ } catch (IOException ioe) {
+ throw new FatalStoreException(ioe);
}
}
@@ -915,10 +901,12 @@
if (streamKey != null) {
bean.setStreamKey(streamKey);
}
- updates.add(bean);
+
+ addUpdate(bean);
}
public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException {
+ storeAtomic();
Location location = rootEntity.messageGetLocation(tx(), key);
if (location == null) {
throw new KeyNotFoundException("message key: " + key);
@@ -955,14 +943,15 @@
update.setParentName(parent);
update.setPartitionId(descriptor.getPartitionKey());
}
- updates.add(update);
+ addUpdate(update);
}
public void queueRemove(QueueDescriptor descriptor) {
- updates.add(new QueueRemoveBean().setQueueName(descriptor.getQueueName()));
+ addUpdate(new QueueRemoveBean().setQueueName(descriptor.getQueueName()));
}
public Iterator<QueueQueryResult> queueListByType(short type, QueueDescriptor firstQueue, int max) {
+ storeAtomic();
try {
return rootEntity.queueList(tx(), type, firstQueue, max);
} catch (IOException e) {
@@ -971,6 +960,7 @@
}
public Iterator<QueueQueryResult> queueList(QueueDescriptor firstQueue, int max) {
+ storeAtomic();
try {
return rootEntity.queueList(tx(), (short) -1, firstQueue, max);
} catch (IOException e) {
@@ -987,17 +977,18 @@
if (record.getAttachment() != null) {
bean.setAttachment(record.getAttachment());
}
- updates.add(bean);
+ addUpdate(bean);
}
public void queueRemoveMessage(QueueDescriptor queue, Long messageKey) throws KeyNotFoundException {
QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
bean.setMessageKey(messageKey);
bean.setQueueName(queue.getQueueName());
- updates.add(bean);
+ addUpdate(bean);
}
public Iterator<QueueRecord> queueListMessagesQueue(QueueDescriptor queue, Long firstQueueKey, Long maxQueueKey, int max) throws KeyNotFoundException {
+ storeAtomic();
DestinationEntity destination = rootEntity.getDestination(queue);
if (destination == null) {
throw new KeyNotFoundException("queue key: " + queue);
@@ -1084,14 +1075,23 @@
}
}
+ public Session getSession() {
+ return new KahaDBSession();
+ }
+
+ /**
+ * Convenienct method for executing a batch of work within a store
+ * transaction.
+ */
public <R, T extends Exception> R execute(final Callback<R, T> callback, final Runnable onFlush) throws T {
KahaDBSession session = new KahaDBSession();
+ session.acquireLock();
try {
R rc = callback.execute(session);
session.commit(onFlush);
return rc;
} finally {
- session.close();
+ session.releaseLock();
}
}
@@ -1136,13 +1136,28 @@
return manager;
}
+ private PageFile getPageFile() {
+ if (pageFile == null) {
+ pageFile = createPageFile();
+ }
+ return pageFile;
+ }
+
+ private Journal getJournal() {
+ if (journal == null) {
+ journal = createJournal();
+ }
+ return journal;
+ }
+
public File getDirectory() {
return directory;
}
- public File getStoreDirectory() {
- return directory;
- }
+ public File getStoreDirectory() {
+ return directory;
+ }
+
public void setStoreDirectory(File directory) {
this.directory = directory;
}
@@ -1203,20 +1218,6 @@
return journalMaxFileLength;
}
- public PageFile getPageFile() {
- if (pageFile == null) {
- pageFile = createPageFile();
- }
- return pageFile;
- }
-
- public Journal getJournal() {
- if (journal == null) {
- journal = createJournal();
- }
- return journal;
- }
-
public boolean isFailIfDatabaseIsLocked() {
return failIfDatabaseIsLocked;
}
@@ -1225,5 +1226,4 @@
this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
}
-
}
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Tue Jun 16 17:42:21 2009
@@ -425,4 +425,45 @@
return lastSequence;
}
}
+
+ /**
+ * @param lastAppendLocation
+ * @param tx
+ * @return the number of undone index entries
+ */
+ public int recoverIndex(Location lastAppendLocation, Transaction tx) {
+
+ //TODO check that none of the locations specified by the indexes
+ //are past the last update location in the journal. This can happen
+ //if the index is flushed before the journal.
+ //
+ //Collection<DestinationEntity> values = destinations.values();
+ //for (DestinationEntity de : values) {
+ // count +=
+ //}
+ // Go through all the destinations to see if they have messages past
+ // the lastAppendLocation
+ //for (StoredDestinationState sd :
+ //
+ // final ArrayList<Long> matches = new ArrayList<Long>();
+ // // Find all the Locations that are >= than the last Append Location.
+ // sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location,
+ // Long>(lastAppendLocation) {
+ // @Override
+ // protected void matched(Location key, Long value) {
+ // matches.add(value);
+ // }
+ // });
+ //
+ //
+ // for (Long sequenceId : matches) {
+ // MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+ // sd.locationIndex.remove(tx, keys.location);
+ // sd.messageIdIndex.remove(tx, keys.messageId);
+ // undoCounter++;
+ // // TODO: do we need to modify the ack positions for the pub sub case?
+ // }
+ // }
+ return 0;
+ }
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java Tue Jun 16 17:42:21 2009
@@ -118,6 +118,11 @@
public long allocateStoreTracking();
/**
+ * @return A new store Session.
+ */
+ public Session getSession();
+
+ /**
* This interface is used to execute transacted code.
*
* It is used by the {@link Store#execute(Callback)} method, often as
@@ -354,6 +359,12 @@
*/
public void flush();
+
+ /**
+ * @return true if the store is transactional.
+ */
+ public boolean isTransactional();
+
/**
* This interface allows you to query and update the Store.
*
@@ -362,6 +373,31 @@
*
*/
public interface Session {
+
+ /**
+ * Commits work done on the Session
+ */
+ public void commit();
+
+ /**
+ * Rolls back work done on the Session
+ * since the last call to {@link #acquireLock()}
+ *
+ * @throw {@link UnsupportedOperationException} if the store is not transactional
+ */
+ public void rollback();
+
+ /**
+ * Indicates callers intent to start a transaction.
+ */
+ public void acquireLock();
+
+ /**
+ * Indicates caller is done with the transaction, if
+ * not committed then the transaction will be rolled back (providing
+ * the store is transactional.
+ */
+ public void releaseLock();
public void messageAdd(MessageRecord message);
@@ -397,7 +433,7 @@
*
* @param firstQueueName
* If null starts the query at the first queue.
- * @param max
+ * @param max
* The maximum number of queues to return
* @return The list of queues.
*/
Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Tue Jun 16 17:42:21 2009
@@ -25,8 +25,10 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.Session;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.queue.QueueDescriptor;
@@ -43,6 +45,7 @@
private MemorySession session = new MemorySession();
private AtomicLong trackingGen = new AtomicLong(0);
+ private ReentrantLock updateLock = new ReentrantLock();
/**
* @return a unique sequential store tracking number.
@@ -51,6 +54,10 @@
return trackingGen.incrementAndGet();
}
+ public boolean isTransactional() {
+ return false;
+ }
+
static private class Stream {
private ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -126,7 +133,7 @@
} else {
list = new ArrayList<QueueRecord>(max);
}
-
+
for (Long key : records.tailMap(firstQueueKey).keySet()) {
if ((max >= 0 && list.size() >= max) || (maxSequence >= 0 && key > maxSequence)) {
break;
@@ -198,7 +205,7 @@
result.count = count;
result.size = size;
result.firstSequence = records.isEmpty() ? 0 : records.get(records.firstKey()).getQueueKey();
- result.lastSequence = records.isEmpty() ? 0 : records.get(records.lastKey()).getQueueKey();
+ result.lastSequence = records.isEmpty() ? 0 : records.get(records.lastKey()).getQueueKey();
result.desc = descriptor.copy();
if (this.partitions != null) {
ArrayList<QueueQueryResult> childResults = new ArrayList<QueueQueryResult>(partitions.size());
@@ -212,6 +219,14 @@
}
}
+
+ /**
+ * @return A new store Session.
+ */
+ public Session getSession()
+ {
+ return session;
+ }
static private class RemoveOp {
QueueDescriptor queue;
@@ -283,12 +298,10 @@
}
public long getFirstSequence() {
- // TODO Auto-generated method stub
return firstSequence;
}
public long getLastSequence() {
- // TODO Auto-generated method stub
return lastSequence;
}
}
@@ -304,6 +317,44 @@
private TreeMap<AsciiBuffer, StoredQueue> queues = new TreeMap<AsciiBuffer, StoredQueue>();
private TreeMap<Buffer, Transaction> transactions = new TreeMap<Buffer, Transaction>();
+ /**
+ * Commits work done on the Session, if {@link Store#isTransactional()} is true.
+ */
+ public void commit()
+ {
+ //NOOP
+ }
+
+ /**
+ * Rolls back work done on the Session
+ * since the last call to {@link #acquireLock()}
+ */
+ public void rollback()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Indicates callers intent to start a transaction. If the store
+ * is transaction, the caller must call {@link #commit()} when the
+ * done operating on the Session prior to a mandatory call to
+ * {@link #releaseLock()}
+ */
+ public void acquireLock()
+ {
+ updateLock.lock();
+ }
+
+ /**
+ * Indicates caller is done with the transaction, if
+ * not committed then the transaction will be rolled back (providing
+ * the store is transactional.
+ */
+ public void releaseLock()
+ {
+ updateLock.unlock();
+ }
+
// //////////////////////////////////////////////////////////////////////////////
// Message related methods.
// ///////////////////////////////////////////////////////////////////////////////
@@ -404,7 +455,7 @@
}
for (StoredQueue sq : tailResults) {
- if (max >=0 && results.size() >= max) {
+ if (max >= 0 && results.size() >= max) {
break;
}
if (type != -1 && sq.descriptor.getApplicationType() != type) {
@@ -598,8 +649,7 @@
}
public void setDeleteAllMessages(boolean val) {
- // TODO Auto-generated method stub
-
+ // NOOP
}
Modified: activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java?rev=785317&r1=785316&r2=785317&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StorePerformanceBase.java Tue Jun 16 17:42:21 2009
@@ -18,7 +18,11 @@
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
@@ -35,12 +39,13 @@
public abstract class StorePerformanceBase extends TestCase {
- private static int PERFORMANCE_SAMPLES = 3;
+ private static int PERFORMANCE_SAMPLES = 5;
private static boolean SYNC_TO_DISK = true;
-
-
+ private static final boolean USE_SHARED_WRITER = true;
+
private Store store;
private QueueDescriptor queueId;
+ private AtomicLong queueKey = new AtomicLong(0);
protected MetricAggregator totalProducerRate = new MetricAggregator().name("Aggregate Producer Rate").unit("items");
protected MetricAggregator totalConsumerRate = new MetricAggregator().name("Aggregate Consumer Rate").unit("items");
@@ -50,11 +55,21 @@
abstract protected Store createStore();
+ private SharedWriter writer = null;
+ private Semaphore writePermits = null;
+
@Override
protected void setUp() throws Exception {
store = createStore();
store.start();
-
+
+ if (USE_SHARED_WRITER) {
+ writer = new SharedWriter();
+ writer.start();
+ }
+
+ writePermits = new Semaphore(1000);
+
queueId = new QueueDescriptor();
queueId.setQueueName(new AsciiBuffer("test"));
store.execute(new VoidCallback<Exception>() {
@@ -75,14 +90,96 @@
p.stop();
}
producers.clear();
-
+
+ if (writer != null) {
+ writer.stop();
+ }
+
if (store != null) {
store.stop();
}
}
- private final Object wakeupMutex = new Object();
-
+ private final Object wakeupMutex = new Object();
+
+ class SharedWriter implements Runnable {
+ LinkedBlockingQueue<SharedQueueOp> queue = new LinkedBlockingQueue<SharedQueueOp>(1000);
+ private Thread thread;
+ private AtomicBoolean stopped = new AtomicBoolean();
+
+ public void start() {
+ thread = new Thread(this, "Writer");
+ thread.start();
+ }
+
+ public void stop() throws InterruptedException {
+ stopped.set(true);
+
+ //Add an op to trigger shutdown:
+ SharedQueueOp op = new SharedQueueOp() {
+ public void run() {
+ }
+ };
+ op.op = new Store.VoidCallback<Exception>() {
+
+ @Override
+ public void run(Session session) throws Exception {
+ // TODO Auto-generated method stub
+ }
+ };
+
+ queue.put(op);
+ thread.join();
+ }
+
+ public void run() {
+ Session session = store.getSession();
+ try {
+ LinkedList<Runnable> processed = new LinkedList<Runnable>();
+ while (!stopped.get()) {
+ SharedQueueOp op = queue.take();
+ session.acquireLock();
+ int ops = 0;
+ while (op != null && ops < 1000) {
+ op.op.execute(session);
+ processed.add(op);
+ op = queue.poll();
+ ops++;
+ }
+
+ session.commit();
+ session.releaseLock();
+
+ if (SYNC_TO_DISK) {
+ store.flush();
+ }
+
+ for (Runnable r : processed) {
+ r.run();
+ }
+ processed.clear();
+ }
+
+ } catch (InterruptedException e) {
+ if (!stopped.get()) {
+ e.printStackTrace();
+ }
+ return;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+
+ public void addOp(SharedQueueOp op) throws InterruptedException {
+ queue.put(op);
+ }
+ }
+
+ abstract class SharedQueueOp implements Runnable {
+ VoidCallback<Exception> op;
+ }
+
class Producer implements Runnable {
private Thread thread;
private AtomicBoolean stopped = new AtomicBoolean();
@@ -91,104 +188,122 @@
private long sleep;
public Producer(String name) {
- this.name=name;
+ this.name = name;
}
+
public void start() {
rate.name("Producer " + name + " Rate");
totalProducerRate.add(rate);
- thread = new Thread(this, "Producer"+ name);
+ thread = new Thread(this, "Producer" + name);
thread.start();
}
+
public void stop() throws InterruptedException {
stopped.set(true);
+ while (writePermits.hasQueuedThreads()) {
+ writePermits.release();
+ }
thread.join();
}
+
public void run() {
try {
Buffer buffer = new Buffer(new byte[1024]);
- for( long i=0; !stopped.get(); i++ ) {
-
+ for (long i = 0; !stopped.get(); i++) {
+
+ writePermits.acquireUninterruptibly();
+
final MessageRecord messageRecord = new MessageRecord();
messageRecord.setKey(store.allocateStoreTracking());
- messageRecord.setMessageId(new AsciiBuffer(""+i));
+ messageRecord.setMessageId(new AsciiBuffer("" + i));
messageRecord.setEncoding(new AsciiBuffer("encoding"));
messageRecord.setBuffer(buffer);
messageRecord.setSize(buffer.getLength());
- Runnable onFlush = new Runnable(){
+ SharedQueueOp op = new SharedQueueOp() {
public void run() {
rate.increment();
- synchronized(wakeupMutex){
+ writePermits.release();
+ synchronized (wakeupMutex) {
wakeupMutex.notify();
}
}
};
- final long queueKey = i + 1;
- store.execute(new VoidCallback<Exception>() {
+
+ op.op = new VoidCallback<Exception>() {
@Override
public void run(Session session) throws Exception {
session.messageAdd(messageRecord);
QueueRecord queueRecord = new Store.QueueRecord();
queueRecord.setMessageKey(messageRecord.getKey());
- queueRecord.setQueueKey(queueKey);
+ queueRecord.setQueueKey(queueKey.incrementAndGet());
queueRecord.setSize(messageRecord.getSize());
session.queueAddMessage(queueId, queueRecord);
}
- }, onFlush);
-
- if( SYNC_TO_DISK ) {
- store.flush();
+ };
+
+ if (!USE_SHARED_WRITER) {
+ store.execute(op.op, op);
+
+ if (SYNC_TO_DISK) {
+ store.flush();
+ }
+
+ } else {
+ writer.addOp(op);
}
-
- if( sleep>0 ) {
+ if (sleep > 0) {
Thread.sleep(sleep);
}
}
+ } catch (InterruptedException e) {
+ if (!stopped.get()) {
+ e.printStackTrace();
+ }
+ return;
} catch (Exception e) {
e.printStackTrace();
}
}
}
-
+
class Consumer implements Runnable {
private Thread thread;
private AtomicBoolean stopped = new AtomicBoolean();
protected final MetricCounter rate = new MetricCounter();
private String name;
+ private final Semaphore queryWait = new Semaphore(0);
public Consumer(String name) {
- this.name=name;
+ this.name = name;
}
+
public void start() {
rate.name("Consumer " + name + " Rate");
totalConsumerRate.add(rate);
- thread = new Thread(this, "Consumer " + name );
+ thread = new Thread(this, "Consumer " + name);
thread.start();
}
+
public void stop() throws InterruptedException {
stopped.set(true);
+ queryWait.release();
thread.join();
}
-
+
public void run() {
try {
- while( !stopped.get() ) {
- final ArrayList<MessageRecord> records = new ArrayList<MessageRecord>(1000);;
- Runnable onFlush = new Runnable(){
+ while (!stopped.get()) {
+ final ArrayList<MessageRecord> records = new ArrayList<MessageRecord>(1000);
+ SharedQueueOp op = new SharedQueueOp() {
public void run() {
rate.increment(records.size());
- if( records.isEmpty() ) {
- synchronized(wakeupMutex){
- try {
- wakeupMutex.wait(500);
- } catch (InterruptedException e) {
- }
- }
- }
+ queryWait.release();
}
};
- store.execute(new VoidCallback<Exception>() {
+
+ op.op = new VoidCallback<Exception>() {
@Override
public void run(Session session) throws Exception {
Iterator<QueueRecord> queueRecords = session.queueListMessagesQueue(queueId, 0L, -1L, 1000);
@@ -198,23 +313,45 @@
session.queueRemoveMessage(queueId, r.messageKey);
}
}
- }, onFlush);
- if( SYNC_TO_DISK ) {
- store.flush();
+ };
+
+ if (!USE_SHARED_WRITER) {
+ store.execute(op.op, op);
+ if (SYNC_TO_DISK) {
+ store.flush();
+ }
+ } else {
+ writer.addOp(op);
}
+
+ //queryWait.acquireUninterruptibly();
+ if (records.isEmpty()) {
+ // synchronized (wakeupMutex) {
+ // try {
+ // wakeupMutex.wait(500);
+ // } catch (InterruptedException e) {
+ // }
+ // }
+ }
+ records.clear();
+ }
+ } catch (InterruptedException e) {
+ if (!stopped.get()) {
+ e.printStackTrace();
}
+ return;
} catch (Exception e) {
e.printStackTrace();
}
}
}
-
+
public void test1_1_1() throws Exception {
startProducers(1);
startConsumers(1);
reportRates();
}
-
+
public void test10_1_1() throws Exception {
startProducers(10);
startConsumers(1);
@@ -223,20 +360,20 @@
private void startProducers(int count) {
for (int i = 0; i < count; i++) {
- Producer p = new Producer(""+(i+1));
+ Producer p = new Producer("" + (i + 1));
producers.add(p);
p.start();
}
}
-
+
private void startConsumers(int count) {
for (int i = 0; i < count; i++) {
- Consumer c = new Consumer(""+(i+1));
+ Consumer c = new Consumer("" + (i + 1));
consumers.add(c);
c.start();
}
}
-
+
private void reportRates() throws InterruptedException {
System.out.println("Checking rates for test: " + getName());
for (int i = 0; i < PERFORMANCE_SAMPLES; i++) {
@@ -248,5 +385,5 @@
totalConsumerRate.reset();
}
}
-
+
}