You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/19 23:23:04 UTC

svn commit: r786668 - in /activemq/sandbox/activemq-flow: activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/ activemq-kaha/src/main/proto/ activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/ activemq-store/src/main/ja...

Author: cmacnaug
Date: Fri Jun 19 21:23:03 2009
New Revision: 786668

URL: http://svn.apache.org/viewvc?rev=786668&view=rev
Log:
Adding Subscription support to the store

Modified:
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
    activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
    activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
    activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Fri Jun 19 21:23:03 2009
@@ -30,11 +30,15 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.DuplicateKeyException;
+import org.apache.activemq.broker.store.Store.SubscriptionRecord;
 import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
 import org.apache.activemq.broker.store.kahadb.Data.QueueAdd;
 import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
 import org.apache.activemq.broker.store.kahadb.Data.QueueRemove;
 import org.apache.activemq.broker.store.kahadb.Data.QueueRemoveMessage;
+import org.apache.activemq.broker.store.kahadb.Data.SubscriptionAdd;
+import org.apache.activemq.broker.store.kahadb.Data.SubscriptionRemove;
 import org.apache.activemq.broker.store.kahadb.Data.Trace;
 import org.apache.activemq.broker.store.kahadb.Data.Type;
 import org.apache.activemq.broker.store.kahadb.Data.MessageAdd.MessageAddBean;
@@ -42,6 +46,8 @@
 import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage.QueueAddMessageBean;
 import org.apache.activemq.broker.store.kahadb.Data.QueueRemove.QueueRemoveBean;
 import org.apache.activemq.broker.store.kahadb.Data.QueueRemoveMessage.QueueRemoveMessageBean;
+import org.apache.activemq.broker.store.kahadb.Data.SubscriptionAdd.SubscriptionAddBean;
+import org.apache.activemq.broker.store.kahadb.Data.SubscriptionRemove.SubscriptionRemoveBean;
 import org.apache.activemq.broker.store.kahadb.Data.Type.TypeCreatable;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
@@ -631,7 +637,12 @@
         case QUEUE_REMOVE_MESSAGE:
             queueRemoveMessage(tx, (QueueRemoveMessage) command, location);
             break;
-
+        case SUBSCRIPTION_ADD:
+            rootEntity.addSubscription(tx, (SubscriptionAdd) command);
+            break;
+        case SUBSCRIPTION_REMOVE:
+            rootEntity.removeSubscription(tx, ((SubscriptionRemove) command).getName());
+            break;
         case TRANSACTION_BEGIN:
         case TRANSACTION_ADD_MESSAGE:
         case TRANSACTION_REMOVE_MESSAGE:
@@ -936,6 +947,75 @@
             }
         }
 
+        ////////////////////////////////////////////////////////////////
+        //Client related methods
+        ////////////////////////////////////////////////////////////////
+
+        /**
+         * Adds a subscription to the store.
+         * 
+         * @throws DuplicateKeyException
+         *             if a subscription with the same name already exists
+         * 
+         */
+        public void addSubscription(SubscriptionRecord record) throws DuplicateKeyException {
+            storeAtomic();
+            SubscriptionRecord old;
+            try {
+                old = rootEntity.getSubscription(tx, record.getName());
+                if (old != null && !old.equals(record)) {
+                    throw new DuplicateKeyException("Subscription already exists: " + record.getName());
+                } else {
+                    updateSubscription(record);
+                }
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
+            }
+        }
+
+        /**
+         * Updates a subscription in the store. If the subscription does not
+         * exist then it will simply be added.
+         */
+        public void updateSubscription(SubscriptionRecord record) {
+            SubscriptionAddBean update = new SubscriptionAddBean();
+            update.setName(record.getName());
+            update.setDestination(record.getDestination());
+            update.setDurable(record.getIsDurable());
+            
+            if (record.getAttachment() != null) {
+                update.setAttachment(record.getAttachment());
+            }
+            if (record.getSelector() != null) {
+                update.setSelector(record.getSelector());
+            }
+            if (record.getTte() != -1) {
+                update.setTte(record.getTte());
+            }
+            addUpdate(update);
+        }
+
+        /**
+         * Removes a subscription with the given name from the store.
+         */
+        public void removeSubscription(AsciiBuffer name) {
+            SubscriptionRemoveBean update = new SubscriptionRemoveBean();
+            update.setName(name);
+            addUpdate(update);
+        }
+
+        /**
+         * @return A list of subscriptions
+         */
+        public Iterator<SubscriptionRecord> listSubscriptions() {
+            storeAtomic();
+            try {
+                return rootEntity.listSubsriptions(tx);
+            } catch (IOException e) {
+                throw new FatalStoreException(e);
+            }
+        }
+
         // /////////////////////////////////////////////////////////////
         // Map related methods.
         // /////////////////////////////////////////////////////////////

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Fri Jun 19 21:23:03 2009
@@ -31,9 +31,15 @@
 import org.apache.activemq.broker.store.Store;
 import org.apache.activemq.broker.store.Store.KeyNotFoundException;
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
+import org.apache.activemq.broker.store.Store.SubscriptionRecord;
 import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
