You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/03/26 17:47:35 UTC
svn commit: r758737 - in
/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store:
./ kahadb/ memory/
Author: chirino
Date: Thu Mar 26 16:47:24 2009
New Revision: 758737
URL: http://svn.apache.org/viewvc?rev=758737&view=rev
Log:
Filling out the kahadb impl.
Added:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
Removed:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDBState.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/StoredDestinationState.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java?rev=758737&r1=758736&r2=758737&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/BrokerDatabase.java Thu Mar 26 16:47:24 2009
@@ -462,15 +462,18 @@
RestoredMessageImpl rm = new RestoredMessageImpl();
// TODO should update jms redelivery here.
rm.qRecord = records.next();
- rm.mRecord = session.messageGetRecord(rm.qRecord.messageKey);
- rm.handler = protocolHandlers.get(rm.mRecord.encoding.toString());
- if (rm.handler == null) {
- try {
- rm.handler = ProtocolHandlerFactory.createProtocolHandler(rm.mRecord.encoding.toString());
- protocolHandlers.put(rm.mRecord.encoding.toString(), rm.handler);
- } catch (Throwable thrown) {
- throw new RuntimeException("Unknown message format" + rm.mRecord.encoding.toString(), thrown);
+ try {
+ rm.mRecord = session.messageGetRecord(rm.qRecord.messageKey);
+ rm.handler = protocolHandlers.get(rm.mRecord.encoding.toString());
+ if (rm.handler == null) {
+ try {
+ rm.handler = ProtocolHandlerFactory.createProtocolHandler(rm.mRecord.encoding.toString());
+ protocolHandlers.put(rm.mRecord.encoding.toString(), rm.handler);
+ } catch (Throwable thrown) {
+ throw new RuntimeException("Unknown message format" + rm.mRecord.encoding.toString(), thrown);
+ }
}
+ } catch (KeyNotFoundException shouldNotHappen) {
}
}
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java?rev=758737&r1=758736&r2=758737&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/Store.java Thu Mar 26 16:47:24 2009
@@ -230,7 +230,7 @@
public Long messageAdd(MessageRecord message);
public Long messageGetKey(AsciiBuffer messageId);
- public MessageRecord messageGetRecord(Long key);
+ public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException;
public Long streamOpen();
public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException;
@@ -249,7 +249,7 @@
// Queue related methods.
public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max);
public void queueAdd(AsciiBuffer queueName);
- public boolean queueRemove(AsciiBuffer queueName);
+ public void queueRemove(AsciiBuffer queueName);
public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException;
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=758737&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java Thu Mar 26 16:47:24 2009
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.activemq.broker.store.Store.QueueRecord;
+import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.index.BTreeVisitor;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.LongMarshaller;
+import org.apache.kahadb.util.Marshaller;
+
+public class DestinationEntity {
+
+ public final static Marshaller<DestinationEntity> MARSHALLER = new Marshaller<DestinationEntity>() {
+
+ public Class<DestinationEntity> getType() {
+ return DestinationEntity.class;
+ }
+
+ public DestinationEntity readPayload(DataInput dataIn) throws IOException {
+ DestinationEntity value = new DestinationEntity();
+ value.queueIndex = new BTreeIndex<Long, QueueRecord>(dataIn.readLong());
+ return value;
+ }
+
+ public void writePayload(DestinationEntity value, DataOutput dataOut) throws IOException {
+ dataOut.writeLong(value.queueIndex.getPageId());
+ }
+ };
+
+ private long nextQueueKey;
+ private BTreeIndex<Long, QueueRecord> queueIndex;
+
+ ///////////////////////////////////////////////////////////////////
+ // Lifecycle Methods.
+ ///////////////////////////////////////////////////////////////////
+ public void allocate(Transaction tx) throws IOException {
+ queueIndex = new BTreeIndex<Long, QueueRecord>(tx.allocate());
+ }
+
+ public void deallocate(Transaction tx) throws IOException {
+ queueIndex.clear(tx);
+ tx.free(queueIndex.getPageId());
+ queueIndex=null;
+ }
+
+ public void load(Transaction tx) throws IOException {
+ if( queueIndex.getPageFile()==null ) {
+
+ queueIndex.setPageFile(tx.getPageFile());
+ queueIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
+ queueIndex.setValueMarshaller(Marshallers.QUEUE_RECORD_MARSHALLER);
+ queueIndex.load(tx);
+
+ // Figure out the next key using the last entry in the destination.
+ Entry<Long, QueueRecord> lastEntry = queueIndex.getLast(tx);
+ if( lastEntry!=null ) {
+ nextQueueKey = lastEntry.getKey()+1;
+ }
+ }
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Message Methods.
+ ///////////////////////////////////////////////////////////////////
+ public Long nextQueueKey() {
+ return nextQueueKey++;
+ }
+
+ public void add(Transaction tx, QueueAddMessage command) throws IOException {
+ QueueRecord value = new QueueRecord();
+ value.setAttachment(command.getAttachment());
+ value.setMessageKey(command.getMessageKey());
+ value.setQueueKey(command.getQueueKey());
+ queueIndex.put(tx, value.getQueueKey(), value);
+ }
+
+ public void remove(Transaction tx, long queueKey) throws IOException {
+ queueIndex.remove(tx, queueKey);
+ }
+
+ public Iterator<QueueRecord> listMessages(Transaction tx, Long firstQueueKey, final int max) throws IOException {
+ final ArrayList<QueueRecord> rc = new ArrayList<QueueRecord>(max);
+ queueIndex.visit(tx, new BTreeVisitor.GTEVisitor<Long, QueueRecord>(firstQueueKey) {
+ @Override
+ public boolean isInterestedInKeysBetween(Long first, Long second) {
+ if (rc.size() >= max)
+ return false;
+ return super.isInterestedInKeysBetween(first, second);
+ }
+
+ @Override
+ protected void matched(Long key, QueueRecord value) {
+ if (rc.size() >= max)
+ return;
+ rc.add(value);
+ }
+ });
+ return rc.iterator();
+ }
+
+
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=758737&r1=758736&r2=758737&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Thu Mar 26 16:47:24 2009
@@ -18,19 +18,19 @@
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
-import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
import org.apache.activemq.broker.store.kahadb.Data.QueueAdd;
+import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
+import org.apache.activemq.broker.store.kahadb.Data.QueueRemove;
import org.apache.activemq.broker.store.kahadb.Data.QueueRemoveMessage;
import org.apache.activemq.broker.store.kahadb.Data.Trace;
import org.apache.activemq.broker.store.kahadb.Data.Type;
@@ -46,7 +46,6 @@
import org.apache.activemq.protobuf.PBMessage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.index.BTreeVisitor;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
import org.apache.kahadb.page.Page;
@@ -67,7 +66,7 @@
protected PageFile pageFile;
protected Journal journal;
- protected StoredDBState dbstate = new StoredDBState();
+ protected RootEntity rootEntity = new RootEntity();
protected boolean failIfDatabaseIsLocked;
protected boolean deleteAllMessages;
@@ -86,9 +85,8 @@
private Location nextRecoveryPosition;
private Location lastRecoveryPosition;
- protected final Object indexMutex = new Object();
+ protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
- private final HashMap<AsciiBuffer, StoredDestinationState> storedDestinations = new HashMap<AsciiBuffer, StoredDestinationState>();
///////////////////////////////////////////////////////////////////
// Lifecylce methods
@@ -106,50 +104,30 @@
}
private void loadPageFile() throws IOException {
- synchronized (indexMutex) {
+ indexLock.writeLock().lock();
+ try {
final PageFile pageFile = getPageFile();
pageFile.load();
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
if (pageFile.getPageCount() == 0) {
- dbstate.allocate(tx);
+ rootEntity.allocate(tx);
} else {
- Page<StoredDBState> page = tx.load(0, StoredDBState.MARSHALLER);
- dbstate = page.get();
- dbstate.page = page;
+ Page<RootEntity> page = tx.load(0, RootEntity.MARSHALLER);
+ rootEntity = page.get();
+ rootEntity.setPageId(0);
}
- dbstate.load(tx);
+ rootEntity.load(tx);
}
});
pageFile.flush();
- // Keep a cache of the StoredDestinations
- storedDestinations.clear();
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- for (Iterator<Entry<AsciiBuffer, StoredDestinationState>> iterator = dbstate.destinations.iterator(tx); iterator.hasNext();) {
- Entry<AsciiBuffer, StoredDestinationState> entry = iterator.next();
- StoredDestinationState sd = loadStoredDestination(tx, entry.getKey());
- storedDestinations.put(entry.getKey(), sd);
- }
- }
- });
+ } finally {
+ indexLock.writeLock().unlock();
}
}
- private StoredDestinationState loadStoredDestination(Transaction tx, AsciiBuffer key) throws IOException {
- // Try to load the existing indexes..
- StoredDestinationState rc = dbstate.destinations.get(tx, key);
- if (rc == null) {
- // Brand new destination.. allocate indexes for it.
- rc = new StoredDestinationState();
- rc.allocate(tx);
- dbstate.destinations.put(tx, key, rc);
- }
- rc.load(tx);
- return rc;
- }
/**
* @throws IOException
@@ -211,8 +189,8 @@
}
public void load() throws IOException {
-
- synchronized (indexMutex) {
+ indexLock.writeLock().lock();
+ try {
open();
if (deleteAllMessages) {
@@ -220,7 +198,7 @@
pageFile.unload();
pageFile.delete();
- dbstate = new StoredDBState();
+ rootEntity = new RootEntity();
LOG.info("Persistence store purged.");
deleteAllMessages = false;
@@ -228,15 +206,21 @@
loadPageFile();
}
store( new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())));
+ } finally {
+ indexLock.writeLock().unlock();
}
}
public void close() throws IOException, InterruptedException {
if( opened.compareAndSet(true, false)) {
- synchronized (indexMutex) {
+
+ indexLock.writeLock().lock();
+ try {
pageFile.unload();
- dbstate = new StoredDBState();
+ rootEntity = new RootEntity();
+ } finally {
+ indexLock.writeLock().unlock();
}
journal.close();
checkpointThread.join();
@@ -246,16 +230,19 @@
}
public void unload() throws IOException, InterruptedException {
- synchronized (indexMutex) {
+ indexLock.writeLock().lock();
+ try {
if( pageFile.isLoaded() ) {
- dbstate.state = CLOSED_STATE;
+ rootEntity.setState(CLOSED_STATE);
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- tx.store(dbstate.page, StoredDBState.MARSHALLER, true);
+ rootEntity.store(tx);
}
});
close();
}
+ } finally {
+ indexLock.writeLock().unlock();
}
}
@@ -273,7 +260,8 @@
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, IOException {
- synchronized (indexMutex) {
+ indexLock.writeLock().lock();
+ try {
long start = System.currentTimeMillis();
Location recoveryPosition = getRecoveryPosition();
@@ -282,7 +270,7 @@
while (recoveryPosition != null) {
final TypeCreatable message = load(recoveryPosition);
final Location location = lastRecoveryPosition;
- dbstate.lastUpdate = recoveryPosition;
+ rootEntity.setLastUpdate(recoveryPosition);
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
@@ -303,11 +291,14 @@
recoverIndex(tx);
}
});
+ } finally {
+ indexLock.writeLock().unlock();
}
}
public void incrementalRecover() throws IOException {
- synchronized (indexMutex) {
+ indexLock.writeLock().lock();
+ try {
if( nextRecoveryPosition == null ) {
if( lastRecoveryPosition==null ) {
nextRecoveryPosition = getRecoveryPosition();
@@ -317,7 +308,7 @@
}
while (nextRecoveryPosition != null) {
lastRecoveryPosition = nextRecoveryPosition;
- dbstate.lastUpdate = lastRecoveryPosition;
+ rootEntity.setLastUpdate(lastRecoveryPosition);
final TypeCreatable message = load(lastRecoveryPosition);
final Location location = lastRecoveryPosition;
@@ -329,6 +320,8 @@
nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
}
+ } finally {
+ indexLock.writeLock().unlock();
}
}
@@ -370,14 +363,14 @@
}
public Location getLastUpdatePosition() throws IOException {
- return dbstate.lastUpdate;
+ return rootEntity.getLastUpdate();
}
private Location getRecoveryPosition() throws IOException {
- if( dbstate.lastUpdate!=null) {
+ if( rootEntity.getLastUpdate()!=null) {
// Start replay at the record after the last one recorded in the index file.
- return journal.getNextLocation(dbstate.lastUpdate);
+ return journal.getNextLocation(rootEntity.getLastUpdate());
}
// This loads the first position.
@@ -387,7 +380,8 @@
protected void checkpointCleanup(final boolean cleanup) {
try {
long start = System.currentTimeMillis();
- synchronized (indexMutex) {
+ indexLock.writeLock().lock();
+ try {
if( !opened.get() ) {
return;
}
@@ -396,6 +390,8 @@
checkpointUpdate(tx, cleanup);
}
});
+ } finally {
+ indexLock.writeLock().unlock();
}
long end = System.currentTimeMillis();
if( end-start > 100 ) {
@@ -408,13 +404,16 @@
public void checkpoint(org.apache.activemq.util.Callback closure) throws Exception {
- synchronized (indexMutex) {
+ indexLock.writeLock().lock();
+ try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, false);
}
});
closure.execute();
+ } finally {
+ indexLock.writeLock().unlock();
}
}
@@ -426,8 +425,8 @@
LOG.debug("Checkpoint started.");
- dbstate.state = OPEN_STATE;
- tx.store(dbstate.page, StoredDBState.MARSHALLER, true);
+ rootEntity.setState(OPEN_STATE);
+ rootEntity.store(tx);
pageFile.flush();
if( cleanup ) {
@@ -440,7 +439,7 @@
}
// Don't GC files after the first in progress tx
- Location firstTxLocation = dbstate.lastUpdate;
+ Location firstTxLocation = rootEntity.getLastUpdate();
if( firstTxLocation!=null ) {
while( !gcCandidateSet.isEmpty() ) {
@@ -453,52 +452,52 @@
}
}
- // Go through all the destinations to see if any of them can remove GC candidates.
- for (StoredDestinationState sd : storedDestinations.values()) {
- if( gcCandidateSet.isEmpty() ) {
- break;
- }
-
- // Use a visitor to cut down the number of pages that we load
- dbstate.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
- int last=-1;
- public boolean isInterestedInKeysBetween(Location first, Location second) {
- if( first==null ) {
- SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
- if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
- subset.remove(second.getDataFileId());
- }
- return !subset.isEmpty();
- } else if( second==null ) {
- SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
- if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
- subset.remove(first.getDataFileId());
- }
- return !subset.isEmpty();
- } else {
- SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
- if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
- subset.remove(first.getDataFileId());
- }
- if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
- subset.remove(second.getDataFileId());
- }
- return !subset.isEmpty();
- }
- }
-
- public void visit(List<Location> keys, List<Long> values) {
- for (Location l : keys) {
- int fileId = l.getDataFileId();
- if( last != fileId ) {
- gcCandidateSet.remove(fileId);
- last = fileId;
- }
- }
- }
-
- });
- }
+// // Go through all the destinations to see if any of them can remove GC candidates.
+// for (StoredDestinationState sd : storedDestinations.values()) {
+// if( gcCandidateSet.isEmpty() ) {
+// break;
+// }
+//
+// // Use a visitor to cut down the number of pages that we load
+// dbstate.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
+// int last=-1;
+// public boolean isInterestedInKeysBetween(Location first, Location second) {
+// if( first==null ) {
+// SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
+// if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+// subset.remove(second.getDataFileId());
+// }
+// return !subset.isEmpty();
+// } else if( second==null ) {
+// SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
+// if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+// subset.remove(first.getDataFileId());
+// }
+// return !subset.isEmpty();
+// } else {
+// SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
+// if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
+// subset.remove(first.getDataFileId());
+// }
+// if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
+// subset.remove(second.getDataFileId());
+// }
+// return !subset.isEmpty();
+// }
+// }
+//
+// public void visit(List<Location> keys, List<Long> values) {
+// for (Location l : keys) {
+// int fileId = l.getDataFileId();
+// if( last != fileId ) {
+// gcCandidateSet.remove(fileId);
+// last = fileId;
+// }
+// }
+// }
+//
+// });
+// }
if( !gcCandidateSet.isEmpty() ) {
LOG.debug("Cleanup removing the data files: "+gcCandidateSet);
@@ -541,22 +540,23 @@
final Location location = journal.write(os.toByteSequence(), sync);
long start2 = System.currentTimeMillis();
- synchronized (indexMutex) {
+
+ try {
+ indexLock.writeLock().lock();
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
updateIndex(tx, data.toType(), message, location);
}
});
+ rootEntity.setLastUpdate(location);
+ } finally {
+ indexLock.writeLock().unlock();
}
long end = System.currentTimeMillis();
if( end-start > 100 ) {
LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
}
-
- synchronized (indexMutex) {
- dbstate.lastUpdate = location;
- }
return location;
}
@@ -581,20 +581,24 @@
}
@SuppressWarnings("unchecked")
- public void updateIndex(Transaction tx, Type type, MessageBuffer message, Location location) {
+ public void updateIndex(Transaction tx, Type type, MessageBuffer command, Location location) throws IOException {
switch (type) {
case MESSAGE_ADD:
- messageAdd(tx, (MessageAdd)message, location);
+ messageAdd(tx, (MessageAdd)command, location);
return;
case QUEUE_ADD:
- queueAdd(tx, (QueueAdd)message, location);
+ queueAdd(tx, (QueueAdd)command, location);
+ return;
+ case QUEUE_REMOVE:
+ queueRemove(tx, (QueueRemove)command, location);
return;
case QUEUE_ADD_MESSAGE:
- queueAddMessage(tx, (QueueAdd)message, location);
+ queueAddMessage(tx, (QueueAddMessage)command, location);
return;
case QUEUE_REMOVE_MESSAGE:
- queueRemoveMessage(tx, (QueueRemoveMessage)message, location);
+ queueRemoveMessage(tx, (QueueRemoveMessage)command, location);
return;
+
case TRANSACTION_BEGIN:
case TRANSACTION_ADD_MESSAGE:
case TRANSACTION_REMOVE_MESSAGE:
@@ -612,90 +616,116 @@
}
}
- private void messageAdd(Transaction tx, MessageAdd message, Location location) {
+ private void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
+ rootEntity.messageAdd(tx, command, location);
+ }
+
+ private void queueAdd(Transaction tx, QueueAdd command, Location location) throws IOException {
+ rootEntity.queueAdd(tx, command.getQueueName());
}
- private void queueAdd(Transaction tx, QueueAdd message, Location location) {
+
+ private void queueRemove(Transaction tx, QueueRemove command, Location location) throws IOException {
+ rootEntity.queueRemove(tx, command.getQueueName());
}
- private void queueAddMessage(Transaction tx, QueueAdd message, Location location) {
+
+ private void queueAddMessage(Transaction tx, QueueAddMessage command, Location location) throws IOException {
+ DestinationEntity destination = rootEntity.getDestination(command.getQueueName());
+ if( destination!=null ) {
+ destination.add(tx, command);
+ }
}
- private void queueRemoveMessage(Transaction tx, QueueRemoveMessage message, Location location) {
+ private void queueRemoveMessage(Transaction tx, QueueRemoveMessage command, Location location) throws IOException {
+ DestinationEntity destination = rootEntity.getDestination(command.getQueueName());
+ if( destination!=null ) {
+ destination.remove(tx, command.getQueueKey());
+ }
}
class KahaDBSession implements Session {
+ ArrayList<TypeCreatable> updates = new ArrayList<TypeCreatable>();
+
+ private Transaction tx;
+ private Transaction tx() {
+ if( tx ==null ) {
+ indexLock.readLock().lock();
+ tx = pageFile.tx();
+ }
+ return tx;
+ }
///////////////////////////////////////////////////////////////
// Message related methods.
///////////////////////////////////////////////////////////////
public Long messageAdd(MessageRecord message) {
+ Long id = rootEntity.nextMessageKey();
+ MessageAddBean bean = new MessageAddBean();
+ bean.setBuffer(message.getBuffer());
+ bean.setEncoding(message.getEncoding());
+ bean.setMessageId(message.getMessageId());
+ bean.setMessageKey(id);
+ bean.setStreamKey(message.getStreamKey());
+ updates.add(bean);
+ return id;
+ }
+
+ public Long messageGetKey(AsciiBuffer messageId) {
+ return rootEntity.messageGetKey(tx(), messageId);
+ }
+
+ public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException {
+ Location location = rootEntity.messageGetLocation(tx(), key);
+ if( location ==null ) {
+ throw new KeyNotFoundException("message key: "+key);
+ }
try {
- Long id = dbstate.nextMessageId++;
- MessageAddBean bean = new MessageAddBean();
- bean.setBuffer(message.getBuffer());
- bean.setEncoding(message.getEncoding());
- bean.setMessageId(message.getMessageId());
- bean.setMessageKey(id);
- bean.setStreamKey(message.getStreamKey());
- store(bean);
- return id;
+ return (MessageRecord) load(location);
} catch (IOException e) {
throw new FatalStoreException(e);
}
}
- public Long messageGetKey(AsciiBuffer messageId) {
- return null;
- }
- public MessageRecord messageGetRecord(Long key) {
- return null;
- }
///////////////////////////////////////////////////////////////
// Queue related methods.
///////////////////////////////////////////////////////////////
public void queueAdd(AsciiBuffer queueName) {
- try {
- store(new QueueAddBean().setQueueName(queueName));
- } catch (IOException e) {
- throw new FatalStoreException(e);
- }
+ updates.add(new QueueAddBean().setQueueName(queueName));
}
- public boolean queueRemove(AsciiBuffer queueName) {
- try {
- store(new QueueRemoveBean().setQueueName(queueName));
- return false;
- } catch (IOException e) {
- throw new FatalStoreException(e);
- }
+ public void queueRemove(AsciiBuffer queueName) {
+ updates.add(new QueueRemoveBean().setQueueName(queueName));
}
public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
- return null;
+ return rootEntity.queueList(tx(), firstQueueName, max);
}
public Long queueAddMessage(AsciiBuffer queueName, QueueRecord record) throws KeyNotFoundException {
- try {
- Long queueKey = 1L;
- QueueAddMessageBean bean = new QueueAddMessageBean();
- bean.setQueueName(queueName);
- bean.setAttachment(record.getAttachment());
- bean.setMessageKey(record.getMessageKey());
- bean.setQueueKey(queueKey);
- store(bean);
- return queueKey;
- } catch (IOException e) {
- throw new FatalStoreException(e);
- }
+ DestinationEntity destination = rootEntity.getDestination(queueName);
+ if( destination ==null ) {
+ throw new KeyNotFoundException("queue key: "+queueName);
+ }
+ Long queueKey = destination.nextQueueKey();
+ QueueAddMessageBean bean = new QueueAddMessageBean();
+ bean.setQueueName(queueName);
+ bean.setAttachment(record.getAttachment());
+ bean.setMessageKey(record.getMessageKey());
+ bean.setQueueKey(queueKey);
+ updates.add(bean);
+ return queueKey;
}
public void queueRemoveMessage(AsciiBuffer queueName, Long queueKey) throws KeyNotFoundException {
+ QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
+ bean.setQueueKey(queueKey);
+ bean.setQueueName(queueName);
+ updates.add(bean);
+ }
+ public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
+ DestinationEntity destination = rootEntity.getDestination(queueName);
+ if( destination ==null ) {
+ throw new KeyNotFoundException("queue key: "+queueName);
+ }
try {
- QueueRemoveMessageBean bean = new QueueRemoveMessageBean();
- bean.setQueueKey(queueKey);
- bean.setQueueName(queueName);
- store(bean);
+ return destination.listMessages(tx, firstQueueKey, max);
} catch (IOException e) {
throw new FatalStoreException(e);
}
-
- }
- public Iterator<QueueRecord> queueListMessagesQueue(AsciiBuffer queueName, Long firstQueueKey, int max) throws KeyNotFoundException {
- return null;
}
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java?rev=758737&r1=758736&r2=758737&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java Thu Mar 26 16:47:24 2009
@@ -20,6 +20,7 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.activemq.broker.store.Store.QueueRecord;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.kahadb.journal.Location;
@@ -27,6 +28,34 @@
public class Marshallers {
+ public final static Marshaller<QueueRecord> QUEUE_RECORD_MARSHALLER = new Marshaller<QueueRecord>() {
+
+ public Class<QueueRecord> getType() {
+ return QueueRecord.class;
+ }
+
+ public QueueRecord readPayload(DataInput dataIn) throws IOException {
+ QueueRecord rc = new QueueRecord();
+ rc.setQueueKey(dataIn.readLong());
+ rc.setMessageKey(dataIn.readLong());
+ if( dataIn.readBoolean() ) {
+ rc.setAttachment(BUFFER_MARSHALLER.readPayload(dataIn));
+ }
+ return rc;
+ }
+
+ public void writePayload(QueueRecord object, DataOutput dataOut) throws IOException {
+ dataOut.writeLong(object.getQueueKey());
+ dataOut.writeLong(object.getMessageKey());
+ if( object.getAttachment()!=null ) {
+ dataOut.writeBoolean(true);
+ BUFFER_MARSHALLER.writePayload(object.getAttachment(), dataOut);
+ } else {
+ dataOut.writeBoolean(false);
+ }
+ }
+ };
+
public final static Marshaller<Location> LOCATION_MARSHALLER = new Marshaller<Location>() {
public Class<Location> getType() {
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java?rev=758737&r1=758736&r2=758737&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java Thu Mar 26 16:47:24 2009
@@ -25,7 +25,6 @@
import org.apache.kahadb.util.Marshaller;
public class MessageKeys {
- public static final MessageKeysMarshaller MARSHALLER = new MessageKeysMarshaller();
final AsciiBuffer messageId;
final Location location;
@@ -40,8 +39,7 @@
return "["+messageId+","+location+"]";
}
- public static class MessageKeysMarshaller implements Marshaller<MessageKeys> {
-
+ public static final Marshaller<MessageKeys> MARSHALLER = new Marshaller<MessageKeys>() {
public Class<MessageKeys> getType() {
return MessageKeys.class;
}
@@ -56,5 +54,5 @@
dataOut.write(object.messageId.data, object.messageId.offset, object.messageId.length);
Marshallers.LOCATION_MARSHALLER.writePayload(object.location, dataOut);
}
- }
+ };
}
\ No newline at end of file
Added: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=758737&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (added)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Thu Mar 26 16:47:24 2009
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.store.kahadb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
+import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.kahadb.index.BTreeIndex;
+import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.page.Page;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.util.Marshaller;
+
+public class RootEntity {
+
+ public final static Marshaller<RootEntity> MARSHALLER = new Marshaller<RootEntity>() {
+ public Class<RootEntity> getType() {
+ return RootEntity.class;
+ }
+
+ public RootEntity readPayload(DataInput is) throws IOException {
+ RootEntity rc = new RootEntity();
+ rc.state = is.readInt();
+ rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
+ if (is.readBoolean()) {
+ rc.lastUpdate = Marshallers.LOCATION_MARSHALLER.readPayload(is);
+ } else {
+ rc.lastUpdate = null;
+ }
+ return rc;
+ }
+
+ public void writePayload(RootEntity object, DataOutput os) throws IOException {
+ os.writeInt(object.state);
+ os.writeLong(object.destinationIndex.getPageId());
+ if (object.lastUpdate != null) {
+ os.writeBoolean(true);
+ Marshallers.LOCATION_MARSHALLER.writePayload(object.lastUpdate, os);
+ } else {
+ os.writeBoolean(false);
+ }
+ }
+ };
+
+ // The root page the this object's state is stored on.
+ // private Page<StoredDBState> page;
+
+ // State information about the index
+ private long pageId;
+ private int state;
+ private Location lastUpdate;
+
+ // Message Indexes
+ private long nextMessageKey;
+ private BTreeIndex<Long, MessageKeys> messageKeyIndex;
+ private BTreeIndex<Location, Long> locationIndex;
+ private BTreeIndex<AsciiBuffer, Long> messageIdIndex;
+
+ // The destinations
+ private BTreeIndex<AsciiBuffer, DestinationEntity> destinationIndex;
+ private final TreeMap<AsciiBuffer, DestinationEntity> destinations = new TreeMap<AsciiBuffer, DestinationEntity>();
+
+ ///////////////////////////////////////////////////////////////////
+ // Lifecycle Methods.
+ ///////////////////////////////////////////////////////////////////
+
+ public void allocate(Transaction tx) throws IOException {
+ // First time this is created.. Initialize a new pagefile.
+ Page<RootEntity> page = tx.allocate();
+ pageId = page.getPageId();
+ assert pageId == 0;
+
+ state = KahaDBStore.CLOSED_STATE;
+ destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(), tx.allocate().getPageId());
+
+ page.set(this);
+ tx.store(page, MARSHALLER, true);
+ }
+
+ public void load(Transaction tx) throws IOException {
+ destinationIndex.setPageFile(tx.getPageFile());
+ destinationIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+ destinationIndex.setValueMarshaller(DestinationEntity.MARSHALLER);
+ destinationIndex.load(tx);
+
+ // Keep the StoredDestinations loaded
+ destinations.clear();
+ for (Iterator<Entry<AsciiBuffer, DestinationEntity>> iterator = destinationIndex.iterator(tx); iterator.hasNext();) {
+ Entry<AsciiBuffer, DestinationEntity> entry = iterator.next();
+ entry.getValue().load(tx);
+ destinations.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ public void store(Transaction tx) throws IOException {
+ Page<RootEntity> page = tx.load(pageId, null);
+ page.set(this);
+ tx.store(page, RootEntity.MARSHALLER, true);
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Message Methods.
+ ///////////////////////////////////////////////////////////////////
+ public Long nextMessageKey() {
+ return nextMessageKey++;
+ }
+
+ public void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
+ long id = nextMessageKey++;
+ Long previous = locationIndex.put(tx, location, id);
+ if( previous == null ) {
+ messageIdIndex.put(tx, command.getMessageId(), id);
+ messageKeyIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
+ } else {
+ // Message existed.. undo the index update we just did. Chances
+ // are it's a transaction replay.
+ locationIndex.put(tx, location, previous);
+ }
+ }
+
+ public Long messageGetKey(Transaction tx, AsciiBuffer messageId) {
+ try {
+ return messageIdIndex.get(tx, messageId);
+ } catch (IOException e) {
+ throw new Store.FatalStoreException(e);
+ }
+ }
+
+ public Location messageGetLocation(Transaction tx, Long messageKey) {
+ try {
+ MessageKeys t = messageKeyIndex.get(tx, messageKey);
+ if( t==null ) {
+ return null;
+ }
+ return t.location;
+ } catch (IOException e) {
+ throw new Store.FatalStoreException(e);
+ }
+ }
+
+ ///////////////////////////////////////////////////////////////////
+ // Queue Methods.
+ ///////////////////////////////////////////////////////////////////
+ public void queueAdd(Transaction tx, AsciiBuffer queueName) throws IOException {
+ if( destinationIndex.get(tx, queueName)==null ) {
+ DestinationEntity rc = new DestinationEntity();
+ rc.allocate(tx);
+ destinationIndex.put(tx, queueName, rc);
+ rc.load(tx);
+ destinations.put(queueName, rc);
+ }
+ }
+
+ public void queueRemove(Transaction tx, AsciiBuffer queueName) throws IOException {
+ DestinationEntity destination = destinations.get(queueName);
+ if( destination!=null ) {
+ destinationIndex.remove(tx, queueName);
+ destinations.remove(queueName);
+ destination.deallocate(tx);
+ }
+ }
+
+ public DestinationEntity getDestination(AsciiBuffer queueName) {
+ return destinations.get(queueName);
+ }
+
+ public Iterator<AsciiBuffer> queueList(Transaction tx, AsciiBuffer firstQueueName, int max) {
+ return list(destinations, firstQueueName, max);
+ }
+
+ static private <Key,Value> Iterator<Key> list(TreeMap<Key, Value> map, Key first, int max) {
+ ArrayList<Key> rc = new ArrayList<Key>(max);
+ Set<Key> keys = (first==null ? map : map.tailMap(first)).keySet();
+ for (Key buffer : keys) {
+ if( rc.size() >= max ) {
+ break;
+ }
+ rc.add(buffer);
+ }
+ return rc.iterator();
+ }
+
+ public long getPageId() {
+ return pageId;
+ }
+
+ public void setPageId(long pageId) {
+ this.pageId = pageId;
+ }
+
+ public int getState() {
+ return state;
+ }
+
+ public void setState(int state) {
+ this.state = state;
+ }
+
+ public Location getLastUpdate() {
+ return lastUpdate;
+ }
+
+ public void setLastUpdate(Location lastUpdate) {
+ this.lastUpdate = lastUpdate;
+ }
+
+
+}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=758737&r1=758736&r2=758737&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Thu Mar 26 16:47:24 2009
@@ -175,13 +175,11 @@
queues.put(queueName, queue);
}
}
- public boolean queueRemove(AsciiBuffer queueName) {
+ public void queueRemove(AsciiBuffer queueName) {
StoredQueue queue = queues.get(queueName);
if (queue != null) {
queues.remove(queueName);
- return true;
}
- return false;
}
public Iterator<AsciiBuffer> queueList(AsciiBuffer firstQueueName, int max) {
return list(queues, firstQueueName, max);