You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:06:31 UTC

svn commit: r961127 [2/2] - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/main/scala/org/apache/activemq/apollo/transport/vm/ activemq-broker/src/test/scala/org/apache/...

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBManager.java Wed Jul  7 04:06:30 2010
@@ -26,20 +26,8 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.apollo.store.QueueRecord;
-import org.apache.activemq.broker.store.hawtdb.store.Data.MapAdd;
-import org.apache.activemq.broker.store.hawtdb.store.Data.MapEntryPut;
-import org.apache.activemq.broker.store.hawtdb.store.Data.MapEntryRemove;
-import org.apache.activemq.broker.store.hawtdb.store.Data.MapRemove;
-import org.apache.activemq.broker.store.hawtdb.store.Data.MessageAdd;
-import org.apache.activemq.broker.store.hawtdb.store.Data.QueueAdd;
-import org.apache.activemq.broker.store.hawtdb.store.Data.QueueAddMessage;
-import org.apache.activemq.broker.store.hawtdb.store.Data.QueueRemove;
-import org.apache.activemq.broker.store.hawtdb.store.Data.QueueRemoveMessage;
-import org.apache.activemq.broker.store.hawtdb.store.Data.SubscriptionAdd;
-import org.apache.activemq.broker.store.hawtdb.store.Data.SubscriptionRemove;
-import org.apache.activemq.broker.store.hawtdb.store.Data.Trace;
-import org.apache.activemq.broker.store.hawtdb.store.Data.Type;
-import org.apache.activemq.broker.store.hawtdb.store.Data.Type.TypeCreatable;
+import org.apache.activemq.broker.store.hawtdb.model.*;
+import org.apache.activemq.broker.store.hawtdb.model.Type.*;
 import org.fusesource.hawtbuf.AsciiBuffer;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.DataByteArrayInputStream;
@@ -54,6 +42,7 @@ import org.fusesource.hawtdb.api.Transac
 import org.fusesource.hawtdb.api.TxPageFile;
 import org.fusesource.hawtdb.api.TxPageFileFactory;
 import org.fusesource.hawtdb.internal.journal.Journal;