+import org.apache.activemq.broker.store.kahadb.Data.SubscriptionAdd;
+import org.apache.activemq.broker.store.kahadb.Data.SubscriptionAdd.SubscriptionAddBuffer;
 import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.InvalidProtocolBufferException;
 import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.queue.Subscription;
 import org.apache.kahadb.index.BTreeIndex;
 import org.apache.kahadb.index.BTreeVisitor;
 import org.apache.kahadb.journal.Location;
@@ -44,21 +50,27 @@
 import org.apache.kahadb.util.Marshaller;
 import org.apache.kahadb.util.VariableMarshaller;
 
+import com.sun.xml.internal.bind.v2.util.FatalAdapter;
+
 public class RootEntity {
 
     //TODO remove this one performance testing is complete. 
     private static final boolean USE_LOC_INDEX = true;
 
+    private static final int VERSION = 0;
+
     public final static Marshaller<RootEntity> MARSHALLER = new VariableMarshaller<RootEntity>() {
         public RootEntity readPayload(DataInput is) throws IOException {
             RootEntity rc = new RootEntity();
             rc.state = is.readInt();
+            is.readInt(); //VERSION 
             rc.maxMessageKey = is.readLong();
             rc.messageKeyIndex = new BTreeIndex<Long, Location>(is.readLong());
             if (USE_LOC_INDEX)
                 rc.locationIndex = new BTreeIndex<Integer, Long>(is.readLong());
             rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
             rc.messageRefsIndex = new BTreeIndex<Long, Long>(is.readLong());
+            rc.subscriptionIndex = new BTreeIndex<AsciiBuffer, Buffer>(is.readLong());
             if (is.readBoolean()) {
                 rc.lastUpdate = Marshallers.LOCATION_MARSHALLER.readPayload(is);
             } else {
@@ -69,12 +81,14 @@
 
         public void writePayload(RootEntity object, DataOutput os) throws IOException {
             os.writeInt(object.state);
+            os.writeInt(VERSION);
             os.writeLong(object.maxMessageKey);
             os.writeLong(object.messageKeyIndex.getPageId());
             if (USE_LOC_INDEX)
                 os.writeLong(object.locationIndex.getPageId());
             os.writeLong(object.destinationIndex.getPageId());
             os.writeLong(object.messageRefsIndex.getPageId());
+            os.writeLong(object.subscriptionIndex.getPageId());
             if (object.lastUpdate != null) {
                 os.writeBoolean(true);
                 Marshallers.LOCATION_MARSHALLER.writePayload(object.lastUpdate, os);
@@ -104,6 +118,9 @@
     private BTreeIndex<AsciiBuffer, DestinationEntity> destinationIndex;
     private final TreeMap<AsciiBuffer, DestinationEntity> destinations = new TreeMap<AsciiBuffer, DestinationEntity>();
 
+    // Subscriptions
+    private BTreeIndex<AsciiBuffer, Buffer> subscriptionIndex;
+
     // /////////////////////////////////////////////////////////////////
     // Lifecycle Methods.
     // /////////////////////////////////////////////////////////////////
@@ -121,7 +138,7 @@
             locationIndex = new BTreeIndex<Integer, Long>(tx.getPageFile(), tx.allocate().getPageId());
         destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(), tx.allocate().getPageId());
         messageRefsIndex = new BTreeIndex<Long, Long>(tx.getPageFile(), tx.allocate().getPageId());
-
+        subscriptionIndex = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), tx.allocate().getPageId());
         page.set(this);
         tx.store(page, MARSHALLER, true);
     }
@@ -146,6 +163,11 @@
             locationIndex.load(tx);
         }
 
+        subscriptionIndex.setPageFile(tx.getPageFile());
+        subscriptionIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+        subscriptionIndex.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
+        subscriptionIndex.load(tx);
+
         destinationIndex.setPageFile(tx.getPageFile());
         destinationIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
         destinationIndex.setValueMarshaller(DestinationEntity.MARSHALLER);
@@ -325,6 +347,97 @@
     }
 
     // /////////////////////////////////////////////////////////////////
