You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cm...@apache.org on 2009/06/19 23:23:04 UTC
svn commit: r786668 - in /activemq/sandbox/activemq-flow:
activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/
activemq-kaha/src/main/proto/
activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/
activemq-store/src/main/ja...
Author: cmacnaug
Date: Fri Jun 19 21:23:03 2009
New Revision: 786668
URL: http://svn.apache.org/viewvc?rev=786668&view=rev
Log:
Adding Subscription support to the store
Modified:
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto
activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/KahaDBStore.java Fri Jun 19 21:23:03 2009
@@ -30,11 +30,15 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.DuplicateKeyException;
+import org.apache.activemq.broker.store.Store.SubscriptionRecord;
import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
import org.apache.activemq.broker.store.kahadb.Data.QueueAdd;
import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage;
import org.apache.activemq.broker.store.kahadb.Data.QueueRemove;
import org.apache.activemq.broker.store.kahadb.Data.QueueRemoveMessage;
+import org.apache.activemq.broker.store.kahadb.Data.SubscriptionAdd;
+import org.apache.activemq.broker.store.kahadb.Data.SubscriptionRemove;
import org.apache.activemq.broker.store.kahadb.Data.Trace;
import org.apache.activemq.broker.store.kahadb.Data.Type;
import org.apache.activemq.broker.store.kahadb.Data.MessageAdd.MessageAddBean;
@@ -42,6 +46,8 @@
import org.apache.activemq.broker.store.kahadb.Data.QueueAddMessage.QueueAddMessageBean;
import org.apache.activemq.broker.store.kahadb.Data.QueueRemove.QueueRemoveBean;
import org.apache.activemq.broker.store.kahadb.Data.QueueRemoveMessage.QueueRemoveMessageBean;
+import org.apache.activemq.broker.store.kahadb.Data.SubscriptionAdd.SubscriptionAddBean;
+import org.apache.activemq.broker.store.kahadb.Data.SubscriptionRemove.SubscriptionRemoveBean;
import org.apache.activemq.broker.store.kahadb.Data.Type.TypeCreatable;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
@@ -631,7 +637,12 @@
case QUEUE_REMOVE_MESSAGE:
queueRemoveMessage(tx, (QueueRemoveMessage) command, location);
break;
-
+ case SUBSCRIPTION_ADD:
+ rootEntity.addSubscription(tx, (SubscriptionAdd) command);
+ break;
+ case SUBSCRIPTION_REMOVE:
+ rootEntity.removeSubscription(tx, ((SubscriptionRemove) command).getName());
+ break;
case TRANSACTION_BEGIN:
case TRANSACTION_ADD_MESSAGE:
case TRANSACTION_REMOVE_MESSAGE:
@@ -936,6 +947,75 @@
}
}
+ ////////////////////////////////////////////////////////////////
+ //Client related methods
+ ////////////////////////////////////////////////////////////////
+
+ /**
+  * Adds a subscription to the store.
+  * <p>
+  * The record currently stored under the same name is looked up first.
+  * Re-adding a record that equals the stored one is accepted and is
+  * forwarded to {@link #updateSubscription(SubscriptionRecord)}.
+  *
+  * @param record
+  *            the subscription to add
+  * @throws DuplicateKeyException
+  *             if a different subscription is already stored under the
+  *             same name
+  * @throws FatalStoreException
+  *             if the lookup fails with an IOException
+  */
+ public void addSubscription(SubscriptionRecord record) throws DuplicateKeyException {
+     storeAtomic();
+     try {
+         final SubscriptionRecord existing = rootEntity.getSubscription(tx, record.getName());
+         final boolean conflicts = existing != null && !existing.equals(record);
+         if (conflicts) {
+             throw new DuplicateKeyException("Subscription already exists: " + record.getName());
+         }
+         updateSubscription(record);
+     } catch (IOException e) {
+         throw new FatalStoreException(e);
+     }
+ }
+
+ /**
+  * Writes the given subscription to the store, whether or not a
+  * subscription with that name is already present.
+  * <p>
+  * Name, destination and the durable flag are always copied into the
+  * update. Attachment and selector are copied only when non-null, and
+  * tte only when it differs from -1.
+  *
+  * @param record
+  *            the subscription to write
+  */
+ public void updateSubscription(SubscriptionRecord record) {
+     final SubscriptionAddBean bean = new SubscriptionAddBean();
+     bean.setName(record.getName());
+     bean.setDestination(record.getDestination());
+     bean.setDurable(record.getIsDurable());
+
+     // Optional fields stay unset on the bean when the record has no value.
+     final Buffer attachment = record.getAttachment();
+     if (attachment != null) {
+         bean.setAttachment(attachment);
+     }
+     final AsciiBuffer selector = record.getSelector();
+     if (selector != null) {
+         bean.setSelector(selector);
+     }
+     final long tte = record.getTte();
+     if (tte != -1) {
+         bean.setTte(tte);
+     }
+     addUpdate(bean);
+ }
+
+ /**
+  * Removes the subscription with the given name from the store by
+  * submitting a removal update for that name.
+  *
+  * @param name
+  *            the name of the subscription to remove
+  */
+ public void removeSubscription(AsciiBuffer name) {
+     final SubscriptionRemoveBean removal = new SubscriptionRemoveBean();
+     removal.setName(name);
+     addUpdate(removal);
+ }
+
+ /**
+  * Lists the subscriptions held by the root entity.
+  *
+  * @return an iterator over the stored subscriptions
+  * @throws FatalStoreException
+  *             if reading the subscriptions fails with an IOException
+  */
+ public Iterator<SubscriptionRecord> listSubscriptions() {
+     storeAtomic();
+     final Iterator<SubscriptionRecord> stored;
+     try {
+         stored = rootEntity.listSubsriptions(tx);
+     } catch (IOException e) {
+         throw new FatalStoreException(e);
+     }
+     return stored;
+ }
+
// /////////////////////////////////////////////////////////////
// Map related methods.
// /////////////////////////////////////////////////////////////
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/java/org/apache/activemq/broker/store/kahadb/RootEntity.java Fri Jun 19 21:23:03 2009
@@ -31,9 +31,15 @@
import org.apache.activemq.broker.store.Store;
import org.apache.activemq.broker.store.Store.KeyNotFoundException;
import org.apache.activemq.broker.store.Store.QueueQueryResult;
+import org.apache.activemq.broker.store.Store.SubscriptionRecord;
import org.apache.activemq.broker.store.kahadb.Data.MessageAdd;
+import org.apache.activemq.broker.store.kahadb.Data.SubscriptionAdd;
+import org.apache.activemq.broker.store.kahadb.Data.SubscriptionAdd.SubscriptionAddBuffer;
import org.apache.activemq.protobuf.AsciiBuffer;
+import org.apache.activemq.protobuf.Buffer;
+import org.apache.activemq.protobuf.InvalidProtocolBufferException;
import org.apache.activemq.queue.QueueDescriptor;
+import org.apache.activemq.queue.Subscription;
import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.index.BTreeVisitor;
import org.apache.kahadb.journal.Location;
@@ -44,21 +50,27 @@
import org.apache.kahadb.util.Marshaller;
import org.apache.kahadb.util.VariableMarshaller;
+import com.sun.xml.internal.bind.v2.util.FatalAdapter;
+
public class RootEntity {
//TODO remove this one performance testing is complete.
private static final boolean USE_LOC_INDEX = true;
+ private static final int VERSION = 0;
+
public final static Marshaller<RootEntity> MARSHALLER = new VariableMarshaller<RootEntity>() {
public RootEntity readPayload(DataInput is) throws IOException {
RootEntity rc = new RootEntity();
rc.state = is.readInt();
+ is.readInt(); //VERSION
rc.maxMessageKey = is.readLong();
rc.messageKeyIndex = new BTreeIndex<Long, Location>(is.readLong());
if (USE_LOC_INDEX)
rc.locationIndex = new BTreeIndex<Integer, Long>(is.readLong());
rc.destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(is.readLong());
rc.messageRefsIndex = new BTreeIndex<Long, Long>(is.readLong());
+ rc.subscriptionIndex = new BTreeIndex<AsciiBuffer, Buffer>(is.readLong());
if (is.readBoolean()) {
rc.lastUpdate = Marshallers.LOCATION_MARSHALLER.readPayload(is);
} else {
@@ -69,12 +81,14 @@
public void writePayload(RootEntity object, DataOutput os) throws IOException {
os.writeInt(object.state);
+ os.writeInt(VERSION);
os.writeLong(object.maxMessageKey);
os.writeLong(object.messageKeyIndex.getPageId());
if (USE_LOC_INDEX)
os.writeLong(object.locationIndex.getPageId());
os.writeLong(object.destinationIndex.getPageId());
os.writeLong(object.messageRefsIndex.getPageId());
+ os.writeLong(object.subscriptionIndex.getPageId());
if (object.lastUpdate != null) {
os.writeBoolean(true);
Marshallers.LOCATION_MARSHALLER.writePayload(object.lastUpdate, os);
@@ -104,6 +118,9 @@
private BTreeIndex<AsciiBuffer, DestinationEntity> destinationIndex;
private final TreeMap<AsciiBuffer, DestinationEntity> destinations = new TreeMap<AsciiBuffer, DestinationEntity>();
+ // Subscriptions
+ private BTreeIndex<AsciiBuffer, Buffer> subscriptionIndex;
+
// /////////////////////////////////////////////////////////////////
// Lifecycle Methods.
// /////////////////////////////////////////////////////////////////
@@ -121,7 +138,7 @@
locationIndex = new BTreeIndex<Integer, Long>(tx.getPageFile(), tx.allocate().getPageId());
destinationIndex = new BTreeIndex<AsciiBuffer, DestinationEntity>(tx.getPageFile(), tx.allocate().getPageId());
messageRefsIndex = new BTreeIndex<Long, Long>(tx.getPageFile(), tx.allocate().getPageId());
-
+ subscriptionIndex = new BTreeIndex<AsciiBuffer, Buffer>(tx.getPageFile(), tx.allocate().getPageId());
page.set(this);
tx.store(page, MARSHALLER, true);
}
@@ -146,6 +163,11 @@
locationIndex.load(tx);
}
+ subscriptionIndex.setPageFile(tx.getPageFile());
+ subscriptionIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
+ subscriptionIndex.setValueMarshaller(Marshallers.BUFFER_MARSHALLER);
+ subscriptionIndex.load(tx);
+
destinationIndex.setPageFile(tx.getPageFile());
destinationIndex.setKeyMarshaller(Marshallers.ASCII_BUFFER_MARSHALLER);
destinationIndex.setValueMarshaller(DestinationEntity.MARSHALLER);
@@ -325,6 +347,97 @@
}
// /////////////////////////////////////////////////////////////////
+ // Client Methods.
+ // /////////////////////////////////////////////////////////////////
+
+ /**
+  * Collects every subscription held in the subscription index.
+  * <p>
+  * All key ranges of the index are visited and each stored buffer is
+  * decoded into a {@link SubscriptionRecord}.
+  *
+  * @param tx
+  *            the transaction under which the index is visited
+  * @return an iterator over the decoded subscriptions
+  * @throws IOException
+  *             declared for the index visit
+  */
+ public Iterator<SubscriptionRecord> listSubsriptions(Transaction tx) throws IOException {
+
+     final LinkedList<SubscriptionRecord> records = new LinkedList<SubscriptionRecord>();
+
+     final BTreeVisitor<AsciiBuffer, Buffer> collector = new BTreeVisitor<AsciiBuffer, Buffer>() {
+         public boolean isInterestedInKeysBetween(AsciiBuffer first, AsciiBuffer second) {
+             // No key filtering: every range is of interest.
+             return true;
+         }
+
+         public void visit(List<AsciiBuffer> keys, List<Buffer> values) {
+             for (Buffer framed : values) {
+                 final SubscriptionRecord decoded;
+                 try {
+                     decoded = toSubscriptionRecord(framed);
+                 } catch (InvalidProtocolBufferException e) {
+                     // The visitor cannot throw checked exceptions, so wrap it.
+                     throw new Store.FatalStoreException(e);
+                 }
+                 records.add(decoded);
+             }
+         }
+     };
+     subscriptionIndex.visit(tx, collector);
+
+     return records.iterator();
+ }
+
+ /**
+ * Removes the entry stored under the given name from the subscription
+ * index.
+ *
+ * @param tx
+ * the transaction handed to the index
+ * @param name
+ * the index key of the subscription to remove
+ * @throws IOException
+ * declared for the index removal
+ */
+ public void removeSubscription(Transaction tx, AsciiBuffer name) throws IOException {
+ subscriptionIndex.remove(tx, name);
+ }
+
+ /**
+ * Stores the subscription in the subscription index, keyed by its name.
+ * The stored value is the framed buffer of the frozen subscription.
+ *
+ * @param tx
+ * the transaction handed to the index
+ * @param subscription
+ * the subscription to store; its name is used as the index key
+ * @throws IOException
+ * declared for the index write
+ */
+ public void addSubscription(Transaction tx, SubscriptionAdd subscription) throws IOException {
+ subscriptionIndex.put(tx, subscription.getName(), subscription.freeze().toFramedBuffer());
+ }
+
+ /**
+  * Looks up a subscription by name in the subscription index.
+  *
+  * @param tx
+  *            the transaction handed to the index
+  * @param name
+  *            the index key of the subscription
+  * @return the decoded subscription, or null when the index lookup
+  *         yields null
+  * @throws IOException
+  *             declared for the index read and the decoding
+  */
+ public SubscriptionRecord getSubscription(Transaction tx, AsciiBuffer name) throws IOException {
+     final Buffer stored = subscriptionIndex.get(tx, name);
+     return toSubscriptionRecord(stored);
+ }
+
+ /**
+ * Converts a Subscription buffer to a SubscriptionRecord.
+ * @param b The buffer
+ * @return The record.
+ * @throws InvalidProtocolBufferException
+ */
+ private static SubscriptionRecord toSubscriptionRecord(Buffer b) throws InvalidProtocolBufferException {
+ if (b == null) {
+ return null;
+ }
+
+ SubscriptionRecord rc = null;
+ if (b != null) {
+ SubscriptionAddBuffer sab = SubscriptionAddBuffer.parseFramed(b);
+ if (sab != null) {
+ rc = new SubscriptionRecord();
+ rc.setName(sab.getName());
+ rc.setDestination(sab.getDestination());
+ rc.setIsDurable(sab.getDurable());
+ if (sab.hasAttachment())
+ rc.setAttachment(sab.getAttachment());
+ if (sab.hasSelector())
+ rc.setSelector(sab.getSelector());
+ if (sab.hasTte())
+ rc.setTte(sab.getTte());
+
+ }
+ }
+ return rc;
+ }
+
+ // /////////////////////////////////////////////////////////////////
// Queue Methods.
// /////////////////////////////////////////////////////////////////
public void queueAdd(Transaction tx, QueueDescriptor queue) throws IOException {
@@ -466,29 +579,28 @@
//are past the last update location in the journal. This can happen
//if the index is flushed before the journal.
int count = 0;
-
+
//TODO: It might be better to tie the the index update to the journal write
//so that we can be sure that all journal entries are on disk prior to
//index update.
//Scan MessageKey Index to find message keys past the last append
//location:
-// final ArrayList<Long> matches = new ArrayList<Long>();
-// messageKeyIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
-//
-// @Override
-// protected void matched(Location key, Long value) {
-// matches.add(value);
-// }
-// });
-
-
-// for (Long sequenceId : matches) {
-// MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
-// sd.locationIndex.remove(tx, keys.location);
-// sd.messageIdIndex.remove(tx, keys.messageId);
-// count++;
-// }
+ // final ArrayList<Long> matches = new ArrayList<Long>();
+ // messageKeyIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
+ //
+ // @Override
+ // protected void matched(Location key, Long value) {
+ // matches.add(value);
+ // }
+ // });
+
+ // for (Long sequenceId : matches) {
+ // MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+ // sd.locationIndex.remove(tx, keys.location);
+ // sd.messageIdIndex.remove(tx, keys.messageId);
+ // count++;
+ // }
// @Override
// protected void matched(Location key, Long value) {
@@ -598,4 +710,5 @@
});
}
+
}
\ No newline at end of file
Modified: activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto (original)
+++ activemq/sandbox/activemq-flow/activemq-kaha/src/main/proto/kahadb-data.proto Fri Jun 19 21:23:03 2009
@@ -39,6 +39,8 @@
STREAM_WRITE = 41;
STREAM_CLOSE = 42;
STREAM_REMOVE = 43;
+ SUBSCRIPTION_ADD = 50;
+ SUBSCRIPTION_REMOVE = 51;
TRACE = 100;
}
@@ -86,6 +88,25 @@
optional int64 messageKey=2;
}
+
+///////////////////////////////////////////////////////////////
+// Client related operations.
+///////////////////////////////////////////////////////////////
+ message SubscriptionAdd {
+ optional bytes name = 1 [java_override_type = "AsciiBuffer"]; // key under which RootEntity stores the subscription
+ optional bytes selector = 2 [java_override_type = "AsciiBuffer"]; // only set by KahaDBStore when the record's selector is non-null
+ optional bytes destination = 3 [java_override_type = "AsciiBuffer"];
+ optional bool durable = 4 [default = false];
+ optional int64 tte = 5 [default = -1]; // left unset by KahaDBStore when the record's tte is -1; NOTE(review): presumably an expiry time - confirm
+ optional bytes attachment = 6; // only set by KahaDBStore when the record's attachment is non-null
+
+ }
+
+ message SubscriptionRemove {
+ optional bytes name = 1 [java_override_type = "AsciiBuffer"]; // name of the subscription to remove from the subscription index
+ }
+
+
///////////////////////////////////////////////////////////////
// Map related operations.
///////////////////////////////////////////////////////////////
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Fri Jun 19 21:23:03 2009
@@ -360,8 +360,15 @@
} else {
connection.onException(e);
}
- } catch (Throwable e) {
- connection.onException(new Exception(e));
+ }
+ catch (Throwable t) {
+ if (responseRequired) {
+ ExceptionResponse response = new ExceptionResponse(t);
+ response.setCorrelationId(commandId);
+ connection.write(response);
+ } else {
+ connection.onException(new RuntimeException(t));
+ }
}
}
Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/Store.java Fri Jun 19 21:23:03 2009
@@ -100,10 +100,9 @@
/**
* Gets the store's root directory;
- */
+ */
public File getStoreDirectory();
-
/**
* Indicates that all messages should be deleted on startup
*
@@ -121,7 +120,7 @@
* @return A new store Session.
*/
public Session getSession();
-
+
/**
* This interface is used to execute transacted code.
*
@@ -172,6 +171,146 @@
}
}
+ public static class SubscriptionRecord {
+
+ AsciiBuffer name;
+ AsciiBuffer selector;
+ AsciiBuffer destination;
+ boolean isDurable;
+ long tte = -1;
+ Buffer attachment;
+
+ /**
+ * @return the name.
+ */
+ public AsciiBuffer getName() {
+ return name;
+ }
+
+ /**
+ * @param name
+ * the name to set
+ */
+ public void setName(AsciiBuffer name) {
+ this.name = name;
+ }
+
+ /**
+ * @return the selector
+ */
+ public AsciiBuffer getSelector() {
+ return selector;
+ }
+
+ /**
+ * @param selector
+ * the selector to set.
+ */
+ public void setSelector(AsciiBuffer selector) {
+ this.selector = selector;
+ }
+
+ /**
+ * @return the destination
+ */
+ public AsciiBuffer getDestination() {
+ return destination;
+ }
+
+ /**
+ * @param destination
+ * the destination to set
+ */
+ public void setDestination(AsciiBuffer destination) {
+ this.destination = destination;
+ }
+
+ /**
+ * @return the isDurable
+ */
+ public boolean getIsDurable() {
+ return isDurable;
+ }
+
+ /**
+ * @param isDurable
+ * the isDurable to set
+ */
+ public void setIsDurable(boolean isDurable) {
+ this.isDurable = isDurable;
+ }
+
+ /**
+ * @return the tte
+ */
+ public long getTte() {
+ return tte;
+ }
+
+ /**
+ * @param tte
+ * the tte to set
+ */
+ public void setTte(long tte) {
+ this.tte = tte;
+ }
+
+ /**
+ * @return the attachment
+ */
+ public Buffer getAttachment() {
+ return attachment;
+ }
+
+ /**
+ * @param attachment
+ * the attachment to set
+ */
+ public void setAttachment(Buffer attachment) {
+ this.attachment = attachment;
+ }
+
+ public int hashCode() {
+ return name.hashCode();
+ }
+
+ public boolean equals(Object o) {
+ if (o instanceof SubscriptionRecord) {
+ return equals((SubscriptionRecord) o);
+ } else {
+ return false;
+ }
+ }
+
+ public boolean equals(SubscriptionRecord r) {
+ if (r == null) {
+ return false;
+ }
+ if (r.hashCode() != hashCode())
+ return false;
+
+ if (r.isDurable != isDurable)
+ return false;
+
+ if (r.tte != tte)
+ return false;
+
+ if (!name.equals(r.name))
+ return false;
+
+ if (!destination.equals(r.destination))
+ return false;
+
+ if (!(selector == null ? r.selector == null : selector.equals(r.selector)))
+ return false;
+
+ if (!(attachment == null ? r.attachment == null : attachment.equals(r.attachment)))
+ return false;
+
+ return true;
+ }
+ }
+
public static class QueueRecord {
Long queueKey;
Long messageKey;
@@ -179,7 +318,7 @@
int size;
boolean redelivered;
long tte;
-
+
public boolean isRedelivered() {
return redelivered;
}
@@ -318,12 +457,12 @@
* @return the first sequence number in the queue.
*/
public long getFirstSequence();
-
+
/**
* @return the last sequence number in the queue.
*/
public long getLastSequence();
-
+
/**
* @return The results for this queue's partitions
*/
@@ -359,12 +498,11 @@
*/
public void flush();
-
/**
- * @return true if the store is transactional.
+ * @return true if the store is transactional.
*/
public boolean isTransactional();
-
+
/**
* This interface allows you to query and update the Store.
*
@@ -373,59 +511,69 @@
*
*/
public interface Session {
-
+
+ ////////////////////////////////////////////////////////////////
+ //Lock related methods:
+ ////////////////////////////////////////////////////////////////
+
/**
* Commits work done on the Session
*/
public void commit();
/**
- * Rolls back work done on the Session
- * since the last call to {@link #acquireLock()}
+ * Rolls back work done on the Session since the last call to
+ * {@link #acquireLock()}
*
- * @throw {@link UnsupportedOperationException} if the store is not transactional
+ * @throw {@link UnsupportedOperationException} if the store is not
+ * transactional
*/
public void rollback();
/**
- * Indicates callers intent to start a transaction.
+ * Indicates callers intent to start a transaction.
*/
public void acquireLock();
/**
- * Indicates caller is done with the transaction, if
- * not committed then the transaction will be rolled back (providing
- * the store is transactional.
+ * Indicates caller is done with the transaction, if not committed then
+ * the transaction will be rolled back (providing the store is
+ * transactional.
*/
public void releaseLock();
- public void messageAdd(MessageRecord message);
-
- public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException;
-
- public Long streamOpen();
-
- public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException;
-
- public void streamClose(Long streamKey) throws KeyNotFoundException;
-
- public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException;
+ ////////////////////////////////////////////////////////////////
+ //Client related methods
+ ////////////////////////////////////////////////////////////////
- public boolean streamRemove(Long streamKey);
-
- // Transaction related methods.
- public Iterator<Buffer> transactionList(Buffer first, int max);
-
- public void transactionAdd(Buffer txid);
-
- public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException;
+ /**
+ * Adds a subscription to the store.
+ *
+ * @throws DuplicateKeyException
+ * if a subscription with the same name already exists
+ *
+ */
+ public void addSubscription(SubscriptionRecord record) throws DuplicateKeyException;
- public void transactionRemoveMessage(Buffer txid, QueueDescriptor queueName, Long messageKey) throws KeyNotFoundException;
+ /**
+ * Updates a subscription in the store. If the subscription does not
+ * exist then it will simply be added.
+ */
+ public void updateSubscription(SubscriptionRecord record);
- public void transactionCommit(Buffer txid) throws KeyNotFoundException;
+ /**
+ * Removes a subscription with the given name from the store.
+ */
+ public void removeSubscription(AsciiBuffer name);
- public void transactionRollback(Buffer txid) throws KeyNotFoundException;
+ /**
+ * @return A list of the stored subscriptions.
+ */
+ public Iterator<SubscriptionRecord> listSubscriptions();
+ ////////////////////////////////////////////////////////////////
+ //Queue related methods
+ ////////////////////////////////////////////////////////////////
/**
* Gets a list of queues. The returned iterator returns top-level queues
* (e.g. queues without a parent). The child queues are accessible via
@@ -433,7 +581,7 @@
*
* @param firstQueueName
* If null starts the query at the first queue.
- * @param max
+ * @param max
* The maximum number of queues to return
* @return The list of queues.
*/
@@ -496,6 +644,43 @@
public Iterator<QueueRecord> queueListMessagesQueue(QueueDescriptor queue, Long firstQueueKey, Long maxSequence, int max) throws KeyNotFoundException;
+ ////////////////////////////////////////////////////////////////
+ //Message related methods
+ ////////////////////////////////////////////////////////////////
+ public void messageAdd(MessageRecord message);
+
+ public MessageRecord messageGetRecord(Long key) throws KeyNotFoundException;
+
+ public Long streamOpen();
+
+ public void streamWrite(Long streamKey, Buffer message) throws KeyNotFoundException;
+
+ public void streamClose(Long streamKey) throws KeyNotFoundException;
+
+ public Buffer streamRead(Long streamKey, int offset, int max) throws KeyNotFoundException;
+
+ public boolean streamRemove(Long streamKey);
+
+ ////////////////////////////////////////////////////////////////
+ //Transaction related methods
+ //TODO these will probably go away in favor of just using a queue.
+ ////////////////////////////////////////////////////////////////
+ public Iterator<Buffer> transactionList(Buffer first, int max);
+
+ public void transactionAdd(Buffer txid);
+
+ public void transactionAddMessage(Buffer txid, Long messageKey) throws KeyNotFoundException;
+
+ public void transactionRemoveMessage(Buffer txid, QueueDescriptor queueName, Long messageKey) throws KeyNotFoundException;
+
+ public void transactionCommit(Buffer txid) throws KeyNotFoundException;
+
+ public void transactionRollback(Buffer txid) throws KeyNotFoundException;
+
+ ////////////////////////////////////////////////////////////////
+ //Map related methods
+ ////////////////////////////////////////////////////////////////
+
public Iterator<AsciiBuffer> mapList(AsciiBuffer first, int max);
public boolean mapAdd(AsciiBuffer map);
Modified: activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/main/java/org/apache/activemq/broker/store/memory/MemoryStore.java Fri Jun 19 21:23:03 2009
@@ -28,7 +28,9 @@
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.store.Store;
+import org.apache.activemq.broker.store.Store.DuplicateKeyException;
import org.apache.activemq.broker.store.Store.Session;
+import org.apache.activemq.broker.store.Store.SubscriptionRecord;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.queue.QueueDescriptor;
@@ -219,12 +221,11 @@
}
}
-
+
/**
* @return A new store Session.
*/
- public Session getSession()
- {
+ public Session getSession() {
return session;
}
@@ -317,41 +318,40 @@
private TreeMap<AsciiBuffer, StoredQueue> queues = new TreeMap<AsciiBuffer, StoredQueue>();
private TreeMap<Buffer, Transaction> transactions = new TreeMap<Buffer, Transaction>();
+ private HashMap<AsciiBuffer, SubscriptionRecord> subscriptions = new HashMap<AsciiBuffer, SubscriptionRecord>();
+
/**
- * Commits work done on the Session, if {@link Store#isTransactional()} is true.
+ * Commits work done on the Session, if {@link Store#isTransactional()}
+ * is true.
*/
- public void commit()
- {
+ public void commit() {
//NOOP
}
/**
- * Rolls back work done on the Session
- * since the last call to {@link #acquireLock()}
+ * Rolls back work done on the Session since the last call to
+ * {@link #acquireLock()}
*/
- public void rollback()
- {
+ public void rollback() {
throw new UnsupportedOperationException();
}
/**
- * Indicates callers intent to start a transaction. If the store
- * is transaction, the caller must call {@link #commit()} when the
- * done operating on the Session prior to a mandatory call to
+ * Indicates callers intent to start a transaction. If the store is
+ * transaction, the caller must call {@link #commit()} when the done
+ * operating on the Session prior to a mandatory call to
* {@link #releaseLock()}
*/
- public void acquireLock()
- {
+ public void acquireLock() {
updateLock.lock();
}
/**
- * Indicates caller is done with the transaction, if
- * not committed then the transaction will be rolled back (providing
- * the store is transactional.
+ * Indicates caller is done with the transaction, if not committed then
+ * the transaction will be rolled back (providing the store is
+ * transactional.
*/
- public void releaseLock()
- {
+ public void releaseLock() {
updateLock.unlock();
}
@@ -382,6 +382,50 @@
return null;
}
+ ////////////////////////////////////////////////////////////////
+ //Client related methods
+ ////////////////////////////////////////////////////////////////
+
+ /**
+  * Adds a subscription to the store.
+  * <p>
+  * The map is only written once the add is known to be allowed, so a
+  * rejected add leaves the stored subscription untouched. Re-adding a
+  * record that equals the stored one replaces the stored instance.
+  *
+  * @param record
+  *            the subscription to add
+  * @throws DuplicateKeyException
+  *             if a different subscription is already stored under the
+  *             same name
+  */
+ public void addSubscription(SubscriptionRecord record) throws DuplicateKeyException {
+     final AsciiBuffer name = record.getName();
+     final SubscriptionRecord existing = subscriptions.get(name);
+     if (existing != null && !existing.equals(record)) {
+         throw new DuplicateKeyException(name + " already exists!");
+     }
+     subscriptions.put(name, record);
+ }
+
+ /**
+ * Updates a subscription in the store. If the subscription does not
+ * exist then it will simply be added.
+ * The record is stored in the subscriptions map under its name,
+ * replacing any record previously stored under that name.
+ *
+ * @param record
+ * the subscription to store
+ */
+ public void updateSubscription(SubscriptionRecord record) {
+ subscriptions.put(record.getName(), record);
+ }
+
+ /**
+ * Removes a subscription with the given name from the store.
+ * Nothing happens when no subscription is stored under that name.
+ *
+ * @param name
+ * the name of the subscription to remove
+ */
+ public void removeSubscription(AsciiBuffer name) {
+ subscriptions.remove(name);
+ }
+
+ /**
+  * Lists the stored subscriptions.
+  * <p>
+  * The iterator runs over a copy of the current values, so removing
+  * through the iterator does not change the store.
+  *
+  * @return an iterator over a snapshot of the subscriptions
+  */
+ public Iterator<SubscriptionRecord> listSubscriptions() {
+     final ArrayList<SubscriptionRecord> snapshot = new ArrayList<SubscriptionRecord>(subscriptions.values());
+     return snapshot.iterator();
+ }
+
// //////////////////////////////////////////////////////////////////////////////
// Queue related methods.
// ///////////////////////////////////////////////////////////////////////////////
@@ -644,13 +688,12 @@
// NOOP
}
- public File getStoreDirectory() {
- return null;
- }
+ public File getStoreDirectory() {
+ return null;
+ }
- public void setDeleteAllMessages(boolean val) {
+ public void setDeleteAllMessages(boolean val) {
// NOOP
}
-
}
Modified: activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java?rev=786668&r1=786667&r2=786668&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java (original)
+++ activemq/sandbox/activemq-flow/activemq-store/src/test/java/org/apache/activemq/broker/store/StoreTestBase.java Fri Jun 19 21:23:03 2009
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.store;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Iterator;
import junit.framework.TestCase;
@@ -26,6 +27,7 @@
import org.apache.activemq.broker.store.Store.QueueQueryResult;
import org.apache.activemq.broker.store.Store.QueueRecord;
import org.apache.activemq.broker.store.Store.Session;
+import org.apache.activemq.broker.store.Store.SubscriptionRecord;
import org.apache.activemq.broker.store.Store.VoidCallback;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.activemq.protobuf.Buffer;
@@ -61,7 +63,7 @@
expected.setMessageId(new AsciiBuffer("1000"));
expected.setKey(store.allocateStoreTracking());
expected.setSize(expected.getBuffer().getLength());
-
+
store.execute(new VoidCallback<Exception>() {
public void run(Session session) throws Exception {
session.messageAdd(expected);
@@ -80,8 +82,8 @@
public void testQueueAdd() throws Exception {
final QueueDescriptor expected = new QueueDescriptor();
expected.setQueueName(new AsciiBuffer("testQueue"));
- expected.setApplicationType((short)1);
-
+ expected.setApplicationType((short) 1);
+
store.execute(new VoidCallback<Exception>() {
@Override
public void run(Session session) throws Exception {
@@ -91,36 +93,35 @@
//Test that the queue was created:
checkQueue(expected, 0, 0);
-
- if(isStorePersistent())
- {
+
+ if (isStorePersistent()) {
//Restart the store and make sure the queue is still there
store.stop();
store = createStore(false);
store.start();
-
+
//Test that the queue was persisted
checkQueue(expected, 0, 0);
}
}
-
+
public void testQueueMessageAdd() throws Exception {
final QueueDescriptor queue = new QueueDescriptor();
queue.setQueueName(new AsciiBuffer("testQueue"));
- queue.setApplicationType((short)1);
-
+ queue.setApplicationType((short) 1);
+
final MessageRecord message = new MessageRecord();
message.setBuffer(new Buffer("buffer"));
message.setEncoding(new AsciiBuffer("encoding"));
message.setMessageId(new AsciiBuffer("1000"));
message.setKey(store.allocateStoreTracking());
message.setSize(message.getBuffer().getLength());
-
+
final QueueRecord qRecord = new QueueRecord();
qRecord.setMessageKey(message.getKey());
qRecord.setQueueKey(1L);
qRecord.setSize(message.getSize());
-
+
store.execute(new VoidCallback<Exception>() {
@Override
public void run(Session session) throws Exception {
@@ -132,22 +133,97 @@
checkQueue(queue, message.getSize(), 1);
checkMessageRestore(queue, qRecord, message);
-
+
//Restart the store and make sure the queue is still there
- if(isStorePersistent())
- {
+ if (isStorePersistent()) {
store.stop();
store = createStore(false);
store.start();
-
+
//Test that the queue was persisted
checkQueue(queue, message.getSize(), 1);
checkMessageRestore(queue, qRecord, message);
}
}
- private void checkQueue(final QueueDescriptor queue, final long expectedSize, final long expectedCount) throws FatalStoreException, Exception
+ public void testSubscriptions() throws Exception {
+ HashMap<AsciiBuffer, SubscriptionRecord> expected = new HashMap<AsciiBuffer, SubscriptionRecord>();
+
+ final SubscriptionRecord record1 = new SubscriptionRecord();
+ record1.setName(new AsciiBuffer("sub1"));
+ record1.setIsDurable(true);
+ record1.setDestination(new AsciiBuffer("topic1"));
+ expected.put(record1.getName(), record1);
+
+ final SubscriptionRecord record2 = new SubscriptionRecord();
+ record2.setName(new AsciiBuffer("sub2"));
+ record2.setIsDurable(false);
+ record2.setDestination(new AsciiBuffer("topic2"));
+ record2.setTte(System.currentTimeMillis() + 40000);
+ record2.setSelector(new AsciiBuffer("foo"));
+ byte[] attachment2 = new byte[1024];
+ for (int i = 0; i < attachment2.length; i++) {
+ attachment2[i] = (byte) i;
+ }
+ record2.setAttachment(new Buffer(attachment2));
+ expected.put(record2.getName(), record2);
+
+ //Did both subscriptions make it into the store?
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ session.addSubscription(record1);
+ session.addSubscription(record2);
+ }
+ }, null);
+
+ checkSubscriptions(expected);
+
+ //Let's remove one:
+ expected.remove(record1.getName());
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ session.removeSubscription(record1.getName());
+ }
+ }, null);
+
+ checkSubscriptions(expected);
+
+ //Restart the store and make sure the remaining subscription is still there
+ if (isStorePersistent()) {
+ store.stop();
+ store = createStore(false);
+ store.start();
+
+ //Test that the subscriptions were persisted
+ checkSubscriptions(expected);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void checkSubscriptions(HashMap<AsciiBuffer, SubscriptionRecord> expected) throws Exception
{
+ final HashMap<AsciiBuffer, SubscriptionRecord> checkMap = (HashMap<AsciiBuffer, SubscriptionRecord>) expected.clone();
+
+ store.execute(new VoidCallback<Exception>() {
+ @Override
+ public void run(Session session) throws Exception {
+ Iterator<SubscriptionRecord> results = session.listSubscriptions();
+ while(results.hasNext())
+ {
+ SubscriptionRecord r = results.next();
+ SubscriptionRecord e = checkMap.remove(r.getName());
+ assertEquals(r, e);
+ }
+
+ //Shouldn't be any expected results left:
+ assertEquals(0, checkMap.size());
+ }
+ }, null);
+ }
+
+ private void checkQueue(final QueueDescriptor queue, final long expectedSize, final long expectedCount) throws FatalStoreException, Exception {
store.execute(new VoidCallback<Exception>() {
@Override
public void run(Session session) throws Exception {
@@ -160,9 +236,8 @@
}
}, null);
}
-
- private void checkMessageRestore(final QueueDescriptor queue, final QueueRecord qRecord, final MessageRecord message ) throws FatalStoreException, Exception
- {
+
+ private void checkMessageRestore(final QueueDescriptor queue, final QueueRecord qRecord, final MessageRecord message) throws FatalStoreException, Exception {
store.execute(new VoidCallback<Exception>() {
@Override
public void run(Session session) throws Exception {
@@ -176,7 +251,7 @@
}
}, null);
}
-
+
public void testStoreExecuteExceptionPassthrough() throws Exception {
try {
store.execute(new VoidCallback<Exception>() {
@@ -213,7 +288,7 @@
assertEquals(expected.getStreamKey(), actual.getStreamKey());
assertEquals(expected.getSize(), actual.getSize());
}
-
+
static void assertEquals(QueueDescriptor expected, QueueDescriptor actual) {
assertEquals(expected.getParent(), actual.getParent());
assertEquals(expected.getQueueType(), actual.getQueueType());
@@ -221,7 +296,7 @@
assertEquals(expected.getPartitionKey(), actual.getPartitionKey());
assertEquals(expected.getQueueName(), actual.getQueueName());
//TODO test partitions?
-
+
}
}