You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:06:31 UTC
svn commit: r961127 [2/2] - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/
activemq-broker/src/test/scala/org/apache/...
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java Wed Jul 7 04:06:30 2010
@@ -26,20 +26,8 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.apollo.store.QueueRecord;
-import org.apache.activemq.broker.store.hawtdb.store.Data.MapAdd;
-import org.apache.activemq.broker.store.hawtdb.store.Data.MapEntryPut;
-import org.apache.activemq.broker.store.hawtdb.store.Data.MapEntryRemove;
-import org.apache.activemq.broker.store.hawtdb.store.Data.MapRemove;
-import org.apache.activemq.broker.store.hawtdb.store.Data.MessageAdd;
-import org.apache.activemq.broker.store.hawtdb.store.Data.QueueAdd;
-import org.apache.activemq.broker.store.hawtdb.store.Data.QueueAddMessage;
-import org.apache.activemq.broker.store.hawtdb.store.Data.QueueRemove;
-import org.apache.activemq.broker.store.hawtdb.store.Data.QueueRemoveMessage;
-import org.apache.activemq.broker.store.hawtdb.store.Data.SubscriptionAdd;
-import org.apache.activemq.broker.store.hawtdb.store.Data.SubscriptionRemove;
-import org.apache.activemq.broker.store.hawtdb.store.Data.Trace;
-import org.apache.activemq.broker.store.hawtdb.store.Data.Type;
-import org.apache.activemq.broker.store.hawtdb.store.Data.Type.TypeCreatable;
+import org.apache.activemq.broker.store.hawtdb.model.*;
+import org.apache.activemq.broker.store.hawtdb.model.Type.*;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.DataByteArrayInputStream;
@@ -54,6 +42,7 @@ import org.fusesource.hawtdb.api.Transac
import org.fusesource.hawtdb.api.TxPageFile;
import org.fusesource.hawtdb.api.TxPageFileFactory;
import org.fusesource.hawtdb.internal.journal.Journal;
+import org.fusesource.hawtdb.internal.journal.JournalCallback;
import org.fusesource.hawtdb.internal.journal.Location;
public class HawtDBManager {
@@ -77,7 +66,7 @@ public class HawtDBManager {
public static final int OPEN_STATE = 2;
protected TxPageFileFactory pageFileFactory = new TxPageFileFactory();
- protected TxPageFile pageFile;
+ public TxPageFile pageFile;
protected Journal journal;
protected RootEntity rootEntity = new RootEntity();
@@ -272,7 +261,7 @@ public class HawtDBManager {
try {
open();
- store(new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())), null);
+ store(new AddTrace.Bean().setMessage(new AsciiBuffer("LOADED " + new Date())), null);
} finally {
indexLock.writeLock().unlock();
}
@@ -562,7 +551,7 @@ public class HawtDBManager {
* @throws IOException
*/
@SuppressWarnings("unchecked")
- Location store(final TypeCreatable data, Runnable onFlush, Transaction tx) throws IOException {
+ public Location store(final TypeCreatable data, final Runnable onFlush, Transaction tx) throws IOException {
final MessageBuffer message = ((PBMessage) data).freeze();
int size = message.serializedSizeUnframed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@@ -579,7 +568,13 @@ public class HawtDBManager {
long start = System.currentTimeMillis();
final Location location;
synchronized (journal) {
- location = journal.write(os.toBuffer(), onFlush);
+ location = journal.write(os.toBuffer(), new JournalCallback(){
+ public void success(Location location) {
+ if( onFlush!=null ) {
+ onFlush.run();
+ }
+ }
+ });
}
long start2 = System.currentTimeMillis();
@@ -634,66 +629,66 @@ public class HawtDBManager {
// System.out.println("Updating index" + type.toString() + " loc: " +
// location);
switch (type) {
- case MESSAGE_ADD:
- messageAdd(tx, (MessageAdd) command, location);
- break;
- case QUEUE_ADD:
- queueAdd(tx, (QueueAdd) command, location);
- break;
- case QUEUE_REMOVE:
- queueRemove(tx, (QueueRemove) command, location);
- break;
- case QUEUE_ADD_MESSAGE:
- queueAddMessage(tx, (QueueAddMessage) command, location);
- break;
- case QUEUE_REMOVE_MESSAGE:
- queueRemoveMessage(tx, (QueueRemoveMessage) command, location);
- break;
- case SUBSCRIPTION_ADD:
- rootEntity.addSubscription((SubscriptionAdd) command);
- break;
- case SUBSCRIPTION_REMOVE:
- rootEntity.removeSubscription(((SubscriptionRemove) command).getName());
+ case ADD_MESSAGE:
+ messageAdd(tx, (AddMessage.Getter) command, location);
break;
- case TRANSACTION_BEGIN:
- case TRANSACTION_ADD_MESSAGE:
- case TRANSACTION_REMOVE_MESSAGE:
- case TRANSACTION_COMMIT:
- case TRANSACTION_ROLLBACK:
- case MAP_ADD:
- rootEntity.mapAdd(((MapAdd) command).getMapName(), tx);
+ case ADD_QUEUE:
+ queueAdd(tx, (AddQueue.Getter) command, location);
break;
- case MAP_REMOVE:
- rootEntity.mapRemove(((MapRemove) command).getMapName(), tx);
+ case REMOVE_QUEUE:
+ queueRemove(tx, (RemoveQueue.Getter) command, location);
break;
- case MAP_ENTRY_PUT: {
- MapEntryPut p = (MapEntryPut) command;
- rootEntity.mapAddEntry(p.getMapName(), p.getId(), p.getValue(), tx);
- break;
- }
- case MAP_ENTRY_REMOVE: {
- MapEntryRemove p = (MapEntryRemove) command;
- try {
- rootEntity.mapRemoveEntry(p.getMapName(), p.getId(), tx);
- } catch (KeyNotFoundException e) {
- //yay, removed.
- }
- break;
- }
- case STREAM_OPEN:
- case STREAM_WRITE:
- case STREAM_CLOSE:
- case STREAM_REMOVE:
- throw new UnsupportedOperationException();
+// case QUEUE_ADD_ENTRY:
+// queueAddMessage(tx, (AddQueueEntry) command, location);
+// break;
+// case QUEUE_REMOVE_ENTRY:
+// queueRemoveMessage(tx, (RemoveQueueEntry) command, location);
+// break;
+// case SUBSCRIPTION_ADD:
+// rootEntity.addSubscription((AddSubscription) command);
+// break;
+// case SUBSCRIPTION_REMOVE:
+// rootEntity.removeSubscription(((RemoveSubscription) command).getName());
+// break;
+// case TRANSACTION_BEGIN:
+// case TRANSACTION_ADD_MESSAGE:
+// case TRANSACTION_REMOVE_MESSAGE:
+// case TRANSACTION_COMMIT:
+// case TRANSACTION_ROLLBACK:
+// case MAP_ADD:
+// rootEntity.mapAdd(((AddMap) command).getMapName(), tx);
+// break;
+// case MAP_REMOVE:
+// rootEntity.mapRemove(((RemoveMap) command).getMapName(), tx);
+// break;
+// case MAP_ENTRY_PUT: {
+// PutMapEntry p = (PutMapEntry) command;
+// rootEntity.mapAddEntry(p.getMapName(), p.getId(), p.getValue(), tx);
+// break;
+// }
+// case MAP_ENTRY_REMOVE: {
+// RemoveMapEntry p = (RemoveMapEntry) command;
+// try {
+// rootEntity.mapRemoveEntry(p.getMapName(), p.getId(), tx);
+// } catch (KeyNotFoundException e) {
+// //yay, removed.
+// }
+// break;
+// }
+// case STREAM_OPEN:
+// case STREAM_WRITE:
+// case STREAM_CLOSE:
+// case STREAM_REMOVE:
+// throw new UnsupportedOperationException();
}
rootEntity.setLastUpdate(location);
}
- private void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
+ private void messageAdd(Transaction tx, AddMessage.Getter command, Location location) throws IOException {
rootEntity.messageAdd(tx, command, location);
}
- private void queueAdd(Transaction tx, QueueAdd command, Location location) throws IOException {
+ private void queueAdd(Transaction tx, AddQueue.Getter command, Location location) throws IOException {
QueueRecord qd = new QueueRecord();
qd.name = command.getName();
qd.queueType = command.getQueueType();
@@ -705,11 +700,11 @@ public class HawtDBManager {
rootEntity.queueAdd(tx, qd);
}
- private void queueRemove(Transaction tx, QueueRemove command, Location location) throws IOException {
+ private void queueRemove(Transaction tx, RemoveQueue.Getter command, Location location) throws IOException {
rootEntity.queueRemove(tx, command.getKey());
}
- private void queueAddMessage(Transaction tx, QueueAddMessage command, Location location) throws IOException {
+ private void queueAddMessage(Transaction tx, AddQueueEntry.Getter command, Location location) throws IOException {
QueueRecord qd = new QueueRecord();
DestinationEntity destination = rootEntity.getDestination(command.getQueueKey());
if (destination != null) {
@@ -724,7 +719,7 @@ public class HawtDBManager {
}
}
- private void queueRemoveMessage(Transaction tx, QueueRemoveMessage command, Location location) throws IOException {
+ private void queueRemoveMessage(Transaction tx, RemoveQueueEntry.Getter command, Location location) throws IOException {
DestinationEntity destination = rootEntity.getDestination(command.getQueueKey());
if (destination != null) {
long messageKey = destination.remove(tx, command.getQueueKey());
@@ -758,8 +753,8 @@ public class HawtDBManager {
try {
final CountDownLatch done = new CountDownLatch(1);
synchronized (journal) {
- journal.write(FLUSH_DATA, new Runnable() {
- public void run() {
+ journal.write(FLUSH_DATA, new JournalCallback(){
+ public void success(Location location) {
done.countDown();
}
});
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java Wed Jul 7 04:06:30 2010
@@ -1,9 +1,11 @@
package org.apache.activemq.broker.store.hawtdb.store;
import org.apache.activemq.apollo.store.*;
+import org.apache.activemq.broker.store.hawtdb.model.*;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtdb.api.Transaction;
+import org.fusesource.hawtdb.internal.journal.JournalCallback;
import org.fusesource.hawtdb.internal.journal.Location;
import java.io.IOException;
@@ -18,7 +20,7 @@ import java.util.Iterator;
*/
class HawtDBSession {
- Data.Type.TypeCreatable atomicUpdate = null;
+ Type.TypeCreatable atomicUpdate = null;
int updateCount = 0;
private Transaction tx;
@@ -78,14 +80,20 @@ class HawtDBSession {
}
}
- public void commit(Runnable onFlush) {
+ public void commit(final Runnable onFlush) {
try {
boolean flush = false;
if (atomicUpdate != null) {
store.store(atomicUpdate, onFlush, tx);
} else if (updateCount > 1) {
- store.journal.write(HawtDBManager.END_UNIT_OF_WORK_DATA, onFlush);
+ store.journal.write(HawtDBManager.END_UNIT_OF_WORK_DATA, new JournalCallback(){
+ public void success(Location location) {
+ if( onFlush!=null ) {
+ onFlush.run();
+ }
+ }
+ });
} else {
flush = onFlush != null;
}
@@ -119,7 +127,7 @@ class HawtDBSession {
}
}
- private void addUpdate(Data.Type.TypeCreatable bean) {
+ private void addUpdate(Type.TypeCreatable bean) {
try {
//As soon as we do more than one update we'll wrap in a unit of
//work:
@@ -146,10 +154,10 @@ class HawtDBSession {
if (message.key < 0) {
throw new IllegalArgumentException("Key not set");
}
- Data.MessageAdd.MessageAddBean bean = new Data.MessageAdd.MessageAddBean();
+ AddMessage.Bean bean = new AddMessage.Bean();
bean.setMessageKey(message.key);
bean.setProtocol(message.protocol);
- bean.setMessageSize(message.size);
+ bean.setSize(message.size);
Buffer buffer = message.value;
if (buffer != null) {
bean.setValue(buffer);
@@ -169,11 +177,11 @@ class HawtDBSession {
throw new KeyNotFoundException("message key: " + key);
}
try {
- Data.MessageAdd bean = (Data.MessageAdd) store.load(location);
+ AddMessage.Bean bean = (AddMessage.Bean) store.load(location);
MessageRecord rc = new MessageRecord();
rc.key = bean.getMessageKey();
rc.protocol = bean.getProtocol();
- rc.size = bean.getMessageSize();
+ rc.size = bean.getSize();
if (bean.hasValue()) {
rc.value = bean.getValue();
}
@@ -190,7 +198,7 @@ class HawtDBSession {
// Queue related methods.
// /////////////////////////////////////////////////////////////
public void queueAdd(QueueRecord record) {
- Data.QueueAdd.QueueAddBean update = new Data.QueueAdd.QueueAddBean();
+ AddQueue.Bean update = new AddQueue.Bean();
update.setName(record.name);
update.setQueueType(record.queueType);
// AsciiBuffer parent = record.getParent();
@@ -202,7 +210,7 @@ class HawtDBSession {
}
public void queueRemove(QueueRecord record) {
- addUpdate(new Data.QueueRemove.QueueRemoveBean().setKey(record.key));
+ addUpdate(new RemoveQueue.Bean().setKey(record.key));
}
public Iterator<QueueStatus> queueListByType(AsciiBuffer type, QueueRecord firstQueue, int max) {
@@ -224,11 +232,11 @@ class HawtDBSession {
}
public void queueAddMessage(QueueRecord queue, QueueEntryRecord entryRecord) throws KeyNotFoundException {
- Data.QueueAddMessage.QueueAddMessageBean bean = new Data.QueueAddMessage.QueueAddMessageBean();
+ AddQueueEntry.Bean bean = new AddQueueEntry.Bean();
bean.setQueueKey(queue.key);
bean.setQueueKey(entryRecord.queueKey);
bean.setMessageKey(entryRecord.messageKey);
- bean.setMessageSize(entryRecord.size);
+ bean.setSize(entryRecord.size);
if (entryRecord.attachment != null) {
bean.setAttachment(entryRecord.attachment);
}
@@ -236,9 +244,9 @@ class HawtDBSession {
}
public void queueRemoveMessage(QueueRecord queue, Long queueKey) throws KeyNotFoundException {
- Data.QueueRemoveMessage.QueueRemoveMessageBean bean = new Data.QueueRemoveMessage.QueueRemoveMessageBean();
- bean.setQueueKey(queueKey);
- bean.setQueueName(queue.name);
+ RemoveQueueEntry.Bean bean = new RemoveQueueEntry.Bean();
+ bean.setQueueKey(queue.key);
+ bean.setQueueSeq(queueKey);
addUpdate(bean);
}
@@ -286,7 +294,7 @@ class HawtDBSession {
* exist then it will simply be added.
*/
public void updateSubscription(SubscriptionRecord record) {
- Data.SubscriptionAdd.SubscriptionAddBean update = new Data.SubscriptionAdd.SubscriptionAddBean();
+ AddSubscription.Bean update = new AddSubscription.Bean();
update.setName(record.name);
update.setDestination(record.destination);
update.setDurable(record.isDurable);
@@ -307,7 +315,7 @@ class HawtDBSession {
* Removes a subscription with the given name from the store.
*/
public void removeSubscription(AsciiBuffer name) {
- Data.SubscriptionRemove.SubscriptionRemoveBean update = new Data.SubscriptionRemove.SubscriptionRemoveBean();
+ RemoveSubscription.Bean update = new RemoveSubscription.Bean();
update.setName(name);
addUpdate(update);
}
@@ -328,13 +336,13 @@ class HawtDBSession {
// Map related methods.
// /////////////////////////////////////////////////////////////
public void mapAdd(AsciiBuffer map) {
- Data.MapAdd.MapAddBean update = new Data.MapAdd.MapAddBean();
+ AddMap.Bean update = new AddMap.Bean();
update.setMapName(map);
addUpdate(update);
}
public void mapRemove(AsciiBuffer map) {
- Data.MapRemove.MapRemoveBean update = new Data.MapRemove.MapRemoveBean();
+ RemoveMap.Bean update = new RemoveMap.Bean();
update.setMapName(map);
addUpdate(update);
}
@@ -345,7 +353,7 @@ class HawtDBSession {
}
public void mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) {
- Data.MapEntryPut.MapEntryPutBean update = new Data.MapEntryPut.MapEntryPutBean();
+ PutMapEntry.Bean update = new PutMapEntry.Bean();
update.setMapName(map);
update.setId(key);
update.setValue(value);
@@ -362,7 +370,7 @@ class HawtDBSession {
}
public void mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
- Data.MapEntryRemove.MapEntryRemoveBean update = new Data.MapEntryRemove.MapEntryRemoveBean();
+ RemoveMapEntry.Bean update = new RemoveMapEntry.Bean();
update.setMapName(map);
update.setId(key);
addUpdate(update);
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java Wed Jul 7 04:06:30 2010
@@ -21,10 +21,10 @@ import java.io.DataOutput;
import java.io.IOException;
import org.fusesource.hawtbuf.AsciiBuffer;
-import org.apache.activemq.util.marshaller.Marshaller;
-import org.apache.activemq.util.marshaller.VariableMarshaller;
+import org.fusesource.hawtbuf.codec.Codec;
+import org.fusesource.hawtbuf.codec.VariableCodec;
import org.fusesource.hawtdb.internal.journal.Location;
-import org.fusesource.hawtdb.util.marshaller.LocationMarshaller;
+import org.fusesource.hawtdb.internal.journal.LocationCodec;
public class MessageKeys {
@@ -41,16 +41,16 @@ public class MessageKeys {
return "["+messageId+","+location+"]";
}
- public static final Marshaller<MessageKeys> MARSHALLER = new VariableMarshaller<MessageKeys>() {
- public MessageKeys readPayload(DataInput dataIn) throws IOException {
- Location location = LocationMarshaller.INSTANCE.readPayload(dataIn);
+ public static final Codec<MessageKeys> CODEC = new VariableCodec<MessageKeys>() {
+ public MessageKeys decode(DataInput dataIn) throws IOException {
+ Location location = LocationCodec.INSTANCE.decode(dataIn);
byte data[] = new byte[dataIn.readShort()];
dataIn.readFully(data);
return new MessageKeys(new AsciiBuffer(data), location);
}
- public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
- LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
+ public void encode(MessageKeys object, DataOutput dataOut) throws IOException {
+ LocationCodec.INSTANCE.encode(object.location, dataOut);
dataOut.writeShort(object.messageId.length);
dataOut.write(object.messageId.data, object.messageId.offset, object.messageId.length);
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java Wed Jul 7 04:06:30 2010
@@ -19,17 +19,17 @@ package org.apache.activemq.broker.store
import org.apache.activemq.apollo.store.QueueRecord;
import org.apache.activemq.apollo.store.QueueStatus;
import org.apache.activemq.apollo.store.SubscriptionRecord;
-import org.apache.activemq.broker.store.hawtdb.store.Data.MessageAdd;
-import org.apache.activemq.broker.store.hawtdb.store.Data.SubscriptionAdd;
-import org.apache.activemq.broker.store.hawtdb.store.Data.SubscriptionAdd.SubscriptionAddBuffer;
+import org.apache.activemq.broker.store.hawtdb.Codecs;
+import org.apache.activemq.broker.store.hawtdb.model.*;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.codec.IntegerCodec;
+import org.fusesource.hawtbuf.codec.LongCodec;
import org.fusesource.hawtbuf.proto.InvalidProtocolBufferException;
import org.fusesource.hawtdb.api.*;
import org.fusesource.hawtdb.internal.journal.Location;
-import org.fusesource.hawtdb.util.marshaller.IntegerMarshaller;
-import org.fusesource.hawtdb.util.marshaller.LocationMarshaller;
-import org.fusesource.hawtdb.util.marshaller.LongMarshaller;
+import org.fusesource.hawtdb.internal.journal.LocationCodec;
+
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -53,28 +53,28 @@ public class RootEntity {
private static final BTreeIndexFactory<AsciiBuffer, Buffer> mapInstanceIndexFactory = new BTreeIndexFactory<AsciiBuffer, Buffer>();
static {
- messageKeyIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
- messageKeyIndexFactory.setValueMarshaller(LocationMarshaller.INSTANCE);
+ messageKeyIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+ messageKeyIndexFactory.setValueCodec(LocationCodec.INSTANCE);
messageKeyIndexFactory.setDeferredEncoding(true);
- locationIndexFactory.setKeyMarshaller(IntegerMarshaller.INSTANCE);
- locationIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+ locationIndexFactory.setKeyCodec(IntegerCodec.INSTANCE);
+ locationIndexFactory.setValueCodec(LongCodec.INSTANCE);
locationIndexFactory.setDeferredEncoding(true);
- messageRefsIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
- messageRefsIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+ messageRefsIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+ messageRefsIndexFactory.setValueCodec(LongCodec.INSTANCE);
messageRefsIndexFactory.setDeferredEncoding(true);
- destinationIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
- destinationIndexFactory.setValueMarshaller(DestinationEntity.MARSHALLER);
+ destinationIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+ destinationIndexFactory.setValueCodec(DestinationEntity.CODEC);
destinationIndexFactory.setDeferredEncoding(true);
- subscriptionIndexFactory.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
- subscriptionIndexFactory.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
+ subscriptionIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC);
+ subscriptionIndexFactory.setValueCodec(Codecs.BUFFER_CODEC);
subscriptionIndexFactory.setDeferredEncoding(true);
- mapIndexFactory.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
- mapIndexFactory.setValueMarshaller(IntegerMarshaller.INSTANCE);
+ mapIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC);
+ mapIndexFactory.setValueCodec(IntegerCodec.INSTANCE);
mapIndexFactory.setDeferredEncoding(true);
}
@@ -133,7 +133,7 @@ public class RootEntity {
os.writeInt(object.mapIndex.getPage());
if (object.lastUpdate != null) {
os.writeBoolean(true);
- LocationMarshaller.INSTANCE.writePayload(object.lastUpdate, os);
+ LocationCodec.INSTANCE.encode(object.lastUpdate, os);
} else {
os.writeBoolean(false);
}
@@ -153,7 +153,7 @@ public class RootEntity {
rc.subscriptionIndex = subscriptionIndexFactory.open(paged, is.readInt());
rc.mapIndex = mapIndexFactory.open(paged, is.readInt());
if (is.readBoolean()) {
- rc.lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
+ rc.lastUpdate = LocationCodec.INSTANCE.decode(is);
} else {
rc.lastUpdate = null;
}
@@ -202,7 +202,7 @@ public class RootEntity {
return maxMessageKey;
}
- public void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
+ public void messageAdd(Transaction tx, AddMessage.Getter command, Location location) throws IOException {
long id = command.getMessageKey();
if (id > maxMessageKey) {
maxMessageKey = id;
@@ -324,7 +324,7 @@ public class RootEntity {
/**
* @throws IOException
*/
- public void addSubscription(SubscriptionAdd subscription) throws IOException {
+ public void addSubscription(AddSubscription.Bean subscription) throws IOException {
data.subscriptionIndex.put(subscription.getName(), subscription.freeze().toFramedBuffer());
}
@@ -352,7 +352,7 @@ public class RootEntity {
SubscriptionRecord rc = null;
if (b != null) {
- SubscriptionAddBuffer sab = SubscriptionAddBuffer.parseFramed(b);
+ AddSubscription.Buffer sab = AddSubscription.FACTORY.parseFramed(b);
if (sab != null) {
rc = new SubscriptionRecord();
rc.name = sab.getName();
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/Hasher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/Hasher.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/Hasher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/Hasher.java Wed Jul 7 04:06:30 2010
@@ -17,10 +17,10 @@
*/
package org.apache.activemq.util;
-import org.apache.activemq.util.marshaller.Marshaller;
-import org.apache.activemq.util.marshaller.VariableMarshaller;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.codec.Codec;
+import org.fusesource.hawtbuf.codec.VariableCodec;
import java.io.IOException;
import java.io.DataOutput;
@@ -77,20 +77,20 @@ public interface Hasher<N, K> {
* @param <K>
*/
public class BinaryHasher<N, K> implements Hasher<N, K> {
- private final Marshaller<N> nodeMarshaller;
- private final Marshaller<K> keyMarshaller;
+ private final Codec<N> nodeCodec;
+ private final Codec<K> keyCodec;
private final HashAlgorithim hashAlgorithim;
- public BinaryHasher(Marshaller<N> nodeMarshaller, Marshaller<K> keyMarshaller, HashAlgorithim hashAlgorithim) {
- this.nodeMarshaller = nodeMarshaller;
- this.keyMarshaller = keyMarshaller;
+ public BinaryHasher(Codec<N> nodeCodec, Codec<K> keyCodec, HashAlgorithim hashAlgorithim) {
+ this.nodeCodec = nodeCodec;
+ this.keyCodec = keyCodec;
this.hashAlgorithim = hashAlgorithim;
}
public int hashNode(N node, int i) {
try {
DataByteArrayOutputStream os = new DataByteArrayOutputStream();
- nodeMarshaller.writePayload(node, os);
+ nodeCodec.encode(node, os);
os.write(':');
os.writeInt(i);
return hash(os.toBuffer());
@@ -102,7 +102,7 @@ public interface Hasher<N, K> {
public int hashKey(K value) {
try {
DataByteArrayOutputStream os = new DataByteArrayOutputStream();
- keyMarshaller.writePayload(value, os);
+ keyCodec.encode(value, os);
return hash(os.toBuffer());
} catch (IOException e) {
throw new RuntimeException(e);
@@ -352,12 +352,12 @@ public interface Hasher<N, K> {
* Used to convert an object to a byte[] by basically doing:
* Object.toString().getBytes("UTF-8")
*/
- public class ToStringMarshaller extends VariableMarshaller {
- public void writePayload(Object o, DataOutput dataOutput) throws IOException {
+ public class ToStringCodec extends VariableCodec {
+ public void encode(Object o, DataOutput dataOutput) throws IOException {
dataOutput.write(o.toString().getBytes("UTF-8"));
}
- public Object readPayload(DataInput dataInput) throws IOException {
+ public Object decode(DataInput dataInput) throws IOException {
throw new UnsupportedOperationException();
}
@@ -381,7 +381,7 @@ public interface Hasher<N, K> {
}
public ToStringHasher(HashAlgorithim hashAlgorithim) {
- super(new ToStringMarshaller(), new ToStringMarshaller(), hashAlgorithim);
+ super(new ToStringCodec(), new ToStringCodec(), hashAlgorithim);
}
@Override
Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/list/SequenceSet.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/list/SequenceSet.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/list/SequenceSet.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/list/SequenceSet.java Wed Jul 7 04:06:30 2010
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.util.list;
+import org.fusesource.hawtbuf.codec.Codec;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -23,8 +25,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
-import org.apache.activemq.util.marshaller.Marshaller;
-
/**
* Keeps track of a added long values. Collapses ranges of numbers using a
* Sequence representation. Use to keep track of received message ids to find
@@ -34,11 +34,9 @@ import org.apache.activemq.util.marshall
*/
public class SequenceSet extends LinkedNodeList<Sequence> {
- public static class Marshaller implements org.apache.activemq.util.marshaller.Marshaller<SequenceSet> {
+ public static Codec CODEC = new Codec<SequenceSet>() {
- public static final Marshaller INSTANCE = new Marshaller();
-
- public SequenceSet readPayload(DataInput in) throws IOException {
+ public SequenceSet decode(DataInput in) throws IOException {
SequenceSet value = new SequenceSet();
int count = in.readInt();
for (int i = 0; i < count; i++) {
@@ -53,7 +51,7 @@ public class SequenceSet extends LinkedN
return value;
}
- public void writePayload(SequenceSet value, DataOutput out) throws IOException {
+ public void encode(SequenceSet value, DataOutput out) throws IOException {
out.writeInt(value.size());
Sequence sequence = value.getHead();
while (sequence != null ) {
@@ -90,7 +88,7 @@ public class SequenceSet extends LinkedN
public int estimatedSize(SequenceSet object) {
return object.size()*16;
}
- }
+ };
public void add(Sequence value) {
// TODO we can probably optimize this a bit
Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala Wed Jul 7 04:06:30 2010
@@ -206,7 +206,7 @@ class FileConfigStore extends ConfigStor
private def defaultConfig(rev:Int) = {
- val config = Broker.default
+ val config = Broker.defaultConfig
config.rev = rev
config
}
Modified: activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page (original)
+++ activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page Wed Jul 7 04:06:30 2010
@@ -85,7 +85,7 @@ sort_info: 2
in the diagram to to the right illustrates the hash values that would map to the
%%em Broker 2
node. This algorithm has been implemented in the activemq-util modules as the
- %a{:href=>"http://fisheye6.atlassian.com/browse/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/HashRing.java?r=HEAD"}
+ %a{:href=>"http://github.com/chirino/activemq-apollo/blob/master/activemq-util/src/main/scala/org/apache/activemq/util/HashRing.java#L47"}
%em HashRing
class.
%p