+    // Client Methods.
+    // /////////////////////////////////////////////////////////////////
+
+    /**
+     * Returns a list of all of the stored subscriptions. 
+     * @param tx The transaction under which this is to be executed.
+     * @return a list of all of the stored subscriptions. 
+     * @throws IOException 
+     */
+    public Iterator<SubscriptionRecord> listSubsriptions(Transaction tx) throws IOException {
+        
+        final LinkedList<SubscriptionRecord> rc = new LinkedList<SubscriptionRecord>();
+        
+        subscriptionIndex.visit(tx, new BTreeVisitor<AsciiBuffer, Buffer>() {
+            public boolean isInterestedInKeysBetween(AsciiBuffer first, AsciiBuffer second) {
+                return true;
+            }
+
+            public void visit(List<AsciiBuffer> keys, List<Buffer> values) {
+                for (Buffer b : values) {
+                    try {
+                        rc.add(toSubscriptionRecord(b));
+                    } catch (InvalidProtocolBufferException e) {
+                        throw new Store.FatalStoreException(e);
+                    }
+                }
+            }
+        });
+        
+        return rc.iterator();
+    }
+
+    /**
+     * @param tx
+     * @param name
+     * @throws IOException
+     */
+    public void removeSubscription(Transaction tx, AsciiBuffer name) throws IOException {
+        subscriptionIndex.remove(tx, name);
+    }
+
+    /**
+     * @param tx
+     * @param name
+     * @throws IOException
+     */
+    public void addSubscription(Transaction tx, SubscriptionAdd subscription) throws IOException {
+        subscriptionIndex.put(tx, subscription.getName(), subscription.freeze().toFramedBuffer());
+    }
+
+    /**
+     * @param name
+     * @return
+     * @throws IOException
+     */
+    public SubscriptionRecord getSubscription(Transaction tx, AsciiBuffer name) throws IOException {
+        return toSubscriptionRecord(subscriptionIndex.get(tx, name));
+    }
+
+    /**
+     * Converts a Subscription buffer to a SubscriptionRecord.
+     * @param b The buffer
+     * @return The record.
+     * @throws InvalidProtocolBufferException
+     */
+    private static SubscriptionRecord toSubscriptionRecord(Buffer b) throws InvalidProtocolBufferException {
+        if (b == null) {
+            return null;
+        }
+        
+        SubscriptionRecord rc = null;
+        if (b != null) {
+            SubscriptionAddBuffer sab = SubscriptionAddBuffer.parseFramed(b);
+            if (sab != null) {
+                rc = new SubscriptionRecord();
+                rc.setName(sab.getName());
+                rc.setDestination(sab.getDestination());
+                rc.setIsDurable(sab.getDurable());
+                if (sab.hasAttachment())
+                    rc.setAttachment(sab.getAttachment());
+                if (sab.hasSelector())
+                    rc.setSelector(sab.getSelector());
+                if (sab.hasTte())
+                    rc.setTte(sab.getTte());
+
+            }
+        }
+        return rc;
+    }
+
+    // /////////////////////////////////////////////////////////////////
     // Queue Methods.
     // /////////////////////////////////////////////////////////////////
     public void queueAdd(Transaction tx, QueueDescriptor queue) throws IOException {
@@ -466,29 +579,28 @@
         //are past the last update location in the journal. This can happen
         //if the index is flushed before the journal. 
         int count = 0;
-        
+
         //TODO: It might be better to tie the the index update to the journal write
         //so that we can be sure that all journal entries are on disk prior to 
         //index update. 
 
         //Scan MessageKey Index to find message keys past the last append 
         //location:
-//        final ArrayList<Long> matches = new ArrayList<Long>();
-//        messageKeyIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
-//
-//            @Override
-//            protected void matched(Location key, Long value) {
-//                matches.add(value);
-//            }
-//        });
-        
-        
-//        for (Long sequenceId : matches) {
-//        MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
-//        sd.locationIndex.remove(tx, keys.location);
-//        sd.messageIdIndex.remove(tx, keys.messageId);
-//        count++;
-//        }
+        //        final ArrayList<Long> matches = new ArrayList<Long>();
+        //        messageKeyIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
+        //
+        //            @Override
+        //            protected void matched(Location key, Long value) {
+        //                matches.add(value);
+        //            }
+        //        });
+
+        //        for (Long sequenceId : matches) {
+        //        MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+        //        sd.locationIndex.remove(tx, keys.location);
+        //        sd.messageIdIndex.remove(tx, keys.messageId);
+        //        count++;
+        //        }
 
         //                 @Override
         //                 protected void matched(Location key, Long value) {
@@ -598,4 +710,5 @@
         });
 
     }
