You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:24:44 UTC
svn commit: r961062 [4/14] - in /activemq/sandbox/activemq-apollo-actor: ./
activemq-all/ activemq-all/src/test/ide-resources/
activemq-all/src/test/java/org/apache/activemq/jaxb/
activemq-all/src/test/java/org/apache/activemq/legacy/ activemq-all/src/...
Modified: activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/DestinationEntity.java Wed Jul 7 03:24:02 2010
@@ -27,36 +27,50 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.Store.DuplicateKeyException;
import org.apache.activemq.broker.store.Store.QueueRecord;
import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
-import org.apache.activemq.queue.QueueDescriptor;
-import org.apache.activemq.util.marshaller.LongMarshaller;
-import org.apache.activemq.util.marshaller.Marshaller;
-import org.apache.activemq.util.marshaller.VariableMarshaller;
-import org.apache.kahadb.index.BTreeIndex;
-import org.apache.kahadb.page.Page;
-import org.apache.kahadb.page.Transaction;
+import org.fusesource.hawtdb.api.BTreeIndexFactory;
+import org.fusesource.hawtdb.api.SortedIndex;
+import org.fusesource.hawtdb.api.Transaction;
+import org.fusesource.hawtdb.util.marshaller.*;
public class DestinationEntity {
+ private static final BTreeIndexFactory<Long, QueueRecord> queueIndexFactory = new BTreeIndexFactory<Long, QueueRecord>();
+ private static final BTreeIndexFactory<Long, Long> trackingIndexFactory = new BTreeIndexFactory<Long, Long>();
+ private static final BTreeIndexFactory<Long, Long> statsIndexFactory = new BTreeIndexFactory<Long, Long>();
+
+ static {
+ queueIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
+ queueIndexFactory.setValueMarshaller(Marshallers.QUEUE_RECORD_MARSHALLER);
+ queueIndexFactory.setDeferredEncoding(true);
+
+ trackingIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
+ trackingIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+ trackingIndexFactory.setDeferredEncoding(true);
+
+ statsIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
+ statsIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+ statsIndexFactory.setDeferredEncoding(true);
+ }
+
public final static Marshaller<DestinationEntity> MARSHALLER = new VariableMarshaller<DestinationEntity>() {
public DestinationEntity readPayload(DataInput dataIn) throws IOException {
DestinationEntity value = new DestinationEntity();
- value.queueIndex = new BTreeIndex<Long, QueueRecord>(dataIn.readLong());
- value.trackingIndex = new BTreeIndex<Long, Long>(dataIn.readLong());
+ value.queueIndex = dataIn.readInt();
+ value.trackingIndex = dataIn.readInt();
value.descriptor = Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.readPayload(dataIn);
- value.metaData = new Page<DestinationMetaData>(dataIn.readLong());
return value;
}
public void writePayload(DestinationEntity value, DataOutput dataOut) throws IOException {
- dataOut.writeLong(value.queueIndex.getPageId());
- dataOut.writeLong(value.trackingIndex.getPageId());
+ dataOut.writeInt(value.queueIndex);
+ dataOut.writeInt(value.trackingIndex);
Marshallers.QUEUE_DESCRIPTOR_MARSHALLER.writePayload(value.descriptor, dataOut);
- dataOut.writeLong(value.metaData.getPageId());
}
public int estimatedSize(DestinationEntity object) {
@@ -65,30 +79,13 @@ public class DestinationEntity {
};
- public final static Marshaller<DestinationMetaData> META_DATA_MARSHALLER = new VariableMarshaller<DestinationMetaData>() {
- public DestinationMetaData readPayload(DataInput dataIn) throws IOException {
- DestinationMetaData value = new DestinationMetaData();
- value.count = dataIn.readInt();
- value.size = dataIn.readLong();
- return value;
- }
-
- public void writePayload(DestinationMetaData value, DataOutput dataOut) throws IOException {
- dataOut.writeInt(value.count);
- dataOut.writeLong(value.size);
- }
-
- public int estimatedSize(DestinationMetaData object) {
- throw new UnsupportedOperationException();
- }
- };
-
public Class<DestinationEntity> getType() {
return DestinationEntity.class;
}
- private BTreeIndex<Long, QueueRecord> queueIndex;
- private BTreeIndex<Long, Long> trackingIndex;
+ private int queueIndex;
+ private int trackingIndex;
+ private int statsIndex;
// Descriptor for this queue:
private QueueDescriptor descriptor;
@@ -96,78 +93,77 @@ public class DestinationEntity {
// Child Partitions:
private HashSet<DestinationEntity> partitions;
- // Holds volatile queue meta data
- private Page<DestinationMetaData> metaData;
-
// /////////////////////////////////////////////////////////////////
// Lifecycle Methods.
// /////////////////////////////////////////////////////////////////
public void allocate(Transaction tx) throws IOException {
- queueIndex = new BTreeIndex<Long, QueueRecord>(tx.allocate());
- trackingIndex = new BTreeIndex<Long, Long>(tx.allocate());
- metaData = tx.allocate();
- metaData.set(new DestinationMetaData());
- tx.store(metaData, META_DATA_MARSHALLER, true);
- }
+ queueIndex = tx.alloc();
+ queueIndexFactory.create(tx, queueIndex);
- public void deallocate(Transaction tx) throws IOException {
- queueIndex.clear(tx);
- trackingIndex.clear(tx);
- tx.free(trackingIndex.getPageId());
- tx.free(queueIndex.getPageId());
- tx.free(metaData.getPageId());
- queueIndex = null;
- trackingIndex = null;
- metaData = null;
- }
+ trackingIndex = tx.alloc();
+ trackingIndexFactory.create(tx, trackingIndex);
- public void load(Transaction tx) throws IOException {
- if (queueIndex.getPageFile() == null) {
+ statsIndex = tx.alloc();
+ statsIndexFactory.create(tx, statsIndex);
+ setStats(tx, 0,0);
+ }
- queueIndex.setPageFile(tx.getPageFile());
- queueIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
- queueIndex.setValueMarshaller(Marshallers.QUEUE_RECORD_MARSHALLER);
- queueIndex.load(tx);
- }
+ public void deallocate(Transaction tx) throws IOException {
+ queueIndex(tx).clear();
+ tx.free(trackingIndex);
- if (trackingIndex.getPageFile() == null) {
+ trackingIndex(tx).clear();
+ tx.free(queueIndex);
- trackingIndex.setPageFile(tx.getPageFile());
- trackingIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
- trackingIndex.setValueMarshaller(LongMarshaller.INSTANCE);
- trackingIndex.load(tx);
- }
+ statsIndex(tx).clear();
+ tx.free(statsIndex);
+ }
- tx.load(metaData, META_DATA_MARSHALLER);
+ private SortedIndex<Long, QueueRecord> queueIndex(Transaction tx) {
+ return queueIndexFactory.open(tx, queueIndex);
+ }
+ private SortedIndex<Long, Long> trackingIndex(Transaction tx) {
+ return trackingIndexFactory.open(tx, trackingIndex);
+ }
+ private SortedIndex<Long, Long> statsIndex(Transaction tx) {
+ return statsIndexFactory.open(tx, statsIndex);
}
private static final boolean unlimited(Number val) {
return val == null || val.longValue() < 0;
}
- private DestinationMetaData getMetaData(Transaction tx) throws IOException {
- tx.load(metaData, META_DATA_MARSHALLER);
- return metaData.get();
- }
-
// /////////////////////////////////////////////////////////////////
// Message Methods.
// /////////////////////////////////////////////////////////////////
-
+
+ public static final Long SIZE_STAT = 0L;
+ public static final Long COUNT_STAT = 0L;
+
public long getSize(Transaction tx) throws IOException {
- return getMetaData(tx).size;
+ return statsIndex(tx).get(SIZE_STAT);
}
public int getCount(Transaction tx) throws IOException {
- return getMetaData(tx).count;
+ return (int)(long)statsIndex(tx).get(COUNT_STAT);
}
public long getFirstSequence(Transaction tx) throws IOException {
- return getMetaData(tx).count == 0 ? 0 : queueIndex.getFirst(tx).getValue().getQueueKey();
+ Entry<Long, QueueRecord> entry = queueIndex(tx).getFirst();
+ if( entry!=null ) {
+ return entry.getValue().getQueueKey();
+ } else {
+ return 0;
+ }
}
public long getLastSequence(Transaction tx) throws IOException {
- return getMetaData(tx).count == 0 ? 0 : queueIndex.getLast(tx).getValue().getQueueKey();
+ Entry<Long, QueueRecord> entry = queueIndex(tx).getLast();
+ if( entry!=null ) {
+ return entry.getValue().getQueueKey();
+ } else {
+ return 0;
+ }
}
public void setQueueDescriptor(QueueDescriptor queue) {
@@ -207,7 +203,7 @@ public class DestinationEntity {
public void add(Transaction tx, QueueAddMessage command) throws IOException, DuplicateKeyException {
- Long existing = trackingIndex.put(tx, command.getMessageKey(), command.getQueueKey());
+ Long existing = trackingIndex(tx).put(command.getMessageKey(), command.getQueueKey());
if (existing == null) {
QueueRecord value = new QueueRecord();
value.setAttachment(command.getAttachment());
@@ -215,7 +211,7 @@ public class DestinationEntity {
value.setQueueKey(command.getQueueKey());
value.setSize(command.getMessageSize());
- QueueRecord rc = queueIndex.put(tx, value.getQueueKey(), value);
+ QueueRecord rc = queueIndex(tx).put(value.getQueueKey(), value);
if (rc == null) {
// TODO It seems a little inefficient to continually serialize
// the queue size. It might be better to update this only at
@@ -225,8 +221,7 @@ public class DestinationEntity {
// It is also possible that we might want to remove this update
// altogether in favor of scanning the whole queue at recovery
// time (at the cost of startup time)
- getMetaData(tx).update(1, command.getMessageSize());
- tx.store(metaData, META_DATA_MARSHALLER, true);
+ addStats(tx, 1, command.getMessageSize());
} else {
throw new Store.FatalStoreException(new Store.DuplicateKeyException("Duplicate sequence number " + command.getQueueKey() + " for " + descriptor.getQueueName()));
}
@@ -235,6 +230,18 @@ public class DestinationEntity {
}
}
+ private void addStats(Transaction tx, int count, int size) {
+ SortedIndex<Long, Long> index = statsIndex(tx);
+ index.put(COUNT_STAT, index.get(COUNT_STAT)+count);
+ index.put(SIZE_STAT, index.get(SIZE_STAT)+size);
+ }
+
+ private void setStats(Transaction tx, int count, int size) {
+ SortedIndex<Long, Long> index = statsIndex(tx);
+ index.put(COUNT_STAT, new Long(count));
+ index.put(SIZE_STAT, new Long(size));
+ }
+
/**
* Removes a queue record returning the corresponding element tracking number.
* @param tx The transaction under which to do the removal
@@ -243,12 +250,11 @@ public class DestinationEntity {
* @throws IOException
*/
public long remove(Transaction tx, long queueKey) throws IOException {
- QueueRecord qr = queueIndex.remove(tx, queueKey);
+ QueueRecord qr = queueIndex(tx).remove(queueKey);
if(qr != null)
{
- trackingIndex.remove(tx, qr.getMessageKey());
- getMetaData(tx).update(-1, -qr.getSize());
- tx.store(metaData, META_DATA_MARSHALLER, true);
+ trackingIndex(tx).remove(qr.getMessageKey());
+ addStats(tx, -1, -qr.getSize());
return qr.getMessageKey();
}
return -1;
@@ -264,10 +270,10 @@ public class DestinationEntity {
Iterator<Entry<Long, QueueRecord>> iterator;
if (unlimited(firstQueueKey)) {
- iterator = queueIndex.iterator(tx);
+ iterator = queueIndex(tx).iterator();
} else {
- iterator = queueIndex.iterator(tx, firstQueueKey);
+ iterator = queueIndex(tx).iterator(firstQueueKey);
}
boolean sequenceLimited = !unlimited(maxQueueKey);
boolean countLimited = !unlimited(max);
@@ -286,21 +292,8 @@ public class DestinationEntity {
}
public Iterator<Entry<Long, Long>> listTrackingNums(Transaction tx) throws IOException {
- return trackingIndex.iterator(tx);
+ return trackingIndex(tx).iterator();
}
- public static class DestinationMetaData {
- int count;
- long size;
- public void update(int count, long size) {
- this.count += count;
- this.size += size;
- }
-
- public void set(int count, long size) {
- this.count = count;
- this.size = size;
- }
- }
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Wed Jul 7 03:24:02 2010
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.kahadb.Data.MapAdd;
import org.apache.activemq.broker.store.kahadb.Data.MapEntryPut;
@@ -57,7 +58,6 @@ import org.apache.activemq.broker.store.
import org.apache.activemq.protobuf.InvalidProtocolBufferException;
import org.apache.activemq.protobuf.MessageBuffer;
import org.apache.activemq.protobuf.PBMessage;
-import org.apache.activemq.queue.QueueDescriptor;
import org.apache.activemq.util.LockFile;
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.activemq.util.buffer.Buffer;
@@ -65,11 +65,11 @@ import org.apache.activemq.util.buffer.D
import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.journal.Journal;
-import org.apache.kahadb.journal.Location;
-import org.apache.kahadb.page.Page;
-import org.apache.kahadb.page.PageFile;
-import org.apache.kahadb.page.Transaction;
+import org.fusesource.hawtdb.api.Transaction;
+import org.fusesource.hawtdb.api.TxPageFile;
+import org.fusesource.hawtdb.api.TxPageFileFactory;
+import org.fusesource.hawtdb.internal.journal.Journal;
+import org.fusesource.hawtdb.internal.journal.Location;
public class KahaDBStore implements Store {
@@ -81,15 +81,16 @@ public class KahaDBStore implements Stor
private static final Log LOG = LogFactory.getLog(KahaDBStore.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
- private static final Buffer BEGIN_UNIT_OF_WORK_DATA = new Buffer(new byte[] { BEGIN_UNIT_OF_WORK });
- private static final Buffer END_UNIT_OF_WORK_DATA = new Buffer(new byte[] { END_UNIT_OF_WORK });
- private static final Buffer CANCEL_UNIT_OF_WORK_DATA = new Buffer(new byte[] { CANCEL_UNIT_OF_WORK });
- private static final Buffer FLUSH_DATA = new Buffer(new byte[] { FLUSH });
+ private static final org.fusesource.hawtdb.util.buffer.Buffer BEGIN_UNIT_OF_WORK_DATA = new org.fusesource.hawtdb.util.buffer.Buffer(new byte[] { BEGIN_UNIT_OF_WORK });
+ private static final org.fusesource.hawtdb.util.buffer.Buffer END_UNIT_OF_WORK_DATA = new org.fusesource.hawtdb.util.buffer.Buffer(new byte[] { END_UNIT_OF_WORK });
+ private static final org.fusesource.hawtdb.util.buffer.Buffer CANCEL_UNIT_OF_WORK_DATA = new org.fusesource.hawtdb.util.buffer.Buffer(new byte[] { CANCEL_UNIT_OF_WORK });
+ private static final org.fusesource.hawtdb.util.buffer.Buffer FLUSH_DATA = new org.fusesource.hawtdb.util.buffer.Buffer(new byte[] { FLUSH });
public static final int CLOSED_STATE = 1;
public static final int OPEN_STATE = 2;
- protected PageFile pageFile;
+ protected TxPageFileFactory pageFileFactory = new TxPageFileFactory();
+ protected TxPageFile pageFile;
protected Journal journal;
protected RootEntity rootEntity = new RootEntity();
@@ -98,12 +99,9 @@ public class KahaDBStore implements Stor
protected boolean deleteAllMessages;
protected File directory;
protected Thread checkpointThread;
- protected boolean enableJournalDiskSyncs = true;
+
long checkpointInterval = 5 * 1000;
long cleanupInterval = 30 * 1000;
- int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
- boolean enableIndexWriteAsync = false;
- int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
protected AtomicBoolean started = new AtomicBoolean();
protected AtomicBoolean opened = new AtomicBoolean();
@@ -115,6 +113,7 @@ public class KahaDBStore implements Stor
protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
private boolean recovering;
+ private int journalMaxFileLength = 1024*1024*20;
private static class UoWOperation {
public TypeCreatable bean;
@@ -156,16 +155,11 @@ public class KahaDBStore implements Stor
private void loadPageFile() throws IOException {
indexLock.writeLock().lock();
try {
- final PageFile pageFile = getPageFile();
- pageFile.load();
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ final TxPageFile pageFile = getPageFile();
+ execute(new Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
- if (pageFile.getPageCount() == 0) {
+ if ( !tx.allocator().isAllocated(0) ) {
rootEntity.allocate(tx);
- } else {
- Page<RootEntity> page = tx.load(0, RootEntity.MARSHALLER);
- rootEntity = page.get();
- rootEntity.setPageId(0);
}
rootEntity.load(tx);
}
@@ -177,6 +171,24 @@ public class KahaDBStore implements Stor
}
}
+ interface Closure<T extends Exception> {
+ public void execute(Transaction tx) throws T;
+ }
+
+ private <T extends Exception> void execute(Closure<T> closure) throws T {
+ Transaction tx = pageFile.tx();
+ boolean committed=false;
+ try {
+ closure.execute(tx);
+ tx.commit();
+ committed=true;
+ } finally {
+ if( !committed ) {
+ tx.rollback();
+ }
+ }
+ }
+
/**
* @throws IOException
*/
@@ -210,7 +222,7 @@ public class KahaDBStore implements Stor
journal.delete();
journal.close();
journal = null;
- getPageFile().delete();
+ pageFileFactory.getFile().delete();
rootEntity = new RootEntity();
LOG.info("Persistence store purged.");
deleteAllMessages = false;
@@ -270,7 +282,8 @@ public class KahaDBStore implements Stor
indexLock.writeLock().lock();
try {
- pageFile.unload();
+ pageFileFactory.close();
+ pageFile = null;
rootEntity = new RootEntity();
journal.close();
} finally {
@@ -284,11 +297,11 @@ public class KahaDBStore implements Stor
}
public void unload() throws IOException, InterruptedException {
- if (pageFile.isLoaded()) {
+ if (pageFile !=null) {
indexLock.writeLock().lock();
try {
rootEntity.setState(CLOSED_STATE);
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ execute(new Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
// Set the last update to the next update (otherwise
// we'll replay the last update
@@ -315,7 +328,6 @@ public class KahaDBStore implements Stor
*
* @throws IOException
* @throws IOException
- * @throws InvalidLocationException
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, IOException {
@@ -330,7 +342,7 @@ public class KahaDBStore implements Stor
int uowCounter = 0;
while (recoveryPosition != null) {
- Buffer data = journal.read(recoveryPosition);
+ Buffer data = convert(journal.read(recoveryPosition));
if (data.length == 1 && data.data[0] == BEGIN_UNIT_OF_WORK) {
uow = pageFile.tx();
} else if (data.length == 1 && data.data[0] == END_UNIT_OF_WORK) {
@@ -352,7 +364,7 @@ public class KahaDBStore implements Stor
updateIndex(uow, message.toType(), (MessageBuffer) message, location);
uowCounter++;
} else {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ execute(new Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
updateIndex(tx, message.toType(), (MessageBuffer) message, location);
rootEntity.setLastUpdate(location);
@@ -369,7 +381,7 @@ public class KahaDBStore implements Stor
}
// We may have to undo some index updates.
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ execute(new Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
recoverIndex(tx);
}
@@ -380,6 +392,10 @@ public class KahaDBStore implements Stor
}
}
+ private Buffer convert(org.fusesource.hawtdb.util.buffer.Buffer buffer) {
+ return new Buffer(buffer.data, buffer.offset, buffer.length);
+ }
+
public void incrementalRecover() throws IOException {
indexLock.writeLock().lock();
try {
@@ -397,7 +413,7 @@ public class KahaDBStore implements Stor
final TypeCreatable message = load(lastRecoveryPosition);
final Location location = lastRecoveryPosition;
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ execute(new Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
updateIndex(tx, message.toType(), (MessageBuffer) message, location);
}
@@ -449,7 +465,7 @@ public class KahaDBStore implements Stor
if (!opened.get()) {
return;
}
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ execute(new Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, cleanup);
}
@@ -473,7 +489,7 @@ public class KahaDBStore implements Stor
public void checkpoint(org.apache.activemq.util.Callback closure) throws Exception {
indexLock.writeLock().lock();
try {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ execute(new Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, false);
}
@@ -565,12 +581,12 @@ public class KahaDBStore implements Stor
long start = System.currentTimeMillis();
final Location location;
synchronized (journal) {
- location = journal.write(os.toBuffer(), onFlush);
+ location = journal.write(convert(os.toBuffer()), onFlush);
}
long start2 = System.currentTimeMillis();
if (tx == null) {
- pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ execute(new Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
updateIndex(tx, data.toType(), message, location);
}
@@ -592,6 +608,10 @@ public class KahaDBStore implements Stor
}
+ private org.fusesource.hawtdb.util.buffer.Buffer convert(Buffer buffer) {
+ return new org.fusesource.hawtdb.util.buffer.Buffer(buffer.data, buffer.offset, buffer.length);
+ }
+
/**
* Loads a previously stored PBMessage
*
@@ -600,7 +620,7 @@ public class KahaDBStore implements Stor
* @throws IOException
*/
private TypeCreatable load(Location location) throws IOException {
- Buffer data = journal.read(location);
+ Buffer data = convert(journal.read(location));
return load(location, data);
}
@@ -636,10 +656,10 @@ public class KahaDBStore implements Stor
queueRemoveMessage(tx, (QueueRemoveMessage) command, location);
break;
case SUBSCRIPTION_ADD:
- rootEntity.addSubscription(tx, (SubscriptionAdd) command);
+ rootEntity.addSubscription((SubscriptionAdd) command);
break;
case SUBSCRIPTION_REMOVE:
- rootEntity.removeSubscription(tx, ((SubscriptionRemove) command).getName());
+ rootEntity.removeSubscription(((SubscriptionRemove) command).getName());
break;
case TRANSACTION_BEGIN:
case TRANSACTION_ADD_MESSAGE:
@@ -977,7 +997,7 @@ public class KahaDBStore implements Stor
storeAtomic();
SubscriptionRecord old;
try {
- old = rootEntity.getSubscription(tx, record.getName());
+ old = rootEntity.getSubscription(record.getName());
if (old != null && !old.equals(record)) {
throw new DuplicateKeyException("Subscription already exists: " + record.getName());
} else {
@@ -1175,30 +1195,23 @@ public class KahaDBStore implements Stor
// IoC Properties.
// /////////////////////////////////////////////////////////////////
- protected PageFile createPageFile() {
- PageFile index = new PageFile(directory, "db");
- index.setEnableWriteThread(isEnableIndexWriteAsync());
- index.setWriteBatchSize(getIndexWriteBatchSize());
- return index;
- }
-
- protected Journal createJournal() {
- Journal manager = new Journal();
- manager.setDirectory(directory);
- manager.setMaxFileLength(getJournalMaxFileLength());
- return manager;
- }
-
- private PageFile getPageFile() {
+ private TxPageFile getPageFile() {
if (pageFile == null) {
- pageFile = createPageFile();
+ pageFileFactory.setFile(new File(directory, "db"));
+ pageFileFactory.setDrainOnClose(false);
+ pageFileFactory.setSync(true);
+ pageFileFactory.setUseWorkerThread(true);
+ pageFileFactory.open();
+ pageFile = pageFileFactory.getTxPageFile();
}
return pageFile;
}
private Journal getJournal() {
if (journal == null) {
- journal = createJournal();
+ journal = new Journal();
+ journal.setDirectory(directory);
+ journal.setMaxFileLength(getJournalMaxFileLength());
}
return journal;
}
@@ -1223,30 +1236,6 @@ public class KahaDBStore implements Stor
this.deleteAllMessages = deleteAllMessages;
}
- public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
- this.setIndexWriteBatchSize = setIndexWriteBatchSize;
- }
-
- public int getIndexWriteBatchSize() {
- return setIndexWriteBatchSize;
- }
-
- public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
- this.enableIndexWriteAsync = enableIndexWriteAsync;
- }
-
- boolean isEnableIndexWriteAsync() {
- return enableIndexWriteAsync;
- }
-
- public boolean isEnableJournalDiskSyncs() {
- return enableJournalDiskSyncs;
- }
-
- public void setEnableJournalDiskSyncs(boolean syncWrites) {
- this.enableJournalDiskSyncs = syncWrites;
- }
-
public long getCheckpointInterval() {
return checkpointInterval;
}
@@ -1263,20 +1252,39 @@ public class KahaDBStore implements Stor
this.cleanupInterval = cleanupInterval;
}
- public void setJournalMaxFileLength(int journalMaxFileLength) {
- this.journalMaxFileLength = journalMaxFileLength;
+ public boolean isFailIfDatabaseIsLocked() {
+ return failIfDatabaseIsLocked;
+ }
+
+ public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
+ this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
}
public int getJournalMaxFileLength() {
return journalMaxFileLength;
}
+ public void setJournalMaxFileLength(int journalMaxFileLength) {
+ this.journalMaxFileLength = journalMaxFileLength;
+ }
- public boolean isFailIfDatabaseIsLocked() {
- return failIfDatabaseIsLocked;
+ public int getIndexMaxPages() {
+ return pageFileFactory.getMaxPages();
+ }
+ public void setIndexMaxPages(int maxPages) {
+ pageFileFactory.setMaxPages(maxPages);
}
- public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
- this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
+ public short getIndexPageSize() {
+ return pageFileFactory.getPageSize();
+ }
+ public void setIndexPageSize(short pageSize) {
+ pageFileFactory.setPageSize(pageSize);
}
+ public int getIndexMappingSegementSize() {
+ return pageFileFactory.getMappingSegementSize();
+ }
+ public void setIndexMappingSegementSize(int mappingSegementSize) {
+ pageFileFactory.setMappingSegementSize(mappingSegementSize);
+ }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/Marshallers.java Wed Jul 7 03:24:02 2010
@@ -20,13 +20,12 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store.QueueRecord;
-import org.apache.activemq.queue.QueueDescriptor;
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.activemq.util.buffer.Buffer;
-import org.apache.activemq.util.marshaller.Marshaller;
-import org.apache.activemq.util.marshaller.VariableMarshaller;
-import org.apache.kahadb.journal.Location;
+import org.fusesource.hawtdb.util.marshaller.Marshaller;
+import org.fusesource.hawtdb.util.marshaller.VariableMarshaller;
public class Marshallers {
@@ -71,74 +70,6 @@ public class Marshallers {
}
};
- public final static Marshaller<Location> LOCATION_MARSHALLER = new Marshaller<Location>() {
-
- public Location readPayload(DataInput dataIn) throws IOException {
- Location rc = new Location();
- rc.setDataFileId(dataIn.readInt());
- rc.setOffset(dataIn.readInt());
- return rc;
- }
-
- public void writePayload(Location object, DataOutput dataOut) throws IOException {
- dataOut.writeInt(object.getDataFileId());
- dataOut.writeInt(object.getOffset());
- }
-
- public boolean isDeepCopySupported() {
- return true;
- }
-
- public Location deepCopy(Location source) {
- return new Location(source);
- }
-
- public int getFixedSize() {
- return 8;
- }
-
- public int estimatedSize(Location object) {
- throw new UnsupportedOperationException();
- }
-
- };
-
- public final static Marshaller<AsciiBuffer> ASCII_BUFFER_MARSHALLER = new VariableMarshaller<AsciiBuffer>() {
-
- public AsciiBuffer readPayload(DataInput dataIn) throws IOException {
- byte data[] = new byte[dataIn.readShort()];
- dataIn.readFully(data);
- return new AsciiBuffer(data);
- }
-
- public void writePayload(AsciiBuffer object, DataOutput dataOut) throws IOException {
- dataOut.writeShort(object.length);
- dataOut.write(object.data, object.offset, object.length);
- }
-
- public int estimatedSize(AsciiBuffer object) {
- throw new UnsupportedOperationException();
- }
- };
-
- public final static Marshaller<Buffer> BUFFER_MARSHALLER = new VariableMarshaller<Buffer>() {
-
- public Buffer readPayload(DataInput dataIn) throws IOException {
- byte data[] = new byte[dataIn.readShort()];
- dataIn.readFully(data);
- return new Buffer(data);
- }
-
- public void writePayload(Buffer object, DataOutput dataOut) throws IOException {
- dataOut.writeShort(object.length);
- dataOut.write(object.data, object.offset, object.length);
- }
-
- public int estimatedSize(Buffer object) {
- throw new UnsupportedOperationException();
- }
- };
-
public final static Marshaller<QueueDescriptor> QUEUE_DESCRIPTOR_MARSHALLER = new VariableMarshaller<QueueDescriptor>() {
public QueueDescriptor readPayload(DataInput dataIn) throws IOException {
@@ -169,4 +100,52 @@ public class Marshallers {
throw new UnsupportedOperationException();
}
};
+
+
+
+ static abstract public class AbstractBufferMarshaller<T extends Buffer> extends org.fusesource.hawtdb.util.marshaller.VariableMarshaller<T> {
+
+ public void writePayload(T value, DataOutput dataOut) throws IOException {
+ dataOut.writeInt(value.length);
+ dataOut.write(value.data, value.offset, value.length);
+ }
+
+ public T readPayload(DataInput dataIn) throws IOException {
+ int size = dataIn.readInt();
+ byte[] data = new byte[size];
+ dataIn.readFully(data);
+ return createBuffer(data);
+ }
+
+ abstract protected T createBuffer(byte [] data);
+
+ public T deepCopy(T source) {
+ return createBuffer(source.deepCopy().data);
+ }
+
+ public boolean isDeepCopySupported() {
+ return true;
+ }
+
+ public int estimatedSize(T object) {
+ return object.length+4;
+ }
+
+ }
+
+ public final static Marshaller<AsciiBuffer> ASCII_BUFFER_MARSHALLER = new AbstractBufferMarshaller<AsciiBuffer>() {
+ @Override
+ protected AsciiBuffer createBuffer(byte[] data) {
+ return new AsciiBuffer(data);
+ }
+
+ };
+
+ public final static Marshaller<Buffer> BUFFER_MARSHALLER = new AbstractBufferMarshaller<Buffer>() {
+ @Override
+ protected Buffer createBuffer(byte[] data) {
+ return new Buffer(data);
+ }
+ };
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/MessageKeys.java Wed Jul 7 03:24:02 2010
@@ -23,7 +23,8 @@ import java.io.IOException;
import org.apache.activemq.util.buffer.AsciiBuffer;
import org.apache.activemq.util.marshaller.Marshaller;
import org.apache.activemq.util.marshaller.VariableMarshaller;
-import org.apache.kahadb.journal.Location;
+import org.fusesource.hawtdb.internal.journal.Location;
+import org.fusesource.hawtdb.util.marshaller.LocationMarshaller;
public class MessageKeys {
@@ -42,14 +43,14 @@ public class MessageKeys {
public static final Marshaller<MessageKeys> MARSHALLER = new VariableMarshaller<MessageKeys>() {
public MessageKeys readPayload(DataInput dataIn) throws IOException {
- Location location = Marshallers.LOCATION_MARSHALLER.readPayload(dataIn);
+ Location location = LocationMarshaller.INSTANCE.readPayload(dataIn);
byte data[] = new byte[dataIn.readShort()];
dataIn.readFully(data);
return new MessageKeys(new AsciiBuffer(data), location);
}
public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
- Marshallers.LOCATION_MARSHALLER.writePayload(object.location, dataOut);
+ LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
dataOut.writeShort(object.messageId.length);
dataOut.write(object.messageId.data, object.messageId.offset, object.messageId.length);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Wed Jul 7 03:24:02 2010
@@ -16,18 +16,11 @@
*/
package org.apache.activemq.broker.store.kahadb;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.io.*;
+import java.util.*;
import java.util.Map.Entry;
+import org.apache.activemq.broker.store.QueueDescriptor;
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.Store.KeyNotFoundException;
import org.apache.activemq.broker.store.Store.QueueQueryResult;
@@ -36,97 +29,137 @@ import org.apache.activemq.broker.store.
import org.apache.activemq.broker.store.kahadb.Data.SubscriptionAdd;
import org.apache.activemq.broker.store.kahadb.Data.SubscriptionAdd.SubscriptionAddBuffer;
import org.apache.activemq.protobuf.InvalidProtocolBufferException;
-import org.apache.activemq.queue.QueueDescriptor;
-import org.apache.activemq.util.buffer.AsciiBuffer;
-import org.apache.activemq.util.buffer.Buffer;
-import org.apache.activemq.util.marshaller.IntegerMarshaller;
-import org.apache.activemq.util.marshaller.LongMarshaller;
-import org.apache.activemq.util.marshaller.Marshaller;
-import org.apache.activemq.util.marshaller.VariableMarshaller;
-import org.apache.kahadb.index.BTreeIndex;
-import org.apache.kahadb.index.BTreeVisitor;
-import org.apache.kahadb.journal.Location;
-import org.apache.kahadb.page.Page;
-import org.apache.kahadb.page.Transaction;
+import org.fusesource.hawtdb.api.*;
+import org.fusesource.hawtdb.internal.journal.Location;
+import org.apache.activemq.util.buffer.*;
+import org.fusesource.hawtdb.util.marshaller.LongMarshaller;
+import org.fusesource.hawtdb.util.marshaller.IntegerMarshaller;
+import org.fusesource.hawtdb.util.marshaller.LocationMarshaller;
public class RootEntity {
- //TODO remove this one performance testing is complete.
+ //TODO remove this once performance testing is complete.
private static final boolean USE_LOC_INDEX = true;
-
private static final int VERSION = 0;
- public final static Marshaller<RootEntity> MARSHALLER = new VariableMarshaller<RootEntity>() {
- public RootEntity readPayload(DataInput is) throws IOException {
- RootEntity rc = new RootEntity();
- rc.state = is.readInt();
- is.readInt(); //VERSION
- rc.maxMessageKey = is.readLong();
- rc.messageKeyIndex = new BTreeIndex<Long, Location>(is.readLong());
+
+ private static final BTreeIndexFactory<Long, Location> messageKeyIndexFactory = new BTreeIndexFactory<Long, Location>();
+ private static final BTreeIndexFactory<Integer, Long> locationIndexFactory = new BTreeIndexFactory<Integer, Long>();
+ private static final BTreeIndexFactory<Long, Long> messageRefsIndexFactory = new BTreeIndexFactory<Long, Long>();
+ private static final BTreeIndexFactory<AsciiBuffer, DestinationEntity> destinationIndexFactory = new BTreeIndexFactory<AsciiBuffer, DestinationEntity>();
+ private static final BTreeIndexFactory<AsciiBuffer, Buffer> subscriptionIndexFactory = new BTreeIndexFactory<AsciiBuffer, Buffer>();
+ private static final BTreeIndexFactory<AsciiBuffer, Integer> mapIndexFactory = new BTreeIndexFactory<AsciiBuffer, Integer>();
+ private static final BTreeIndexFactory<AsciiBuffer, Buffer> mapInstanceIndexFactory = new BTreeIndexFactory<AsciiBuffer, Buffer>();
+
+ static {
+ messageKeyIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
+ messageKeyIndexFactory.setValueMarshaller(LocationMarshaller.INSTANCE);
+ messageKeyIndexFactory.setDeferredEncoding(true);
+
+ locationIndexFactory.setKeyMarshaller(IntegerMarshaller.INSTANCE);
+ locationIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+ locationIndexFactory.setDeferredEncoding(true);
+
+ messageRefsIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
+ messageRefsIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+ messageRefsIndexFactory.setDeferredEncoding(true);
+
+ destinationIndexFactory.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+ destinationIndexFactory.setValueMarshaller(DestinationEntity.MARSHALLER);
+ destinationIndexFactory.setDeferredEncoding(true);
+
+ subscriptionIndexFactory.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+ subscriptionIndexFactory.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
+ subscriptionIndexFactory.setDeferredEncoding(true);
+
+ mapIndexFactory.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+ mapIndexFactory.setValueMarshaller(IntegerMarshaller.INSTANCE);
+ mapIndexFactory.setDeferredEncoding(true);
+ }
+
+ // The root page the this object's state is stored on.
+ // private Page<StoredDBState> page;
+
+ // State information about the index
+ Data data;
+ private long maxMessageKey;
+
+ static class Data {
+ private int state;
+ // Message Indexes
+ private long maxMessageKey;
+ private Location lastUpdate;
+
+ private SortedIndex<Long, Location> messageKeyIndex;
+ private SortedIndex<Integer, Long> locationIndex;
+ private SortedIndex<Long, Long> messageRefsIndex; // Maps message key to ref
+ // count:
+
+ // The destinations
+ private SortedIndex<AsciiBuffer, DestinationEntity> destinationIndex;
+
+ // Subscriptions
+ private SortedIndex<AsciiBuffer, Buffer> subscriptionIndex;
+
+ // Maps:
+ private SortedIndex<AsciiBuffer, Integer> mapIndex;
+
+ public void create(Transaction tx) {
+ state = KahaDBStore.CLOSED_STATE;
+ messageKeyIndex = messageKeyIndexFactory.create(tx, tx.alloc());
if (USE_LOC_INDEX)
- rc.locationIndex = new BTreeIndex<Integer, Long>(is.readLong());
- rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
- rc.messageRefsIndex = new BTreeIndex<Long, Long>(is.readLong());
- rc.subscriptionIndex = new BTreeIndex<AsciiBuffer, Buffer>(is.readLong());
- rc.mapIndex = new BTreeIndex<AsciiBuffer, Long>(is.readLong());
- if (is.readBoolean()) {
- rc.lastUpdate = Marshallers.LOCATION_MARSHALLER.readPayload(is);
- } else {
- rc.lastUpdate = null;
- }
- return rc;
+ locationIndex = locationIndexFactory.create(tx, tx.alloc());
+ destinationIndex = destinationIndexFactory.create(tx, tx.alloc());
+ messageRefsIndex = messageRefsIndexFactory.create(tx, tx.alloc());
+ subscriptionIndex = subscriptionIndexFactory.create(tx, tx.alloc());
+ mapIndex = mapIndexFactory.create(tx, tx.alloc());
+
}
+ }
- public void writePayload(RootEntity object, DataOutput os) throws IOException {
+ EncoderDecoder<Data> DATA_ENCODER_DECODER = new AbstractStreamEncoderDecoder<Data>() {
+ @Override
+ protected void encode(Paged paged, DataOutputStream os, Data object) throws IOException {
os.writeInt(object.state);
os.writeInt(VERSION);
os.writeLong(object.maxMessageKey);
- os.writeLong(object.messageKeyIndex.getPageId());
+ os.writeInt(object.messageKeyIndex.getPage());
if (USE_LOC_INDEX)
- os.writeLong(object.locationIndex.getPageId());
- os.writeLong(object.destinationIndex.getPageId());
- os.writeLong(object.messageRefsIndex.getPageId());
- os.writeLong(object.subscriptionIndex.getPageId());
- os.writeLong(object.mapIndex.getPageId());
+ os.writeInt(object.locationIndex.getPage());
+ os.writeInt(object.destinationIndex.getPage());
+ os.writeInt(object.messageRefsIndex.getPage());
+ os.writeInt(object.subscriptionIndex.getPage());
+ os.writeInt(object.mapIndex.getPage());
if (object.lastUpdate != null) {
os.writeBoolean(true);
- Marshallers.LOCATION_MARSHALLER.writePayload(object.lastUpdate, os);
+ LocationMarshaller.INSTANCE.writePayload(object.lastUpdate, os);
} else {
os.writeBoolean(false);
}
}
- public int estimatedSize(RootEntity object) {
- throw new UnsupportedOperationException();
+ @Override
+ protected RootEntity.Data decode(Paged paged, DataInputStream is) throws IOException {
+ Data rc = new Data();
+ rc.state = is.readInt();
+ is.readInt(); //VERSION
+ rc.maxMessageKey = is.readLong();
+ rc.messageKeyIndex = messageKeyIndexFactory.open(paged, is.readInt());
+ if (USE_LOC_INDEX)
+ rc.locationIndex = locationIndexFactory.open(paged, is.readInt());
+ rc.destinationIndex = destinationIndexFactory.open(paged, is.readInt());
+ rc.messageRefsIndex = messageRefsIndexFactory.open(paged, is.readInt());
+ rc.subscriptionIndex = subscriptionIndexFactory.open(paged, is.readInt());
+ rc.mapIndex = mapIndexFactory.open(paged, is.readInt());
+ if (is.readBoolean()) {
+ rc.lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
+ } else {
+ rc.lastUpdate = null;
+ }
+ return rc;
}
};
- // The root page the this object's state is stored on.
- // private Page<StoredDBState> page;
-
- // State information about the index
- private long pageId;
- private int state;
- private Location lastUpdate;
- private boolean loaded;
-
- // Message Indexes
- private long maxMessageKey;
- private BTreeIndex<Long, Location> messageKeyIndex;
- private BTreeIndex<Integer, Long> locationIndex;
- private BTreeIndex<Long, Long> messageRefsIndex; // Maps message key to ref
- // count:
-
- // The destinations
- private BTreeIndex<AsciiBuffer, DestinationEntity> destinationIndex;
- private final TreeMap<AsciiBuffer, DestinationEntity> destinations = new TreeMap<AsciiBuffer, DestinationEntity>();
-
- // Subscriptions
- private BTreeIndex<AsciiBuffer, Buffer> subscriptionIndex;
-
- // Maps:
- private BTreeIndex<AsciiBuffer, Long> mapIndex;
- private TreeMap<AsciiBuffer, BTreeIndex<AsciiBuffer, Buffer>> mapCache = new TreeMap<AsciiBuffer, BTreeIndex<AsciiBuffer,Buffer>>();
// /////////////////////////////////////////////////////////////////
// Lifecycle Methods.
@@ -134,71 +167,25 @@ public class RootEntity {
public void allocate(Transaction tx) throws IOException {
// First time this is created.. Initialize a new pagefile.
- Page<RootEntity> page = tx.allocate();
- pageId = page.getPageId();
+ int pageId = tx.alloc();
assert pageId == 0;
-
- state = KahaDBStore.CLOSED_STATE;
-
- messageKeyIndex = new BTreeIndex<Long, Location>(tx.getPageFile(), tx.allocate().getPageId());
- if (USE_LOC_INDEX)
- locationIndex = new BTreeIndex<Integer, Long>(tx.getPageFile(), tx.allocate().getPageId());
- destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(), tx.allocate().getPageId());
- messageRefsIndex = new BTreeIndex<Long, Long>(tx.getPageFile(), tx.allocate().getPageId());
- subscriptionIndex = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), tx.allocate().getPageId());
- mapIndex = new BTreeIndex<AsciiBuffer, Long>(tx.getPageFile(), tx.allocate().getPageId());
-
- page.set(this);
- tx.store(page, MARSHALLER, true);
+ data = new Data();
+ data.create(tx);
+ tx.put(DATA_ENCODER_DECODER, pageId, data);
}
public void load(Transaction tx) throws IOException {
- messageKeyIndex.setPageFile(tx.getPageFile());
- messageKeyIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
- messageKeyIndex.setValueMarshaller(Marshallers.LOCATION_MARSHALLER);
- messageKeyIndex.load(tx);
+ data = tx.get(DATA_ENCODER_DECODER, 0);
+
// Update max message key:
- Entry<Long, Location> last = messageKeyIndex.getLast(tx);
+ maxMessageKey = data.maxMessageKey;
+ Entry<Long, Location> last = data.messageKeyIndex.getLast();
if (last != null) {
if (last.getKey() > maxMessageKey) {
maxMessageKey = last.getKey();
}
}
- if (USE_LOC_INDEX) {
- locationIndex.setPageFile(tx.getPageFile());
- locationIndex.setKeyMarshaller(IntegerMarshaller.INSTANCE);
- locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
- locationIndex.load(tx);
- }
-
- subscriptionIndex.setPageFile(tx.getPageFile());
- subscriptionIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
- subscriptionIndex.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
- subscriptionIndex.load(tx);
-
- destinationIndex.setPageFile(tx.getPageFile());
- destinationIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
- destinationIndex.setValueMarshaller(DestinationEntity.MARSHALLER);
- destinationIndex.load(tx);
-
- messageRefsIndex.setPageFile(tx.getPageFile());
- messageRefsIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
- messageRefsIndex.setValueMarshaller(LongMarshaller.INSTANCE);
- messageRefsIndex.load(tx);
-
- // Keep the StoredDestinations loaded
- destinations.clear();
- for (Iterator<Entry<AsciiBuffer, DestinationEntity>> iterator = destinationIndex.iterator(tx); iterator.hasNext();) {
- Entry<AsciiBuffer, DestinationEntity> entry = iterator.next();
- entry.getValue().load(tx);
- try {
- addToDestinationCache(entry.getValue());
- } catch (KeyNotFoundException e) {
- //
- }
- }
-
// Build up the queue partition hierarchy:
try {
constructQueueHierarchy();
@@ -207,61 +194,6 @@ public class RootEntity {
ioe.initCause(e);
throw ioe;
}
-
- //Load Maps:
- mapIndex.setPageFile(tx.getPageFile());
- mapIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
- mapIndex.setValueMarshaller(LongMarshaller.INSTANCE);
- mapIndex.load(tx);
-
- //Load all of the maps and cache them:
- for (Iterator<Entry<AsciiBuffer, Long>> iterator = mapIndex.iterator(tx); iterator.hasNext();) {
- Entry<AsciiBuffer, Long> entry = iterator.next();
- BTreeIndex<AsciiBuffer, Buffer> map = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), entry.getValue());
- map.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
- map.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
- map.load(tx);
- mapCache.put(entry.getKey(), map);
- }
-
- }
-
- /**
- * Adds the destination to the destination cache
- *
- * @param entity
- * The destination to cache.
- * @throws KeyNotFoundException
- * If the parent queue could not be found.
- */
- private void addToDestinationCache(DestinationEntity entity) throws KeyNotFoundException {
- QueueDescriptor queue = entity.getDescriptor();
-
- // If loaded add a reference to us from the parent:
- if (loaded) {
- if (queue.getParent() != null) {
- DestinationEntity parent = destinations.get(queue.getParent());
- if (parent == null) {
- throw new KeyNotFoundException("Parent queue for " + queue.getQueueName() + " not found");
- }
- parent.addPartition(entity);
- }
- }
-
- destinations.put(queue.getQueueName(), entity);
- }
-
- private void removeFromDestinationCache(DestinationEntity entity) {
- QueueDescriptor queue = entity.getDescriptor();
-
- // If the queue is loaded remove the parent reference:
- if (loaded) {
- if (queue.getParent() != null) {
- DestinationEntity parent = destinations.get(queue.getParent());
- parent.removePartition(entity);
- }
- }
- destinations.remove(queue.getQueueName());
}
/**
@@ -270,10 +202,11 @@ public class RootEntity {
* @throws KeyNotFoundException
*/
private void constructQueueHierarchy() throws KeyNotFoundException {
- for (DestinationEntity destination : destinations.values()) {
+ for (Entry<AsciiBuffer, DestinationEntity> entry : data.destinationIndex) {
+ DestinationEntity destination = entry.getValue();
QueueDescriptor queue = destination.getDescriptor();
if (queue.getParent() != null) {
- DestinationEntity parent = destinations.get(queue.getParent());
+ DestinationEntity parent = data.destinationIndex.get(queue.getParent());
if (parent == null) {
throw new KeyNotFoundException("Parent queue for " + queue.getQueueName() + " not found");
} else {
@@ -283,10 +216,10 @@ public class RootEntity {
}
}
+ @Deprecated // TODO: keep data immutable
public void store(Transaction tx) throws IOException {
- Page<RootEntity> page = tx.load(pageId, null);
- page.set(this);
- tx.store(page, RootEntity.MARSHALLER, true);
+ // TODO: need ot make Data immutable..
+ tx.put(DATA_ENCODER_DECODER, 0, data);
}
// /////////////////////////////////////////////////////////////////
@@ -301,55 +234,51 @@ public class RootEntity {
if (id > maxMessageKey) {
maxMessageKey = id;
}
- Location previous = messageKeyIndex.put(tx, id, location);
+ Location previous = data.messageKeyIndex.put(id, location);
if (previous != null) {
// Message existed.. undo the index update we just did. Chances
// are it's a transaction replay.
- messageKeyIndex.put(tx, id, previous);
+ data.messageKeyIndex.put(id, previous);
} else {
if (USE_LOC_INDEX) {
- Long refs = locationIndex.get(tx, location.getDataFileId());
+ Long refs = data.locationIndex.get(location.getDataFileId());
if (refs == null) {
- locationIndex.put(tx, location.getDataFileId(), new Long(1));
+ data.locationIndex.put(location.getDataFileId(), new Long(1));
} else {
- locationIndex.put(tx, location.getDataFileId(), new Long(refs.longValue() + 1));
+ data.locationIndex.put(location.getDataFileId(), new Long(refs.longValue() + 1));
}
}
}
}
- public void messageRemove(Transaction tx, Long messageKey) throws IOException {
+ public void messageRemove(Long messageKey) {
// Location location = messageKeyIndex.remove(tx, messageKey);
- Location location = messageKeyIndex.remove(tx, messageKey);
+ Location location = data.messageKeyIndex.remove(messageKey);
if (USE_LOC_INDEX && location != null) {
- Long refs = locationIndex.get(tx, location.getDataFileId());
+ Long refs = data.locationIndex.get(location.getDataFileId());
if (refs != null) {
if (refs.longValue() <= 1) {
- locationIndex.remove(tx, location.getDataFileId());
+ data.locationIndex.remove(location.getDataFileId());
} else {
- locationIndex.put(tx, location.getDataFileId(), new Long(refs.longValue() - 1));
+ data.locationIndex.put(location.getDataFileId(), new Long(refs.longValue() - 1));
}
}
}
}
public Location messageGetLocation(Transaction tx, Long messageKey) {
- try {
- return messageKeyIndex.get(tx, messageKey);
- } catch (IOException e) {
- throw new Store.FatalStoreException(e);
- }
+ return data.messageKeyIndex.get(messageKey);
}
public void addMessageRef(Transaction tx, AsciiBuffer queueName, Long messageKey) {
try {
- Long refs = messageRefsIndex.get(tx, messageKey);
+ Long refs = data.messageRefsIndex.get(messageKey);
if (refs == null) {
- messageRefsIndex.put(tx, messageKey, new Long(1));
+ data.messageRefsIndex.put(messageKey, new Long(1));
} else {
- messageRefsIndex.put(tx, messageKey, new Long(1 + refs.longValue()));
+ data.messageRefsIndex.put(messageKey, new Long(1 + refs.longValue()));
}
- } catch (IOException e) {
+ } catch (RuntimeException e) {
throw new Store.FatalStoreException(e);
}
@@ -357,17 +286,17 @@ public class RootEntity {
public void removeMessageRef(Transaction tx, AsciiBuffer queueName, Long messageKey) {
try {
- Long refs = messageRefsIndex.get(tx, messageKey);
+ Long refs = data.messageRefsIndex.get(messageKey);
if (refs != null) {
if (refs.longValue() <= 1) {
- messageRefsIndex.remove(tx, messageKey);
+ data.messageRefsIndex.remove(messageKey);
// If this is the last record remove, the message
- messageRemove(tx, messageKey);
+ messageRemove(messageKey);
} else {
- messageRefsIndex.put(tx, messageKey, new Long(refs.longValue() - 1));
+ data.messageRefsIndex.put(messageKey, new Long(refs.longValue() - 1));
}
}
- } catch (IOException e) {
+ } catch (RuntimeException e) {
throw new Store.FatalStoreException(e);
}
}
@@ -388,7 +317,7 @@ public class RootEntity {
final LinkedList<SubscriptionRecord> rc = new LinkedList<SubscriptionRecord>();
- subscriptionIndex.visit(tx, new BTreeVisitor<AsciiBuffer, Buffer>() {
+ data.subscriptionIndex.visit(new IndexVisitor<AsciiBuffer, Buffer>() {
public boolean isInterestedInKeysBetween(AsciiBuffer first, AsciiBuffer second) {
return true;
}
@@ -412,21 +341,18 @@ public class RootEntity {
}
/**
- * @param tx
* @param name
* @throws IOException
*/
- public void removeSubscription(Transaction tx, AsciiBuffer name) throws IOException {
- subscriptionIndex.remove(tx, name);
+ public void removeSubscription(AsciiBuffer name) throws IOException {
+ data.subscriptionIndex.remove(name);
}
/**
- * @param tx
- * @param name
* @throws IOException
*/
- public void addSubscription(Transaction tx, SubscriptionAdd subscription) throws IOException {
- subscriptionIndex.put(tx, subscription.getName(), subscription.freeze().toFramedBuffer());
+ public void addSubscription(SubscriptionAdd subscription) throws IOException {
+ data.subscriptionIndex.put(subscription.getName(), subscription.freeze().toFramedBuffer());
}
/**
@@ -434,8 +360,8 @@ public class RootEntity {
* @return
* @throws IOException
*/
- public SubscriptionRecord getSubscription(Transaction tx, AsciiBuffer name) throws IOException {
- return toSubscriptionRecord(subscriptionIndex.get(tx, name));
+ public SubscriptionRecord getSubscription(AsciiBuffer name) throws IOException {
+ return toSubscriptionRecord(data.subscriptionIndex.get(name));
}
/**
@@ -475,22 +401,16 @@ public class RootEntity {
// Queue Methods.
// /////////////////////////////////////////////////////////////////
public void queueAdd(Transaction tx, QueueDescriptor queue) throws IOException {
- if (destinationIndex.get(tx, queue.getQueueName()) == null) {
+ if (data.destinationIndex.get(queue.getQueueName()) == null) {
DestinationEntity rc = new DestinationEntity();
rc.setQueueDescriptor(queue);
rc.allocate(tx);
- destinationIndex.put(tx, queue.getQueueName(), rc);
- rc.load(tx);
- try {
- addToDestinationCache(rc);
- } catch (KeyNotFoundException e) {
- throw new Store.FatalStoreException("Inconsistent QueueStore: " + e.getMessage(), e);
- }
+ data.destinationIndex.put(queue.getQueueName(), rc);
}
}
public void queueRemove(Transaction tx, QueueDescriptor queue) throws IOException {
- DestinationEntity destination = destinations.get(queue.getQueueName());
+ DestinationEntity destination = data.destinationIndex.get(queue.getQueueName());
if (destination != null) {
// Remove the message references.
// TODO this should probably be optimized.
@@ -499,21 +419,23 @@ public class RootEntity {
Long messageKey = messages.next().getKey();
removeMessageRef(tx, queue.getQueueName(), messageKey);
}
- destinationIndex.remove(tx, queue.getQueueName());
- removeFromDestinationCache(destination);
+ data.destinationIndex.remove(queue.getQueueName());
destination.deallocate(tx);
}
}
public DestinationEntity getDestination(QueueDescriptor queue) {
- return destinations.get(queue.getQueueName());
+ return data.destinationIndex.get(queue.getQueueName());
}
public Iterator<QueueQueryResult> queueList(Transaction tx, short type, QueueDescriptor firstQueue, int max) throws IOException {
LinkedList<QueueQueryResult> results = new LinkedList<QueueQueryResult>();
- Collection<DestinationEntity> values = (firstQueue == null ? destinations.values() : destinations.tailMap(firstQueue.getQueueName()).values());
- for (DestinationEntity de : values) {
+ final Iterator<Entry<AsciiBuffer, DestinationEntity>> i;
+ i = data.destinationIndex.iterator(firstQueue==null? null : firstQueue.getQueueName());
+ while (i.hasNext()) {
+ Entry<AsciiBuffer, DestinationEntity> entry = i.next();
+ DestinationEntity de = entry.getValue();
if (results.size() >= max) {
break;
}
@@ -537,7 +459,7 @@ public class RootEntity {
if (partitions != null && partitions.hasNext()) {
result.partitions = new LinkedList<QueueQueryResult>();
while (partitions.hasNext()) {
- result.partitions.add(queryQueue(tx, destinations.get(partitions.next().getDescriptor().getQueueName())));
+ result.partitions.add(queryQueue(tx, getDestination(partitions.next().getDescriptor()) ));
}
}
@@ -548,84 +470,82 @@ public class RootEntity {
// Map Methods.
// /////////////////////////////////////////////////////////////////
public final void mapAdd(AsciiBuffer key, Transaction tx) throws IOException {
- BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(key);
-
- if (map == null) {
- long pageId = tx.allocate().getPageId();
- map = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), pageId);
- map.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
- map.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
- map.load(tx);
- mapIndex.put(tx, key, pageId);
- mapCache.put(key, map);
+ final Integer page = data.mapIndex.get(key);
+ if (page == null) {
+ int pageId = tx.alloc();
+ SortedIndex<AsciiBuffer, Buffer> map = mapInstanceIndexFactory.create(tx, pageId);
+ data.mapIndex.put(key, pageId);
}
}
public final void mapRemove(AsciiBuffer key, Transaction tx) throws IOException {
- BTreeIndex<AsciiBuffer, Buffer> map = mapCache.remove(key);
- if (map != null) {
- map.clear(tx);
- map.unload(tx);
- mapIndex.remove(tx, key);
+ final Integer pageId = data.mapIndex.remove(key);
+ if (pageId != null) {
+ SortedIndex<AsciiBuffer, Buffer> map = mapInstanceIndexFactory.open(tx, pageId);
+ map.clear();
+ tx.free(pageId);
}
}
public final void mapAddEntry(AsciiBuffer name, AsciiBuffer key, Buffer value, Transaction tx) throws IOException {
- BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(name);
- if (map == null) {
- mapAdd(name, tx);
- map = mapCache.get(name);
+ Integer pageId = data.mapIndex.get(name);
+ if (pageId == null) {
+ pageId = tx.alloc();
+ SortedIndex<AsciiBuffer, Buffer> map = mapInstanceIndexFactory.create(tx, pageId);
+ data.mapIndex.put(key, pageId);
}
-
- map.put(tx, key, value);
-
+ SortedIndex<AsciiBuffer, Buffer> map = mapInstanceIndexFactory.open(tx, pageId);
+ map.put(key, value);
}
public final void mapRemoveEntry(AsciiBuffer name, AsciiBuffer key, Transaction tx) throws IOException, KeyNotFoundException {
- BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(name);
- if (map == null) {
+ Integer pageId = data.mapIndex.get(name);
+ if (pageId == null) {
throw new KeyNotFoundException(name.toString());
}
- map.remove(tx, key);
+ SortedIndex<AsciiBuffer, Buffer> map = mapInstanceIndexFactory.open(tx, pageId);
+ map.remove(key);
}
public final Buffer mapGetEntry(AsciiBuffer name, AsciiBuffer key, Transaction tx) throws IOException, KeyNotFoundException {
- BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(name);
- if (map == null) {
+ Integer pageId = data.mapIndex.get(name);
+ if (pageId == null) {
throw new KeyNotFoundException(name.toString());
}
- return map.get(tx, key);
+ SortedIndex<AsciiBuffer, Buffer> map = mapInstanceIndexFactory.open(tx, pageId);
+ return map.get(key);
}
public final Iterator<AsciiBuffer> mapList(AsciiBuffer first, int count, Transaction tx) {
LinkedList<AsciiBuffer> results = new LinkedList<AsciiBuffer>();
- Collection<AsciiBuffer> values = (first == null ? mapCache.keySet() : mapCache.tailMap(first).keySet());
- for (AsciiBuffer key : values) {
- results.add(key);
+ final Iterator<Entry<AsciiBuffer, Integer>> i = data.mapIndex.iterator(first);
+ while (i.hasNext()) {
+ final Entry<AsciiBuffer, Integer> entry = i.next();
+ results.add(entry.getKey());
}
return results.iterator();
}
public final Iterator<AsciiBuffer> mapListKeys(AsciiBuffer name, AsciiBuffer first, int count, Transaction tx) throws IOException, KeyNotFoundException {
- BTreeIndex<AsciiBuffer, Buffer> map = mapCache.get(name);
- if (map == null) {
+ Integer pageId = data.mapIndex.get(name);
+ if (pageId == null) {
throw new KeyNotFoundException(name.toString());
}
+ SortedIndex<AsciiBuffer, Buffer> map = mapInstanceIndexFactory.open(tx, pageId);
final LinkedList<AsciiBuffer> results = new LinkedList<AsciiBuffer>();
if (first != null && count > 0) {
- map.visit(tx, new BTreeVisitor.GTEVisitor<AsciiBuffer, Buffer>(first, count) {
-
+ map.visit(new IndexVisitor.PredicateVisitor<AsciiBuffer, Buffer>(IndexVisitor.PredicateVisitor.gte(first), count){
@Override
protected void matched(AsciiBuffer key, Buffer value) {
results.add(key);
}
});
} else {
- Iterator<Entry<AsciiBuffer, Buffer>> iterator = map.iterator(tx);
+ Iterator<Entry<AsciiBuffer, Buffer>> iterator = map.iterator();
while (iterator.hasNext()) {
Entry<AsciiBuffer, Buffer> e = iterator.next();
results.add(e.getKey());
@@ -638,29 +558,22 @@ public class RootEntity {
// /////////////////////////////////////////////////////////////////
// Map Methods.
// /////////////////////////////////////////////////////////////////
-
- public long getPageId() {
- return pageId;
- }
-
- public void setPageId(long pageId) {
- this.pageId = pageId;
- }
-
public int getState() {
- return state;
+ return data.state;
}
+ @Deprecated // TODO: keep data immutable
public void setState(int state) {
- this.state = state;
+ this.data.state = state;
}
public Location getLastUpdate() {
- return lastUpdate;
+ return data.lastUpdate;
}
+ @Deprecated // TODO: keep data immutable
public void setLastUpdate(Location lastUpdate) {
- this.lastUpdate = lastUpdate;
+ this.data.lastUpdate = lastUpdate;
}
private static class QueueQueryResultImpl implements QueueQueryResult {
@@ -713,7 +626,7 @@ public class RootEntity {
//so that we can be sure that all journal entries are on disk prior to
//index update.
- //Scan MessageKey Index to find message keys past the last append
+ //Scan MessageKey SortedIndex to find message keys past the last append
//location:
// final ArrayList<Long> matches = new ArrayList<Long>();
// messageKeyIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
@@ -776,7 +689,7 @@ public class RootEntity {
final void removeGCCandidates(final TreeSet<Integer> gcCandidateSet, Transaction tx) throws IOException {
// Don't GC files after the first in progress tx
- Location firstTxLocation = lastUpdate;
+ Location firstTxLocation = data.lastUpdate;
if (firstTxLocation != null) {
while (!gcCandidateSet.isEmpty()) {
@@ -799,7 +712,7 @@ public class RootEntity {
// Go through the location index to see if we can remove gc candidates:
// Use a visitor to cut down the number of pages that we load
- locationIndex.visit(tx, new BTreeVisitor<Integer, Long>() {
+ data.locationIndex.visit(new IndexVisitor<Integer, Long>() {
int last = -1;
public boolean isInterestedInKeysBetween(Integer first, Integer second) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Wed Jul 7 03:24:02 2010
@@ -94,6 +94,7 @@ import org.apache.activemq.state.Command
import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.buffer.Buffer;
import org.apache.activemq.wireformat.WireFormat;
+import org.fusesource.hawtdispatch.Dispatch;
public class OpenwireProtocolHandler implements ProtocolHandler, PersistListener {
@@ -634,7 +635,7 @@ public class OpenwireProtocolHandler imp
};
} else {
- limiter = new SizeLimiter<OpenWireMessageDelivery>(connection.getInputWindowSize(), connection.getInputResumeThreshold());
+ limiter = new SizeLimiter<OpenWireMessageDelivery>(1024*64, 1024*32);
}
controller = new FlowController<OpenWireMessageDelivery>(new FlowControllable<OpenWireMessageDelivery>() {
@@ -694,7 +695,7 @@ public class OpenwireProtocolHandler imp
}
controller = new FlowController<MessageDelivery>(null, flow, limiter, this);
controller.useOverFlowQueue(false);
- controller.setExecutor(connection.getDispatcher().getGlobalQueue(DispatchPriority.HIGH));
+ controller.setExecutor(Dispatch.getGlobalQueue());
super.onFlowOpened(controller);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Wed Jul 7 03:24:02 2010
@@ -16,21 +16,9 @@
*/
package org.apache.activemq.openwire;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.transport.InactivityMonitor;
-import org.apache.activemq.transport.ResponseCorrelator;
-import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.buffer.Buffer;
import org.apache.activemq.util.buffer.BufferEditor;
import org.apache.activemq.util.buffer.DataByteArrayInputStream;
@@ -38,6 +26,15 @@ import org.apache.activemq.util.buffer.D
import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.channels.ReadableByteChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
*
*/
@@ -284,6 +281,10 @@ public final class OpenWireFormat implem
return doUnmarshal(dataIn);
}
+ public Object unmarshal(ReadableByteChannel channel) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Used by NIO or AIO transports
*/
@@ -586,9 +587,6 @@ public final class OpenWireFormat implem
return preferedWireFormatInfo;
}
- public boolean inReceive() {
- return receivingMessage.get();
- }
public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
@@ -645,16 +643,6 @@ public final class OpenWireFormat implem
return version2;
}
- public Transport createTransportFilters(Transport transport, Map options) {
-
- if (transport.isUseInactivityMonitor()) {
- transport = new InactivityMonitor(transport, this);
- }
-
- transport = new WireFormatNegotiator(transport, this, 1);
- transport = new ResponseCorrelator(transport);
- return transport;
- }
public WireFormatFactory getWireFormatFactory() {
return new OpenWireFormatFactory();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=961062&r1=961061&r2=961062&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-openwire/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Wed Jul 7 03:24:02 2010
@@ -117,7 +117,7 @@ public class ConnectionStateTracker exte
// Restore the connections.
for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
ConnectionState connectionState = iter.next();
- transport.oneway(connectionState.getInfo());
+ transport.oneway(connectionState.getInfo(), null);
restoreTempDestinations(transport, connectionState);
if (restoreSessions) {
@@ -130,7 +130,7 @@ public class ConnectionStateTracker exte
}
//now flush messages
for (Message msg:messageCache.values()) {
- transport.oneway(msg);
+ transport.oneway(msg, null);
}
}
@@ -145,7 +145,7 @@ public class ConnectionStateTracker exte
if (LOG.isDebugEnabled()) {
LOG.debug("tx replay: " + command);
}
- transport.oneway(command);
+ transport.oneway(command, null);
}
}
}
@@ -159,7 +159,7 @@ public class ConnectionStateTracker exte
// Restore the connection's sessions
for (Iterator iter2 = connectionState.getSessionStates().iterator(); iter2.hasNext();) {
SessionState sessionState = (SessionState)iter2.next();
- transport.oneway(sessionState.getInfo());
+ transport.oneway(sessionState.getInfo(), null);
if (restoreProducers) {
restoreProducers(transport, sessionState);
@@ -180,7 +180,7 @@ public class ConnectionStateTracker exte
// Restore the session's consumers
for (Iterator iter3 = sessionState.getConsumerStates().iterator(); iter3.hasNext();) {
ConsumerState consumerState = (ConsumerState)iter3.next();
- transport.oneway(consumerState.getInfo());
+ transport.oneway(consumerState.getInfo(), null);
}
}
@@ -193,7 +193,7 @@ public class ConnectionStateTracker exte
// Restore the session's producers
for (Iterator iter3 = sessionState.getProducerStates().iterator(); iter3.hasNext();) {
ProducerState producerState = (ProducerState)iter3.next();
- transport.oneway(producerState.getInfo());
+ transport.oneway(producerState.getInfo(), null);
}
}
@@ -206,7 +206,7 @@ public class ConnectionStateTracker exte
throws IOException {
// Restore the connection's temp destinations.
for (Iterator iter2 = connectionState.getTempDesinations().iterator(); iter2.hasNext();) {
- transport.oneway((DestinationInfo)iter2.next());
+ transport.oneway((DestinationInfo)iter2.next(), null);
}
}