+import org.fusesource.hawtdb.internal.journal.JournalCallback;
 import org.fusesource.hawtdb.internal.journal.Location;
 
 public class HawtDBManager {
@@ -77,7 +66,7 @@ public class HawtDBManager {
     public static final int OPEN_STATE = 2;
 
     protected TxPageFileFactory pageFileFactory = new TxPageFileFactory();
-    protected TxPageFile pageFile;
+    public TxPageFile pageFile;
     protected Journal journal;
 
     protected RootEntity rootEntity = new RootEntity();
@@ -272,7 +261,7 @@ public class HawtDBManager {
         try {
             open();
 
-            store(new Trace.TraceBean().setMessage(new AsciiBuffer("LOADED " + new Date())), null);
+            store(new AddTrace.Bean().setMessage(new AsciiBuffer("LOADED " + new Date())), null);
         } finally {
             indexLock.writeLock().unlock();
         }
@@ -562,7 +551,7 @@ public class HawtDBManager {
      * @throws IOException
      */
     @SuppressWarnings("unchecked")
-    Location store(final TypeCreatable data, Runnable onFlush, Transaction tx) throws IOException {
+    public Location store(final TypeCreatable data, final Runnable onFlush, Transaction tx) throws IOException {
         final MessageBuffer message = ((PBMessage) data).freeze();
         int size = message.serializedSizeUnframed();
         DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
@@ -579,7 +568,13 @@ public class HawtDBManager {
             long start = System.currentTimeMillis();
             final Location location;
             synchronized (journal) {
-                location = journal.write(os.toBuffer(), onFlush);
+                location = journal.write(os.toBuffer(), new JournalCallback(){
+                    public void success(Location location) {
+                        if( onFlush!=null ) {
+                            onFlush.run();
+                        }
+                    }
+                });
             }
             long start2 = System.currentTimeMillis();
 
@@ -634,66 +629,66 @@ public class HawtDBManager {
         // System.out.println("Updating index" + type.toString() + " loc: " +
         // location);
         switch (type) {
-        case MESSAGE_ADD:
-            messageAdd(tx, (MessageAdd) command, location);
-            break;
-        case QUEUE_ADD:
-            queueAdd(tx, (QueueAdd) command, location);
-            break;
-        case QUEUE_REMOVE:
-            queueRemove(tx, (QueueRemove) command, location);
-            break;
-        case QUEUE_ADD_MESSAGE:
-            queueAddMessage(tx, (QueueAddMessage) command, location);
-            break;
-        case QUEUE_REMOVE_MESSAGE:
-            queueRemoveMessage(tx, (QueueRemoveMessage) command, location);
-            break;
-        case SUBSCRIPTION_ADD:
-            rootEntity.addSubscription((SubscriptionAdd) command);
-            break;
-        case SUBSCRIPTION_REMOVE:
-            rootEntity.removeSubscription(((SubscriptionRemove) command).getName());
+        case ADD_MESSAGE:
+            messageAdd(tx, (AddMessage.Getter) command, location);
             break;
-        case TRANSACTION_BEGIN:
-        case TRANSACTION_ADD_MESSAGE:
-        case TRANSACTION_REMOVE_MESSAGE:
-        case TRANSACTION_COMMIT:
-        case TRANSACTION_ROLLBACK:
-        case MAP_ADD:
-            rootEntity.mapAdd(((MapAdd) command).getMapName(), tx);
+        case ADD_QUEUE:
+            queueAdd(tx, (AddQueue.Getter) command, location);
             break;
-        case MAP_REMOVE:
-            rootEntity.mapRemove(((MapRemove) command).getMapName(), tx);
+        case REMOVE_QUEUE:
+            queueRemove(tx, (RemoveQueue.Getter) command, location);
             break;
-        case MAP_ENTRY_PUT: {
-            MapEntryPut p = (MapEntryPut) command;
-            rootEntity.mapAddEntry(p.getMapName(), p.getId(), p.getValue(), tx);
-            break;
-        }
-        case MAP_ENTRY_REMOVE: {
-            MapEntryRemove p = (MapEntryRemove) command;
-            try {
-                rootEntity.mapRemoveEntry(p.getMapName(), p.getId(), tx);
-            } catch (KeyNotFoundException e) {
-                //yay, removed.
-            }
-            break;
-        }
-        case STREAM_OPEN:
-        case STREAM_WRITE:
-        case STREAM_CLOSE:
-        case STREAM_REMOVE:
-            throw new UnsupportedOperationException();
+//        case QUEUE_ADD_ENTRY:
+//            queueAddMessage(tx, (AddQueueEntry) command, location);
+//            break;
+//        case QUEUE_REMOVE_ENTRY:
+//            queueRemoveMessage(tx, (RemoveQueueEntry) command, location);
+//            break;
+//        case SUBSCRIPTION_ADD:
+//            rootEntity.addSubscription((AddSubscription) command);
+//            break;
+//        case SUBSCRIPTION_REMOVE:
+//            rootEntity.removeSubscription(((RemoveSubscription) command).getName());
+//            break;
+//        case TRANSACTION_BEGIN:
+//        case TRANSACTION_ADD_MESSAGE:
+//        case TRANSACTION_REMOVE_MESSAGE:
+//        case TRANSACTION_COMMIT:
+//        case TRANSACTION_ROLLBACK:
+//        case MAP_ADD:
+//            rootEntity.mapAdd(((AddMap) command).getMapName(), tx);
+//            break;
+//        case MAP_REMOVE:
+//            rootEntity.mapRemove(((RemoveMap) command).getMapName(), tx);
+//            break;
+//        case MAP_ENTRY_PUT: {
+//            PutMapEntry p = (PutMapEntry) command;
+//            rootEntity.mapAddEntry(p.getMapName(), p.getId(), p.getValue(), tx);
+//            break;
+//        }
+//        case MAP_ENTRY_REMOVE: {
+//            RemoveMapEntry p = (RemoveMapEntry) command;
+//            try {
+//                rootEntity.mapRemoveEntry(p.getMapName(), p.getId(), tx);
+//            } catch (KeyNotFoundException e) {
+//                //yay, removed.
+//            }
+//            break;
+//        }
+//        case STREAM_OPEN:
+//        case STREAM_WRITE:
+//        case STREAM_CLOSE:
+//        case STREAM_REMOVE:
+//            throw new UnsupportedOperationException();
         }
         rootEntity.setLastUpdate(location);
     }
 
-    private void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
+    private void messageAdd(Transaction tx, AddMessage.Getter command, Location location) throws IOException {
         rootEntity.messageAdd(tx, command, location);
     }
 
-    private void queueAdd(Transaction tx, QueueAdd command, Location location) throws IOException {
+    private void queueAdd(Transaction tx, AddQueue.Getter command, Location location) throws IOException {
         QueueRecord qd = new QueueRecord();
         qd.name = command.getName();
         qd.queueType = command.getQueueType();
@@ -705,11 +700,11 @@ public class HawtDBManager {
         rootEntity.queueAdd(tx, qd);
     }
 
-    private void queueRemove(Transaction tx, QueueRemove command, Location location) throws IOException {
+    private void queueRemove(Transaction tx, RemoveQueue.Getter command, Location location) throws IOException {
         rootEntity.queueRemove(tx, command.getKey());
     }
 
-    private void queueAddMessage(Transaction tx, QueueAddMessage command, Location location) throws IOException {
+    private void queueAddMessage(Transaction tx, AddQueueEntry.Getter command, Location location) throws IOException {
         QueueRecord qd = new QueueRecord();
         DestinationEntity destination = rootEntity.getDestination(command.getQueueKey());
         if (destination != null) {
@@ -724,7 +719,7 @@ public class HawtDBManager {
         }
     }
 
-    private void queueRemoveMessage(Transaction tx, QueueRemoveMessage command, Location location) throws IOException {
+    private void queueRemoveMessage(Transaction tx, RemoveQueueEntry.Getter command, Location location) throws IOException {
         DestinationEntity destination = rootEntity.getDestination(command.getQueueKey());
         if (destination != null) {
             long messageKey = destination.remove(tx, command.getQueueKey());
@@ -758,8 +753,8 @@ public class HawtDBManager {
         try {
             final CountDownLatch done = new CountDownLatch(1);
             synchronized (journal) {
-                journal.write(FLUSH_DATA, new Runnable() {
-                    public void run() {
+                journal.write(FLUSH_DATA, new JournalCallback(){
+                    public void success(Location location) {
                         done.countDown();
                     }
                 });

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/HawtDBSession.java Wed Jul  7 04:06:30 2010
@@ -1,9 +1,11 @@
 package org.apache.activemq.broker.store.hawtdb.store;
 
 import org.apache.activemq.apollo.store.*;
+import org.apache.activemq.broker.store.hawtdb.model.*;
 import org.fusesource.hawtbuf.AsciiBuffer;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtdb.api.Transaction;
+import org.fusesource.hawtdb.internal.journal.JournalCallback;
 import org.fusesource.hawtdb.internal.journal.Location;
 
 import java.io.IOException;
@@ -18,7 +20,7 @@ import java.util.Iterator;
 */
 class HawtDBSession {
 
-    Data.Type.TypeCreatable atomicUpdate = null;
+    Type.TypeCreatable atomicUpdate = null;
     int updateCount = 0;
 
     private Transaction tx;
@@ -78,14 +80,20 @@ class HawtDBSession {
         }
     }
 
-    public void commit(Runnable onFlush) {
+    public void commit(final Runnable onFlush) {
         try {
 
             boolean flush = false;
             if (atomicUpdate != null) {
                 store.store(atomicUpdate, onFlush, tx);
             } else if (updateCount > 1) {
-                store.journal.write(HawtDBManager.END_UNIT_OF_WORK_DATA, onFlush);
+                store.journal.write(HawtDBManager.END_UNIT_OF_WORK_DATA, new JournalCallback(){
+                    public void success(Location location) {
+                        if( onFlush!=null ) {
+                            onFlush.run();
+                        }
+                    }
+                });
             } else {
                 flush = onFlush != null;
             }
@@ -119,7 +127,7 @@ class HawtDBSession {
         }
     }
 
-    private void addUpdate(Data.Type.TypeCreatable bean) {
+    private void addUpdate(Type.TypeCreatable bean) {
         try {
             //As soon as we do more than one update we'll wrap in a unit of
             //work:
@@ -146,10 +154,10 @@ class HawtDBSession {
         if (message.key < 0) {
             throw new IllegalArgumentException("Key not set");
         }
-        Data.MessageAdd.MessageAddBean bean = new Data.MessageAdd.MessageAddBean();
+        AddMessage.Bean bean = new AddMessage.Bean();
         bean.setMessageKey(message.key);
         bean.setProtocol(message.protocol);
-        bean.setMessageSize(message.size);
+        bean.setSize(message.size);
         Buffer buffer = message.value;
         if (buffer != null) {
             bean.setValue(buffer);
@@ -169,11 +177,11 @@ class HawtDBSession {
             throw new KeyNotFoundException("message key: " + key);
         }
         try {
-            Data.MessageAdd bean = (Data.MessageAdd) store.load(location);
+            AddMessage.Bean bean = (AddMessage.Bean) store.load(location);
             MessageRecord rc = new MessageRecord();
             rc.key = bean.getMessageKey();
             rc.protocol = bean.getProtocol();
-            rc.size = bean.getMessageSize();
+            rc.size = bean.getSize();
             if (bean.hasValue()) {
                 rc.value = bean.getValue();
             }
@@ -190,7 +198,7 @@ class HawtDBSession {
     // Queue related methods.
     // /////////////////////////////////////////////////////////////
     public void queueAdd(QueueRecord record) {
-        Data.QueueAdd.QueueAddBean update = new Data.QueueAdd.QueueAddBean();
+        AddQueue.Bean update = new AddQueue.Bean();
         update.setName(record.name);
         update.setQueueType(record.queueType);
 //        AsciiBuffer parent = record.getParent();
@@ -202,7 +210,7 @@ class HawtDBSession {
     }
 
     public void queueRemove(QueueRecord record) {
-        addUpdate(new Data.QueueRemove.QueueRemoveBean().setKey(record.key));
+        addUpdate(new RemoveQueue.Bean().setKey(record.key));
     }
 
     public Iterator<QueueStatus> queueListByType(AsciiBuffer type, QueueRecord firstQueue, int max) {
@@ -224,11 +232,11 @@ class HawtDBSession {
     }
 
     public void queueAddMessage(QueueRecord queue, QueueEntryRecord entryRecord) throws KeyNotFoundException {
-        Data.QueueAddMessage.QueueAddMessageBean bean = new Data.QueueAddMessage.QueueAddMessageBean();
+        AddQueueEntry.Bean bean = new AddQueueEntry.Bean();
         bean.setQueueKey(queue.key);
         bean.setQueueKey(entryRecord.queueKey);
         bean.setMessageKey(entryRecord.messageKey);
-        bean.setMessageSize(entryRecord.size);
+        bean.setSize(entryRecord.size);
         if (entryRecord.attachment != null) {
             bean.setAttachment(entryRecord.attachment);
         }
@@ -236,9 +244,9 @@ class HawtDBSession {
     }
 
     public void queueRemoveMessage(QueueRecord queue, Long queueKey) throws KeyNotFoundException {
-        Data.QueueRemoveMessage.QueueRemoveMessageBean bean = new Data.QueueRemoveMessage.QueueRemoveMessageBean();
-        bean.setQueueKey(queueKey);
-        bean.setQueueName(queue.name);
+        RemoveQueueEntry.Bean bean = new RemoveQueueEntry.Bean();
+        bean.setQueueKey(queue.key);
+        bean.setQueueSeq(queueKey);
         addUpdate(bean);
     }
 
@@ -286,7 +294,7 @@ class HawtDBSession {
      * exist then it will simply be added.
      */
     public void updateSubscription(SubscriptionRecord record) {
-        Data.SubscriptionAdd.SubscriptionAddBean update = new Data.SubscriptionAdd.SubscriptionAddBean();
+        AddSubscription.Bean update = new AddSubscription.Bean();
         update.setName(record.name);
         update.setDestination(record.destination);
         update.setDurable(record.isDurable);
@@ -307,7 +315,7 @@ class HawtDBSession {
      * Removes a subscription with the given name from the store.
      */
     public void removeSubscription(AsciiBuffer name) {
-        Data.SubscriptionRemove.SubscriptionRemoveBean update = new Data.SubscriptionRemove.SubscriptionRemoveBean();
+        RemoveSubscription.Bean update = new RemoveSubscription.Bean();
         update.setName(name);
         addUpdate(update);
     }
@@ -328,13 +336,13 @@ class HawtDBSession {
     // Map related methods.
     // /////////////////////////////////////////////////////////////
     public void mapAdd(AsciiBuffer map) {
-        Data.MapAdd.MapAddBean update = new Data.MapAdd.MapAddBean();
+        AddMap.Bean update = new AddMap.Bean();
         update.setMapName(map);
         addUpdate(update);
     }
 
     public void mapRemove(AsciiBuffer map) {
-        Data.MapRemove.MapRemoveBean update = new Data.MapRemove.MapRemoveBean();
+        RemoveMap.Bean update = new RemoveMap.Bean();
         update.setMapName(map);
         addUpdate(update);
     }
@@ -345,7 +353,7 @@ class HawtDBSession {
     }
 
     public void mapEntryPut(AsciiBuffer map, AsciiBuffer key, Buffer value) {
-        Data.MapEntryPut.MapEntryPutBean update = new Data.MapEntryPut.MapEntryPutBean();
+        PutMapEntry.Bean update = new PutMapEntry.Bean();
         update.setMapName(map);
         update.setId(key);
         update.setValue(value);
@@ -362,7 +370,7 @@ class HawtDBSession {
     }
 
     public void mapEntryRemove(AsciiBuffer map, AsciiBuffer key) throws KeyNotFoundException {
-        Data.MapEntryRemove.MapEntryRemoveBean update = new Data.MapEntryRemove.MapEntryRemoveBean();
+        RemoveMapEntry.Bean update = new RemoveMapEntry.Bean();
         update.setMapName(map);
         update.setId(key);
         addUpdate(update);

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/MessageKeys.java Wed Jul  7 04:06:30 2010
@@ -21,10 +21,10 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.fusesource.hawtbuf.AsciiBuffer;
-import org.apache.activemq.util.marshaller.Marshaller;
-import org.apache.activemq.util.marshaller.VariableMarshaller;
+import org.fusesource.hawtbuf.codec.Codec;
+import org.fusesource.hawtbuf.codec.VariableCodec;
 import org.fusesource.hawtdb.internal.journal.Location;
-import org.fusesource.hawtdb.util.marshaller.LocationMarshaller;
+import org.fusesource.hawtdb.internal.journal.LocationCodec;
 
 public class MessageKeys {
 
@@ -41,16 +41,16 @@ public class MessageKeys {
         return "["+messageId+","+location+"]";
     }
     
-    public static final Marshaller<MessageKeys> MARSHALLER = new VariableMarshaller<MessageKeys>() {
-        public MessageKeys readPayload(DataInput dataIn) throws IOException {
-            Location location = LocationMarshaller.INSTANCE.readPayload(dataIn);
+    public static final Codec<MessageKeys> CODEC = new VariableCodec<MessageKeys>() {
+        public MessageKeys decode(DataInput dataIn) throws IOException {
+            Location location = LocationCodec.INSTANCE.decode(dataIn);
             byte data[] = new byte[dataIn.readShort()];
             dataIn.readFully(data);
             return new MessageKeys(new AsciiBuffer(data), location);
         }
 
-        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
-            LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
+        public void encode(MessageKeys object, DataOutput dataOut) throws IOException {
+            LocationCodec.INSTANCE.encode(object.location, dataOut);
             dataOut.writeShort(object.messageId.length);
             dataOut.write(object.messageId.data, object.messageId.offset, object.messageId.length);
         }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-hawtdb/src/main/scala/org/apache/activemq/broker/store/hawtdb/store/RootEntity.java Wed Jul  7 04:06:30 2010
@@ -19,17 +19,17 @@ package org.apache.activemq.broker.store
 import org.apache.activemq.apollo.store.QueueRecord;
 import org.apache.activemq.apollo.store.QueueStatus;
 import org.apache.activemq.apollo.store.SubscriptionRecord;
-import org.apache.activemq.broker.store.hawtdb.store.Data.MessageAdd;
-import org.apache.activemq.broker.store.hawtdb.store.Data.SubscriptionAdd;
-import org.apache.activemq.broker.store.hawtdb.store.Data.SubscriptionAdd.SubscriptionAddBuffer;
+import org.apache.activemq.broker.store.hawtdb.Codecs;
+import org.apache.activemq.broker.store.hawtdb.model.*;
 import org.fusesource.hawtbuf.AsciiBuffer;
 import org.fusesource.hawtbuf.Buffer;
+import org.fusesource.hawtbuf.codec.IntegerCodec;
+import org.fusesource.hawtbuf.codec.LongCodec;
 import org.fusesource.hawtbuf.proto.InvalidProtocolBufferException;
 import org.fusesource.hawtdb.api.*;
 import org.fusesource.hawtdb.internal.journal.Location;
-import org.fusesource.hawtdb.util.marshaller.IntegerMarshaller;
-import org.fusesource.hawtdb.util.marshaller.LocationMarshaller;
-import org.fusesource.hawtdb.util.marshaller.LongMarshaller;
+import org.fusesource.hawtdb.internal.journal.LocationCodec;
+
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -53,28 +53,28 @@ public class RootEntity {
     private static final BTreeIndexFactory<AsciiBuffer, Buffer> mapInstanceIndexFactory = new BTreeIndexFactory<AsciiBuffer, Buffer>();
 
     static {
-        messageKeyIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
-        messageKeyIndexFactory.setValueMarshaller(LocationMarshaller.INSTANCE);
+        messageKeyIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+        messageKeyIndexFactory.setValueCodec(LocationCodec.INSTANCE);
         messageKeyIndexFactory.setDeferredEncoding(true);
 
-        locationIndexFactory.setKeyMarshaller(IntegerMarshaller.INSTANCE);
-        locationIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+        locationIndexFactory.setKeyCodec(IntegerCodec.INSTANCE);
+        locationIndexFactory.setValueCodec(LongCodec.INSTANCE);
         locationIndexFactory.setDeferredEncoding(true);
 
-        messageRefsIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
-        messageRefsIndexFactory.setValueMarshaller(LongMarshaller.INSTANCE);
+        messageRefsIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+        messageRefsIndexFactory.setValueCodec(LongCodec.INSTANCE);
         messageRefsIndexFactory.setDeferredEncoding(true);
 
-        destinationIndexFactory.setKeyMarshaller(LongMarshaller.INSTANCE);
-        destinationIndexFactory.setValueMarshaller(DestinationEntity.MARSHALLER);
+        destinationIndexFactory.setKeyCodec(LongCodec.INSTANCE);
+        destinationIndexFactory.setValueCodec(DestinationEntity.CODEC);
         destinationIndexFactory.setDeferredEncoding(true);
 
-        subscriptionIndexFactory.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
-        subscriptionIndexFactory.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
+        subscriptionIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC);
+        subscriptionIndexFactory.setValueCodec(Codecs.BUFFER_CODEC);
         subscriptionIndexFactory.setDeferredEncoding(true);
 
-        mapIndexFactory.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
-        mapIndexFactory.setValueMarshaller(IntegerMarshaller.INSTANCE);
+        mapIndexFactory.setKeyCodec(Codecs.ASCII_BUFFER_CODEC);
+        mapIndexFactory.setValueCodec(IntegerCodec.INSTANCE);
         mapIndexFactory.setDeferredEncoding(true);
     }
 
@@ -133,7 +133,7 @@ public class RootEntity {
             os.writeInt(object.mapIndex.getPage());
             if (object.lastUpdate != null) {
                 os.writeBoolean(true);
-                LocationMarshaller.INSTANCE.writePayload(object.lastUpdate, os);
+                LocationCodec.INSTANCE.encode(object.lastUpdate, os);
             } else {
                 os.writeBoolean(false);
             }
@@ -153,7 +153,7 @@ public class RootEntity {
             rc.subscriptionIndex = subscriptionIndexFactory.open(paged, is.readInt());
             rc.mapIndex = mapIndexFactory.open(paged, is.readInt());
             if (is.readBoolean()) {
-                rc.lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
+                rc.lastUpdate = LocationCodec.INSTANCE.decode(is);
             } else {
                 rc.lastUpdate = null;
             }
@@ -202,7 +202,7 @@ public class RootEntity {
         return maxMessageKey;
     }
 
-    public void messageAdd(Transaction tx, MessageAdd command, Location location) throws IOException {
+    public void messageAdd(Transaction tx, AddMessage.Getter command, Location location) throws IOException {
         long id = command.getMessageKey();
         if (id > maxMessageKey) {
             maxMessageKey = id;
@@ -324,7 +324,7 @@ public class RootEntity {
     /**
      * @throws IOException
      */
-    public void addSubscription(SubscriptionAdd subscription) throws IOException {
+    public void addSubscription(AddSubscription.Bean subscription) throws IOException {
         data.subscriptionIndex.put(subscription.getName(), subscription.freeze().toFramedBuffer());
     }
 
@@ -352,7 +352,7 @@ public class RootEntity {
 
         SubscriptionRecord rc = null;
         if (b != null) {
-            SubscriptionAddBuffer sab = SubscriptionAddBuffer.parseFramed(b);
+            AddSubscription.Buffer sab = AddSubscription.FACTORY.parseFramed(b);
             if (sab != null) {
                 rc = new SubscriptionRecord();
                 rc.name = sab.getName();

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/Hasher.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/Hasher.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/Hasher.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/Hasher.java Wed Jul  7 04:06:30 2010
@@ -17,10 +17,10 @@
  */
 package org.apache.activemq.util;
 
-import org.apache.activemq.util.marshaller.Marshaller;
-import org.apache.activemq.util.marshaller.VariableMarshaller;
 import org.fusesource.hawtbuf.Buffer;
 import org.fusesource.hawtbuf.DataByteArrayOutputStream;
+import org.fusesource.hawtbuf.codec.Codec;
+import org.fusesource.hawtbuf.codec.VariableCodec;
 
 import java.io.IOException;
 import java.io.DataOutput;
@@ -77,20 +77,20 @@ public interface Hasher<N, K> {
      * @param <K>
      */
     public class BinaryHasher<N, K> implements Hasher<N, K> {
-        private final Marshaller<N> nodeMarshaller;
-        private final Marshaller<K> keyMarshaller;
+        private final Codec<N> nodeCodec;
+        private final Codec<K> keyCodec;
         private final HashAlgorithim hashAlgorithim;
 
-        public BinaryHasher(Marshaller<N> nodeMarshaller, Marshaller<K> keyMarshaller, HashAlgorithim hashAlgorithim) {
-            this.nodeMarshaller = nodeMarshaller;
-            this.keyMarshaller = keyMarshaller;
+        public BinaryHasher(Codec<N> nodeCodec, Codec<K> keyCodec, HashAlgorithim hashAlgorithim) {
+            this.nodeCodec = nodeCodec;
+            this.keyCodec = keyCodec;
             this.hashAlgorithim = hashAlgorithim;
         }
 
         public int hashNode(N node, int i) {
             try {
                 DataByteArrayOutputStream os = new DataByteArrayOutputStream();
-                nodeMarshaller.writePayload(node, os);
+                nodeCodec.encode(node, os);
                 os.write(':');
                 os.writeInt(i);
                 return hash(os.toBuffer());
@@ -102,7 +102,7 @@ public interface Hasher<N, K> {
         public int hashKey(K value) {
             try {
                 DataByteArrayOutputStream os = new DataByteArrayOutputStream();
-                keyMarshaller.writePayload(value, os);
+                keyCodec.encode(value, os);
                 return hash(os.toBuffer());
             } catch (IOException e) {
                 throw new RuntimeException(e);
@@ -352,12 +352,12 @@ public interface Hasher<N, K> {
      * Used to convert an object to a byte[] by basically doing:
      * Object.toString().getBytes("UTF-8")
      */
-    public class ToStringMarshaller extends VariableMarshaller {
-        public void writePayload(Object o, DataOutput dataOutput) throws IOException {
+    public class ToStringCodec extends VariableCodec {
+        public void encode(Object o, DataOutput dataOutput) throws IOException {
             dataOutput.write(o.toString().getBytes("UTF-8"));
         }
 
-        public Object readPayload(DataInput dataInput) throws IOException {
+        public Object decode(DataInput dataInput) throws IOException {
             throw new UnsupportedOperationException();
         }
 
@@ -381,7 +381,7 @@ public interface Hasher<N, K> {
         }
 
         public ToStringHasher(HashAlgorithim hashAlgorithim) {
-            super(new ToStringMarshaller(), new ToStringMarshaller(), hashAlgorithim);
+            super(new ToStringCodec(), new ToStringCodec(), hashAlgorithim);
         }
 
         @Override

Modified: activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/list/SequenceSet.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/list/SequenceSet.java?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/list/SequenceSet.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-util/src/main/scala/org/apache/activemq/util/list/SequenceSet.java Wed Jul  7 04:06:30 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.util.list;
 
+import org.fusesource.hawtbuf.codec.Codec;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -23,8 +25,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
 
-import org.apache.activemq.util.marshaller.Marshaller;
-
 /**
  * Keeps track of added long values. Collapses ranges of numbers using a
  * Sequence representation. Use to keep track of received message ids to find
@@ -34,11 +34,9 @@ import org.apache.activemq.util.marshall
  */
 public class SequenceSet extends LinkedNodeList<Sequence> {
     
-    public static class Marshaller implements org.apache.activemq.util.marshaller.Marshaller<SequenceSet> {
+    public static Codec CODEC = new Codec<SequenceSet>() {
 
-        public static final Marshaller INSTANCE = new Marshaller();
-        
-        public SequenceSet readPayload(DataInput in) throws IOException {
+        public SequenceSet decode(DataInput in) throws IOException {
             SequenceSet value = new SequenceSet();
             int count = in.readInt();
             for (int i = 0; i < count; i++) {
@@ -53,7 +51,7 @@ public class SequenceSet extends LinkedN
             return value;
         }
 
-        public void writePayload(SequenceSet value, DataOutput out) throws IOException {
+        public void encode(SequenceSet value, DataOutput out) throws IOException {
             out.writeInt(value.size());
             Sequence sequence = value.getHead();
             while (sequence != null ) {
@@ -90,7 +88,7 @@ public class SequenceSet extends LinkedN
         public int estimatedSize(SequenceSet object) {
             return object.size()*16;
         }
-    }
+    };
     
     public void add(Sequence value) {
         // TODO we can probably optimize this a bit

Modified: activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-web/src/main/scala/org/apache/activemq/apollo/ConfigStore.scala Wed Jul  7 04:06:30 2010
@@ -206,7 +206,7 @@ class FileConfigStore extends ConfigStor
 
 
   private def defaultConfig(rev:Int) = {
-    val config = Broker.default
+    val config = Broker.defaultConfig
     config.rev = rev
     config
   }

Modified: activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page?rev=961127&r1=961126&r2=961127&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page (original)
+++ activemq/sandbox/activemq-apollo-actor/webgen/src/network-design.page Wed Jul  7 04:06:30 2010
@@ -85,7 +85,7 @@ sort_info: 2
     in the diagram to the right illustrates the hash values that would map to the
     %%em Broker 2 
     node.  This algorithm has been implemented in the activemq-util modules as the
-    %a{:href=>"http://fisheye6.atlassian.com/browse/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/HashRing.java?r=HEAD"}
+    %a{:href=>"http://github.com/chirino/activemq-apollo/blob/master/activemq-util/src/main/scala/org/apache/activemq/util/HashRing.java#L47"}
       %em HashRing
     class.
   %p