+
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto Fri Jun 19 21:23:03 2009
@@ -39,6 +39,8 @@
   STREAM_WRITE = 41;
   STREAM_CLOSE = 42;
   STREAM_REMOVE = 43;
+  SUBSCRIPTION_ADD = 50;
+  SUBSCRIPTION_REMOVE = 51;
   
   TRACE = 100;
 }
@@ -86,6 +88,25 @@
   optional int64 messageKey=2;
 }  
 
+
+///////////////////////////////////////////////////////////////
+// Client related operations.
+///////////////////////////////////////////////////////////////
+message SubscriptionAdd {
+  optional bytes name        = 1    [java_override_type = "AsciiBuffer"];
+  optional bytes selector    = 2    [java_override_type = "AsciiBuffer"];
+  optional bytes destination = 3    [java_override_type = "AsciiBuffer"];
+  optional bool  durable     = 4    [default = false];
+  optional int64 tte         = 5    [default = -1];
+  optional bytes attachment  = 6;
+  
+}
+
+message SubscriptionRemove {
+  optional bytes name = 1 [java_override_type = "AsciiBuffer"];
+}
+
+
 ///////////////////////////////////////////////////////////////
 // Map related operations.
 ///////////////////////////////////////////////////////////////

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Jun 19 21:23:03 2009
@@ -360,8 +360,15 @@
             } else {
                 connection.onException(e);
             }
-        } catch (Throwable e) {
-            connection.onException(new Exception(e));
+        }
+        catch (Throwable t) {
+            if (responseRequired) {
+                ExceptionResponse response = new ExceptionResponse(t);
+                response.setCorrelationId(commandId);
+                connection.write(response);
+            } else {
+                connection.onException(new RuntimeException(t));
+            }
         }
     }
 

Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java Fri Jun 19 21:23:03 2009
@@ -100,10 +100,9 @@
 
     /**
      * Gets the store's root directory;
-     */ 
+     */
     public File getStoreDirectory();
 
-    
     /**
      * Indicates that all messages should be deleted on startup
      * 
@@ -121,7 +120,7 @@
      * @return A new store Session.
      */
     public Session getSession();
-    
+
     /**
      * This interface is used to execute transacted code.
      * 
@@ -172,6 +171,146 @@
         }
     }
 
+    public static class SubscriptionRecord {
+
+        AsciiBuffer name;
+        AsciiBuffer selector;
+        AsciiBuffer destination;
+        boolean isDurable;
+        long tte = -1;
+        Buffer attachment;
+
+        /**
+         * @return the name.
+         */
+        public AsciiBuffer getName() {
+            return name;
+        }
+
+        /**
+         * @param name
+         *            the name to set
+         */
+        public void setName(AsciiBuffer name) {
+            this.name = name;
+        }
+
+        /**
+         * @return the selector
+         */
+        public AsciiBuffer getSelector() {
+            return selector;
+        }
+
+        /**
+         * @param selector
+         *            the selector to set.
+         */
+        public void setSelector(AsciiBuffer selector) {
+            this.selector = selector;
+        }
+
+        /**
+         * @return the destination
+         */
+        public AsciiBuffer getDestination() {
+            return destination;
+        }
+
+        /**
+         * @param destination
+         *            the destination to set
+         */
+        public void setDestination(AsciiBuffer destination) {
+            this.destination = destination;
+        }
+
+        /**
+         * @return the isDurable
+         */
+        public boolean getIsDurable() {
+            return isDurable;
+        }
+
+        /**
+         * @param isDurable
+         *            the isDurable to set
+         */
+        public void setIsDurable(boolean isDurable) {
+            this.isDurable = isDurable;
+        }
+
+        /**
+         * @return the tte
+         */
+        public long getTte() {
+            return tte;
+        }
+
+        /**
+         * @param tte
+         *            the tte to set
+         */
+        public void setTte(long tte) {
+            this.tte = tte;
+        }
+
+        /**
+         * @return the attachment
+         */
+        public Buffer getAttachment() {
+            return attachment;
+        }
+
+        /**
+         * @param attachment
+         *            the attachment to set
+         */
+        public void setAttachment(Buffer attachment) {
+            this.attachment = attachment;
+        }
+
+        public int hashCode() {
+            return name.hashCode();
+        }
+
+        public boolean equals(Object o) {
+            if (o instanceof SubscriptionRecord) {
+                return equals((SubscriptionRecord) o);
+            } else {
+                return false;
+            }
+        }
+
+        public boolean equals(SubscriptionRecord r) {
+            if (r == null) {
+                return false;
+            }
+            if (r.hashCode() != hashCode())
+                return false;
+
+            if (r.isDurable != isDurable)
+                return false;
+
+            if (r.tte != tte)
+                return false;
+
+            if (!name.equals(r.name))
+                return false;
+
+            if (!destination.equals(r.destination))
+                return false;
+            
+            if (!(selector == null ? r.selector == null : selector.equals(r.selector))) 
+                return false;
+            
+            if (!(attachment == null ? r.attachment == null : attachment.equals(r.attachment))) 
+                return false;
+            
+            return true;
+        }
+    }
+
     public static class QueueRecord {
         Long queueKey;
         Long messageKey;
@@ -179,7 +318,7 @@
         int size;
         boolean redelivered;
         long tte;
-        
+
         public boolean isRedelivered() {
             return redelivered;
         }
@@ -318,12 +457,12 @@
          * @return the first sequence number in the queue.
          */
         public long getFirstSequence();
-        
+
         /**
          * @return the last sequence number in the queue.
          */
         public long getLastSequence();
-        
+
         /**
          * @return The results for this queue's partitions
          */
@@ -359,12 +498,11 @@
      */
     public void flush();
 
-    
     /**
-     * @return true if the store is transactional. 
+     * @return true if the store is transactional.
      */
     public boolean isTransactional();
-    
+
     /**
      * This interface allows you to query and update the Store.
      * 
@@ -373,59 +511,69 @@
      * 
      */
     public interface Session {
-        
+
+        ////////////////////////////////////////////////////////////////
+        //Lock related methods:
+        ////////////////////////////////////////////////////////////////
+
         /**
          * Commits work done on the Session
          */
         public void commit();
 
         /**
-         * Rolls back work done on the Session
-         * since the last call to {@link #acquireLock()}
+         * Rolls back work done on the Session since the last call to
+         * {@link #acquireLock()}
          * 
-         * @throw {@link UnsupportedOperationException} if the store is not transactional
+         * @throw {@link UnsupportedOperationException} if the store is not
+         *        transactional
          */
         public void rollback();
 
         /**
-         * Indicates callers intent to start a transaction. 
+         * Indicates callers intent to start a transaction.
          */
         public void acquireLock();
 
         /**
-         * Indicates caller is done with the transaction, if 
-         * not committed then the transaction will be rolled back (providing
-         * the store is transactional.
+         * Indicates caller is done with the transaction, if not committed then
+         * the transaction will be rolled back (providing the store is
+         * transactional.
          */
         public void releaseLock();
 
-        public void messageAdd(MessageRecord message);
-
-        public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException;
-
-        public Long streamOpen();
-
-        public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException;
-
-        public void streamClose(Long streamKey) throws KeyNotFoundException;
-
-        public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException;
+        ////////////////////////////////////////////////////////////////
+        //Client related methods
+        ////////////////////////////////////////////////////////////////
 
-        public boolean streamRemove(Long streamKey);
-
-        // Transaction related methods.
-        public Iterator<Buffer> transactionList(Buffer first, int max);
-
-        public void transactionAdd(Buffer txid);
-
-        public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException;
+        /**
+         * Adds a subscription to the store.
+         * 
+         * @throws DuplicateKeyException
+         *             if a subscription with the same name already exists
+         * 
+         */
+        public void addSubscription(SubscriptionRecord record) throws DuplicateKeyException;
 
-        public void transactionRemoveMessage(Buffer txid, QueueDescriptor queueName, Long messageKey) throws KeyNotFoundException;
+        /**
+         * Updates a subscription in the store. If the subscription does not
+         * exist then it will simply be added.
+         */
+        public void updateSubscription(SubscriptionRecord record);
 
-        public void transactionCommit(Buffer txid) throws KeyNotFoundException;
+        /**
+         * Removes a subscription with the given name from the store.
+         */
+        public void removeSubscription(AsciiBuffer name);
 
-        public void transactionRollback(Buffer txid) throws KeyNotFoundException;
+        /**
+         * @return A list of the stored subscriptions.
+         */
+        public Iterator<SubscriptionRecord> listSubscriptions();
 
+        ////////////////////////////////////////////////////////////////
+        //Queue related methods
+        ////////////////////////////////////////////////////////////////
         /**
          * Gets a list of queues. The returned iterator returns top-level queues
          * (e.g. queues without a parent). The child queues are accessible via
@@ -433,7 +581,7 @@
          * 
          * @param firstQueueName
          *            If null starts the query at the first queue.
-        * @param max
+         * @param max
          *            The maximum number of queues to return
          * @return The list of queues.
          */
@@ -496,6 +644,43 @@
 
         public Iterator<QueueRecord> queueListMessagesQueue(QueueDescriptor queue, Long firstQueueKey, Long maxSequence, int max) throws KeyNotFoundException;
 
+        ////////////////////////////////////////////////////////////////
+        //Message related methods
+        ////////////////////////////////////////////////////////////////
+        public void messageAdd(MessageRecord message);
+
+        public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException;
+
+        public Long streamOpen();
+
+        public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException;
+
+        public void streamClose(Long streamKey) throws KeyNotFoundException;
+
+        public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException;
+
+        public boolean streamRemove(Long streamKey);
+
+        ////////////////////////////////////////////////////////////////
+        //Transaction related methods 
+        //TODO these will probably go away in favor of just using a queue.
+        ////////////////////////////////////////////////////////////////
+        public Iterator<Buffer> transactionList(Buffer first, int max);
+
+        public void transactionAdd(Buffer txid);
+
+        public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException;
+
+        public void transactionRemoveMessage(Buffer txid, QueueDescriptor queueName, Long messageKey) throws KeyNotFoundException;
+
+        public void transactionCommit(Buffer txid) throws KeyNotFoundException;
+
+        public void transactionRollback(Buffer txid) throws KeyNotFoundException;
+
+        ////////////////////////////////////////////////////////////////
+        //Map related methods 
+        ////////////////////////////////////////////////////////////////
+
         public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
 
         public boolean mapAdd(AsciiBuffer map);

Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Fri Jun 19 21:23:03 2009
@@ -28,7 +28,9 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.DuplicateKeyException;
 import org.apache.activemq.broker.store.Store.Session;
+import org.apache.activemq.broker.store.Store.SubscriptionRecord;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.queue.QueueDescriptor;
@@ -219,12 +221,11 @@
         }
 
     }
-    
+
     /**
      * @return A new store Session.
      */
-    public Session getSession()
-    {
+    public Session getSession() {
         return session;
     }
 
@@ -317,41 +318,40 @@
         private TreeMap<AsciiBuffer, StoredQueue> queues = new TreeMap<AsciiBuffer, StoredQueue>();
         private TreeMap<Buffer, Transaction> transactions = new TreeMap<Buffer, Transaction>();
 
+        private HashMap<AsciiBuffer, SubscriptionRecord> subscriptions = new HashMap<AsciiBuffer, SubscriptionRecord>();
+
         /**
-         * Commits work done on the Session, if {@link Store#isTransactional()} is true.
+         * Commits work done on the Session, if {@link Store#isTransactional()}
+         * is true.
          */
-        public void commit()
-        {
+        public void commit() {
             //NOOP
         }
 
         /**
-         * Rolls back work done on the Session
-         * since the last call to {@link #acquireLock()}
+         * Rolls back work done on the Session since the last call to
+         * {@link #acquireLock()}
          */
-        public void rollback()
-        {
+        public void rollback() {
             throw new UnsupportedOperationException();
         }
 
         /**
-         * Indicates callers intent to start a transaction. If the store
-         * is transaction, the caller must call {@link #commit()} when the 
-         * done operating on the Session prior to a mandatory call to 
+         * Indicates callers intent to start a transaction. If the store is
+         * transaction, the caller must call {@link #commit()} when the done
+         * operating on the Session prior to a mandatory call to
          * {@link #releaseLock()}
          */
-        public void acquireLock()
-        {
+        public void acquireLock() {
             updateLock.lock();
         }
 
         /**
-         * Indicates caller is done with the transaction, if 
-         * not committed then the transaction will be rolled back (providing
-         * the store is transactional.
+         * Indicates caller is done with the transaction, if not committed then
+         * the transaction will be rolled back (providing the store is
+         * transactional.
          */
-        public void releaseLock()
-        {
+        public void releaseLock() {
             updateLock.unlock();
         }
 
@@ -382,6 +382,50 @@
             return null;
         }
 
+        ////////////////////////////////////////////////////////////////
+        //Client related methods
+        ////////////////////////////////////////////////////////////////
+
+        /**
+         * Adds a subscription to the store.
+         * 
+         * @throws DuplicateKeyException
+         *             if a subscription with the same name already exists
+         * 
+         */
+        public void addSubscription(SubscriptionRecord record) throws DuplicateKeyException {
+            SubscriptionRecord old = subscriptions.put(record.getName(), record);
+            if (old != null && !old.equals(record)) {
+                subscriptions.put(old.getName(), old);
+                throw new DuplicateKeyException(record.getName() + " already exists!");
+            }
+        }
+
+        /**
+         * Updates a subscription in the store. If the subscription does not
+         * exist then it will simply be added.
+         */
+        public void updateSubscription(SubscriptionRecord record) {
+            subscriptions.put(record.getName(), record);
+        }
+
+        /**
+         * Removes a subscription with the given name from the store.
+         */
+        public void removeSubscription(AsciiBuffer name) {
+            subscriptions.remove(name);
+        }
+        
+        /**
+         * @return A list of subscriptions 
+         */
+        public Iterator<SubscriptionRecord> listSubscriptions()
+        {
+            ArrayList<SubscriptionRecord> rc = new ArrayList<SubscriptionRecord>(subscriptions.size());
+            rc.addAll(subscriptions.values());
+            return rc.iterator();
+        }
+
         // //////////////////////////////////////////////////////////////////////////////
         // Queue related methods.
         // ///////////////////////////////////////////////////////////////////////////////
@@ -644,13 +688,12 @@
         // NOOP
     }
 
-	public File getStoreDirectory() {
-		return null;
-	}
+    public File getStoreDirectory() {
+        return null;
+    }
 
-	public void setDeleteAllMessages(boolean val) {
+    public void setDeleteAllMessages(boolean val) {
         // NOOP
     }
 
-
 }

Modified: activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Fri Jun 19 21:23:03 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.store;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Iterator;
 
 import junit.framework.TestCase;
@@ -26,6 +27,7 @@
 import org.apache.activemq.broker.store.Store.QueueQueryResult;
 import org.apache.activemq.broker.store.Store.QueueRecord;
 import org.apache.activemq.broker.store.Store.Session;
+import org.apache.activemq.broker.store.Store.SubscriptionRecord;
 import org.apache.activemq.broker.store.Store.VoidCallback;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
@@ -61,7 +63,7 @@
         expected.setMessageId(new AsciiBuffer("1000"));
         expected.setKey(store.allocateStoreTracking());
         expected.setSize(expected.getBuffer().getLength());
-        
+
         store.execute(new VoidCallback<Exception>() {
             public void run(Session session) throws Exception {
                 session.messageAdd(expected);
@@ -80,8 +82,8 @@
     public void testQueueAdd() throws Exception {
         final QueueDescriptor expected = new QueueDescriptor();
         expected.setQueueName(new AsciiBuffer("testQueue"));
-        expected.setApplicationType((short)1);
-        
+        expected.setApplicationType((short) 1);
+
         store.execute(new VoidCallback<Exception>() {
             @Override
             public void run(Session session) throws Exception {
@@ -91,36 +93,35 @@
 
         //Test that the queue was created:
         checkQueue(expected, 0, 0);
-        
-        if(isStorePersistent())
-        {
+
+        if (isStorePersistent()) {
             //Restart the store and make sure the queue is still there
             store.stop();
             store = createStore(false);
             store.start();
-            
+
             //Test that the queue was persisted
             checkQueue(expected, 0, 0);
         }
     }
-    
+
     public void testQueueMessageAdd() throws Exception {
         final QueueDescriptor queue = new QueueDescriptor();
         queue.setQueueName(new AsciiBuffer("testQueue"));
-        queue.setApplicationType((short)1);
-        
+        queue.setApplicationType((short) 1);
+
         final MessageRecord message = new MessageRecord();
         message.setBuffer(new Buffer("buffer"));
         message.setEncoding(new AsciiBuffer("encoding"));
         message.setMessageId(new AsciiBuffer("1000"));
         message.setKey(store.allocateStoreTracking());
         message.setSize(message.getBuffer().getLength());
-        
+
         final QueueRecord qRecord = new QueueRecord();
         qRecord.setMessageKey(message.getKey());
         qRecord.setQueueKey(1L);
         qRecord.setSize(message.getSize());
-        
+
         store.execute(new VoidCallback<Exception>() {
             @Override
             public void run(Session session) throws Exception {
@@ -132,22 +133,97 @@
 
         checkQueue(queue, message.getSize(), 1);
         checkMessageRestore(queue, qRecord, message);
-        
+
         //Restart the store and make sure the queue is still there
-        if(isStorePersistent())
-        {
+        if (isStorePersistent()) {
             store.stop();
             store = createStore(false);
             store.start();
-            
+
             //Test that the queue was persisted
             checkQueue(queue, message.getSize(), 1);
             checkMessageRestore(queue, qRecord, message);
         }
     }
 
-    private void checkQueue(final QueueDescriptor queue, final long expectedSize, final long expectedCount) throws FatalStoreException, Exception
+    public void testSubscriptions() throws Exception {
+        HashMap<AsciiBuffer, SubscriptionRecord> expected = new HashMap<AsciiBuffer, SubscriptionRecord>();
+        
+        final SubscriptionRecord record1 = new SubscriptionRecord();
+        record1.setName(new AsciiBuffer("sub1"));
+        record1.setIsDurable(true);
+        record1.setDestination(new AsciiBuffer("topic1"));
+        expected.put(record1.getName(), record1);
+        
+        final SubscriptionRecord record2 = new SubscriptionRecord();
+        record2.setName(new AsciiBuffer("sub2"));
+        record2.setIsDurable(false);
+        record2.setDestination(new AsciiBuffer("topic2"));
+        record2.setTte(System.currentTimeMillis() + 40000);
+        record2.setSelector(new AsciiBuffer("foo"));
+        byte[] attachment2 = new byte[1024];
+        for (int i = 0; i < attachment2.length; i++) {
+            attachment2[i] = (byte) i;
+        }
+        record2.setAttachment(new Buffer(attachment2));
+        expected.put(record2.getName(), record2);
+
+        //They make it?
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                session.addSubscription(record1);
+                session.addSubscription(record2);
+            }
+        }, null);
+        
+        checkSubscriptions(expected);
+        
+        //Let's remove one:
+        expected.remove(record1.getName());
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                session.removeSubscription(record1.getName());
+            }
+        }, null);
+        
+        checkSubscriptions(expected);
+        
+        //Restart the store and make sure the queue is still there
+        if (isStorePersistent()) {
+            store.stop();
+            store = createStore(false);
+            store.start();
+
+            //Test that the queue was persisted
+            checkSubscriptions(expected);
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    private void checkSubscriptions(HashMap<AsciiBuffer, SubscriptionRecord> expected) throws Exception
     {
+        final HashMap<AsciiBuffer, SubscriptionRecord> checkMap = (HashMap<AsciiBuffer, SubscriptionRecord>) expected.clone();
+        
+        store.execute(new VoidCallback<Exception>() {
+            @Override
+            public void run(Session session) throws Exception {
+                Iterator<SubscriptionRecord> results = session.listSubscriptions();
+                while(results.hasNext())
+                {
+                    SubscriptionRecord r = results.next();
+                    SubscriptionRecord e = checkMap.remove(r.getName());
+                    assertEquals(r, e);
+                }
+                
+                //Shouldn't be any expected results left:
+                assertEquals(0, checkMap.size());
+            }
+        }, null);
+    }
+
+    private void checkQueue(final QueueDescriptor queue, final long expectedSize, final long expectedCount) throws FatalStoreException, Exception {
         store.execute(new VoidCallback<Exception>() {
             @Override
             public void run(Session session) throws Exception {
@@ -160,9 +236,8 @@
             }
         }, null);
     }
-    
-    private void checkMessageRestore(final QueueDescriptor queue, final QueueRecord qRecord, final MessageRecord message ) throws FatalStoreException, Exception
-    {
+
+    private void checkMessageRestore(final QueueDescriptor queue, final QueueRecord qRecord, final MessageRecord message) throws FatalStoreException, Exception {
         store.execute(new VoidCallback<Exception>() {
             @Override
             public void run(Session session) throws Exception {
@@ -176,7 +251,7 @@
             }
         }, null);
     }
-    
+
     public void testStoreExecuteExceptionPassthrough() throws Exception {
         try {
             store.execute(new VoidCallback<Exception>() {
@@ -213,7 +288,7 @@
         assertEquals(expected.getStreamKey(), actual.getStreamKey());
         assertEquals(expected.getSize(), actual.getSize());
     }
-    
+
     static void assertEquals(QueueDescriptor expected, QueueDescriptor actual) {
         assertEquals(expected.getParent(), actual.getParent());
         assertEquals(expected.getQueueType(), actual.getQueueType());
@@ -221,7 +296,7 @@
         assertEquals(expected.getPartitionKey(), actual.getPartitionKey());
         assertEquals(expected.getQueueName(), actual.getQueueName());
         //TODO test partitions?
-        
+
     }
 
 }