You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/10/27 17:27:30 UTC
svn commit: r468414 [7/7] - in
/incubator/activemq/sandbox/qpid/src/main/java/org/apache: activemq/amqp/
activemq/amqp/broker/ activemq/amqp/command/ activemq/amqp/transport/
activemq/amqp/wireformat/ activemq/amqp/wireformat/v8_0/
activemq/qpid/broker...
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,180 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Keeps the subscriptions registered against a queue and hands them out in
+ * round-robin order for delivery.
+ */
+class SubscriptionSet implements WeightedSubscriptionManager
+{
+    private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
+
+    /**
+     * The registered subscribers. Held in a copy-on-write list, so iteration works on a
+     * snapshot while subscribers are added or removed.
+     */
+    private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>();
+
+    /**
+     * Index at which the next round-robin scan starts.
+     */
+    private int _currentSubscriber;
+
+    /**
+     * Exposes the round-robin index; intended for unit tests.
+     */
+    int getCurrentSubscriber()
+    {
+        return _currentSubscriber;
+    }
+
+    /**
+     * Registers a subscription with this set.
+     * @param subscription the subscription to add
+     */
+    public void addSubscriber(Subscription subscription)
+    {
+        _subscriptions.add(subscription);
+    }
+
+    /**
+     * Removes the given subscription from this set.
+     * @param subscription the subscription to remove
+     * @return the subscription that was passed in if it was registered, otherwise null
+     */
+    public Subscription removeSubscriber(Subscription subscription)
+    {
+        // TODO: possibly need O(1) operation here.
+        if (_subscriptions.remove(subscription))
+        {
+            return subscription;
+        }
+        // Not registered: leave a trace of what is registered to help diagnosis.
+        debugDumpSubscription(subscription);
+        return null;
+    }
+
+    /**
+     * Logs, at debug level only, the subscription that could not be found followed by
+     * every subscription currently registered.
+     */
+    private void debugDumpSubscription(Subscription subscription)
+    {
+        if (!_log.isDebugEnabled())
+        {
+            return;
+        }
+        _log.debug("Subscription " + subscription + " not found. Dumping subscriptions:");
+        for (Subscription registered : _subscriptions)
+        {
+            _log.debug("Subscription: " + registered);
+        }
+        _log.debug("Subscription dump complete");
+    }
+
+    /**
+     * Returns the next subscription that is not suspended, or null if there is none.
+     *
+     * Performance note:
+     * The list may be scanned twice: once from the current round-robin index to the end
+     * and, if nothing was found, once more from the beginning. The worst case occurs when
+     * every subscription is suspended. No synchronisation is used, so subscriptions may be
+     * added and removed concurrently. If the stored index has moved past the end of the
+     * list (for example because subscriptions were removed between calls), the resulting
+     * IndexOutOfBoundsException also restarts the scan from the beginning.
+     *
+     * @param msg not consulted by this implementation
+     */
+    public Subscription nextSubscriber(AMQMessage msg)
+    {
+        if (_subscriptions.isEmpty())
+        {
+            return null;
+        }
+
+        try
+        {
+            final Subscription candidate = nextSubscriber();
+            if (candidate != null)
+            {
+                return candidate;
+            }
+            // Reached the end of the list without a match: wrap around.
+            _currentSubscriber = 0;
+            return nextSubscriber();
+        }
+        catch (IndexOutOfBoundsException e)
+        {
+            // The stored index no longer fits the list: start again from the first entry.
+            _currentSubscriber = 0;
+            return nextSubscriber();
+        }
+    }
+
+    /**
+     * Scans forward from the current round-robin index, advancing the index for every
+     * subscription inspected, and returns the first one that is not suspended.
+     * @return the subscription found, or null if the end of the list was reached
+     */
+    private Subscription nextSubscriber()
+    {
+        for (ListIterator<Subscription> cursor = _subscriptions.listIterator(_currentSubscriber); cursor.hasNext();)
+        {
+            final Subscription candidate = cursor.next();
+            ++_currentSubscriber;
+            subscriberScanned();
+            if (candidate.isSuspended())
+            {
+                continue;
+            }
+            return candidate;
+        }
+        return null;
+    }
+
+    /**
+     * Hook invoked once for every subscription inspected during a scan.
+     * Overridden in test classes.
+     */
+    protected void subscriberScanned()
+    {
+    }
+
+    /**
+     * @return true if no subscription is registered
+     */
+    public boolean isEmpty()
+    {
+        return _subscriptions.isEmpty();
+    }
+
+    /**
+     * @return true if at least one registered subscription is not suspended
+     */
+    public boolean hasActiveSubscribers()
+    {
+        for (Subscription candidate : _subscriptions)
+        {
+            if (candidate.isSuspended())
+            {
+                continue;
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * @return the number of registered subscriptions that are not suspended
+     */
+    public int getWeight()
+    {
+        int active = 0;
+        for (Subscription candidate : _subscriptions)
+        {
+            if (candidate.isSuspended())
+            {
+                continue;
+            }
+            active++;
+        }
+        return active;
+    }
+
+    /**
+     * Notification that a queue has been deleted. It is forwarded to every registered
+     * subscription so that the subscription can inform the channel, which in turn can
+     * update its list of unacknowledged messages.
+     * @param queue the queue that was deleted
+     */
+    public void queueDeleted(AMQQueue queue)
+    {
+        for (Subscription registered : _subscriptions)
+        {
+            registered.queueDeleted(queue);
+        }
+    }
+
+    /**
+     * @return the number of registered subscriptions, suspended or not
+     */
+    int size()
+    {
+        return _subscriptions.size();
+    }
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/WeightedSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/WeightedSubscriptionManager.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/WeightedSubscriptionManager.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/WeightedSubscriptionManager.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+/**
+ * A {@link SubscriptionManager} that can additionally report a weight.
+ */
+public interface WeightedSubscriptionManager extends SubscriptionManager
+{
+    /**
+     * @return the weight of this manager. NOTE(review): the meaning of the value is not
+     *         defined here; SubscriptionSet reports the number of its subscriptions that
+     *         are not suspended.
+     */
+    int getWeight();
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+/**
+ * Entry point for SASL based authentication: names the available mechanism(s),
+ * creates {@link SaslServer} instances and evaluates client responses.
+ */
+public interface AuthenticationManager
+{
+    /**
+     * @return the supported SASL mechanism name(s) as a single string.
+     *         NOTE(review): how several mechanisms are separated is not defined here -
+     *         confirm against the implementations and callers.
+     */
+    String getMechanisms();
+
+    /**
+     * Creates a SASL server for an authentication exchange.
+     * @param mechanism name of the requested SASL mechanism
+     * @param localFQDN presumably the fully qualified domain name of this server - TODO confirm
+     * @return the SASL server to use for the exchange
+     * @throws SaslException if the SASL server cannot be created
+     */
+    SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException;
+
+    /**
+     * Evaluates a response received during an authentication exchange.
+     * @param server the SASL server handling the exchange
+     * @param response the response bytes to evaluate
+     * @return the outcome, carrying a status and optionally a challenge
+     */
+    AuthenticationResult authenticate(SaslServer server, byte[] response);
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+/**
+ * Outcome of one authentication step: a status plus an optional challenge.
+ */
+public class AuthenticationResult
+{
+    /**
+     * The possible states of an authentication step.
+     */
+    public enum AuthenticationStatus
+    {
+        SUCCESS,
+        CONTINUE,
+        ERROR
+    }
+
+    /** Status of the authentication step. */
+    public AuthenticationStatus status;
+
+    /** Challenge bytes; null when the result was created from a status alone. */
+    public byte[] challenge;
+
+    /**
+     * Creates a result carrying both a challenge and a status.
+     * @param challenge the challenge bytes; the array is stored as is, not copied
+     * @param status the status of the authentication step
+     */
+    public AuthenticationResult(byte[] challenge, AuthenticationStatus status)
+    {
+        this.challenge = challenge;
+        this.status = status;
+    }
+
+    /**
+     * Creates a result with the given status and no challenge.
+     * @param status the status of the authentication step
+     */
+    public AuthenticationResult(AuthenticationStatus status)
+    {
+        this(null, status);
+    }
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,82 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import org.apache.qpid.server.security.auth.AuthenticationManager;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+public class NullAuthenticationManager implements AuthenticationManager
+{
+ public String getMechanisms()
+ {
+ return "PLAIN";
+ }
+
+ public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
+ {
+ return new SaslServer()
+ {
+ public String getMechanismName()
+ {
+ return "PLAIN";
+ }
+
+ public byte[] evaluateResponse(byte[] response) throws SaslException
+ {
+ return new byte[0];
+ }
+
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public String getAuthorizationID()
+ {
+ return "guest";
+ }
+
+ public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
+ {
+ return new byte[0];
+ }
+
+ public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException
+ {
+ return new byte[0];
+ }
+
+ public Object getNegotiatedProperty(String propName)
+ {
+ return null;
+ }
+
+ public void dispose() throws SaslException
+ {
+ }
+ };
+ }
+
+ public AuthenticationResult authenticate(SaslServer server, byte[] response)
+ {
+ return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.SUCCESS);
+ }
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,120 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * A simple message store that keeps messages in memory, in a concurrent map keyed by
+ * message id. The queue and transaction operations of {@link MessageStore} do nothing here.
+ */
+public class MemoryMessageStore implements MessageStore
+{
+    private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
+
+    private static final int DEFAULT_HASHTABLE_CAPACITY = 50000;
+
+    /** Source of new message ids; the first id handed out is 1. */
+    private final AtomicLong _messageId = new AtomicLong(1);
+
+    /** Stored messages by message id. Set to null once the store has been closed. */
+    protected ConcurrentMap<Long, AMQMessage> _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY);
+
+    /**
+     * Empties the message map and drops the reference to it. Calling this again is a
+     * no-op. NOTE(review): put, removeMessage and getMessage dereference the map without
+     * a null check, so they are expected to fail after close.
+     */
+    public void close() throws Exception
+    {
+        if (_messageMap == null)
+        {
+            return;
+        }
+        _messageMap.clear();
+        _messageMap = null;
+    }
+
+    /**
+     * Stores the message under its own message id, replacing any message already
+     * stored under that id.
+     */
+    public void put(AMQMessage msg)
+    {
+        _messageMap.put(msg.getMessageId(), msg);
+    }
+
+    /**
+     * Removes the message stored under the given id, if any.
+     */
+    public void removeMessage(long messageId)
+    {
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Removing message with id " + messageId);
+        }
+        _messageMap.remove(messageId);
+    }
+
+    /** No-op in this implementation. */
+    public void createQueue(AMQQueue queue) throws AMQException
+    {
+    }
+
+    /** No-op in this implementation. */
+    public void removeQueue(String name) throws AMQException
+    {
+    }
+
+    /** No-op in this implementation. */
+    public void enqueueMessage(String name, long messageId) throws AMQException
+    {
+    }
+
+    /** No-op in this implementation. */
+    public void dequeueMessage(String name, long messageId) throws AMQException
+    {
+    }
+
+    /** No-op in this implementation. */
+    public void beginTran() throws AMQException
+    {
+    }
+
+    /** No-op in this implementation. */
+    public void commitTran() throws AMQException
+    {
+    }
+
+    /** No-op in this implementation. */
+    public void abortTran() throws AMQException
+    {
+    }
+
+    /**
+     * @return always false
+     */
+    public boolean inTran()
+    {
+        return false;
+    }
+
+    /**
+     * @return always null in this implementation, not an empty list - callers must
+     *         be prepared for that
+     */
+    public List<AMQQueue> createQueues() throws AMQException
+    {
+        return null;
+    }
+
+    /**
+     * @return the next id from an atomically incremented counter
+     */
+    public long getNewMessageId()
+    {
+        return _messageId.getAndIncrement();
+    }
+
+    /**
+     * @return the message stored under the given id, or null if there is none
+     */
+    public AMQMessage getMessage(long messageId)
+    {
+        return _messageMap.get(messageId);
+    }
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MessageStore.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MessageStore.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MessageStore.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.List;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * Contract of a message store: storing and removing messages, queue bookkeeping and
+ * transaction demarcation.
+ */
+public interface MessageStore
+{
+
+    /**
+     * Called to close and cleanup any resources used by the message store.
+     * @throws Exception if the cleanup fails
+     */
+    void close() throws Exception;
+
+    /**
+     * Stores the given message.
+     * @param msg the message to store
+     * @throws AMQException if the message cannot be stored
+     */
+    void put(AMQMessage msg) throws AMQException;
+
+    /**
+     * Removes a message from the store.
+     * @param messageId id of the message to remove
+     * @throws AMQException if the message cannot be removed
+     */
+    void removeMessage(long messageId) throws AMQException;
+
+    /**
+     * Records a queue in the store.
+     * @param queue the queue to record
+     * @throws AMQException if the queue cannot be recorded
+     */
+    void createQueue(AMQQueue queue) throws AMQException;
+
+    /**
+     * Removes a queue from the store.
+     * @param name presumably the name of the queue - TODO confirm
+     * @throws AMQException if the queue cannot be removed
+     */
+    void removeQueue(String name) throws AMQException;
+
+    /**
+     * Records that a message has been enqueued.
+     * @param name presumably the name of the queue - TODO confirm
+     * @param messageId id of the message
+     * @throws AMQException if the operation fails
+     */
+    void enqueueMessage(String name, long messageId) throws AMQException;
+
+    /**
+     * Records that a message has been dequeued.
+     * @param name presumably the name of the queue - TODO confirm
+     * @param messageId id of the message
+     * @throws AMQException if the operation fails
+     */
+    void dequeueMessage(String name, long messageId) throws AMQException;
+
+    /**
+     * Begins a store transaction. TxnBuffer calls this before preparing its operations
+     * when it contains persistent changes.
+     * @throws AMQException if the transaction cannot be started
+     */
+    void beginTran() throws AMQException;
+
+    /**
+     * Commits the store transaction. TxnBuffer calls this once every operation has been
+     * prepared successfully.
+     * @throws AMQException if the commit fails
+     */
+    void commitTran() throws AMQException;
+
+    /**
+     * Aborts the store transaction. TxnBuffer calls this when preparing an operation failed.
+     * @throws AMQException if the abort fails
+     */
+    void abortTran() throws AMQException;
+
+    /**
+     * @return presumably true while a store transaction is in progress - TODO confirm
+     */
+    boolean inTran();
+
+    /**
+     * Recreate all queues that were persisted, including re-enqueuing of existing messages.
+     * @return the recreated queues. NOTE(review): MemoryMessageStore returns null here,
+     *         so callers should not assume a non-null list.
+     * @throws AMQException if the queues cannot be recreated
+     */
+    List<AMQQueue> createQueues() throws AMQException;
+
+    /**
+     * Return a valid, currently unused message id.
+     * @return a message id
+     */
+    long getNewMessageId();
+}
+
+
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,120 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Holds the list of {@link TxnOp} instances that make up a transaction and drives
+ * them through prepare and commit, or through rollback.
+ */
+public class TxnBuffer
+{
+    private static final Logger _log = Logger.getLogger(TxnBuffer.class);
+
+    private boolean _containsPersistentChanges = false;
+    private final MessageStore _store;
+    private final List<TxnOp> _ops = new ArrayList<TxnOp>();
+
+    /**
+     * @param store the message store whose transaction brackets the operations when the
+     *              buffer contains persistent changes
+     */
+    public TxnBuffer(MessageStore store)
+    {
+        _store = store;
+    }
+
+    /**
+     * Marks this buffer as containing persistent changes, which makes {@link #commit()}
+     * wrap the prepare phase in a store transaction.
+     * NOTE(review): nothing in this class clears the flag again, so it also applies to
+     * every later commit on the same buffer - confirm that this is intended.
+     */
+    public void containsPersistentChanges()
+    {
+        _containsPersistentChanges = true;
+    }
+
+    /**
+     * Prepares every enlisted operation and, if all of them succeed, commits them.
+     * If a prepare fails, the operations prepared before it are undone and nothing is
+     * committed; the failure is logged but not rethrown. When the buffer contains
+     * persistent changes the prepare phase runs inside a store transaction that is
+     * committed on success and aborted on failure. The list of operations is emptied
+     * when this method returns normally.
+     *
+     * @throws AMQException if beginning, committing or aborting the store transaction fails
+     */
+    public void commit() throws AMQException
+    {
+        if (_containsPersistentChanges)
+        {
+            _log.debug("Begin Transaction.");
+            _store.beginTran();
+            if (prepare())
+            {
+                _log.debug("Transaction Succeeded");
+                // Persistent state is committed first, then the in-memory side of each op.
+                _store.commitTran();
+                commitOps();
+            }
+            else
+            {
+                _log.debug("Transaction Failed");
+                _store.abortTran();
+            }
+        }
+        else if (prepare())
+        {
+            commitOps();
+        }
+        _ops.clear();
+    }
+
+    /**
+     * Calls commit on every enlisted operation, in the order of enlistment.
+     */
+    private void commitOps()
+    {
+        for (TxnOp op : _ops)
+        {
+            op.commit();
+        }
+    }
+
+    /**
+     * Prepares the enlisted operations in order. On the first failure the operations
+     * that were already prepared are compensated through undoPrepare.
+     *
+     * @return true if every operation was prepared, false if one of them failed
+     */
+    private boolean prepare()
+    {
+        for (int i = 0; i < _ops.size(); i++)
+        {
+            TxnOp op = _ops.get(i);
+            try
+            {
+                op.prepare();
+            }
+            catch (Exception e)
+            {
+                // Record the cause: the caller only learns that the prepare phase failed,
+                // so without this the reason would be lost.
+                _log.warn("Failed to prepare transactional operation " + op + "; undoing previously prepared operations", e);
+                //compensate previously prepared ops
+                for (int j = 0; j < i; j++)
+                {
+                    _ops.get(j).undoPrepare();
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Rolls back every enlisted operation, in the order of enlistment, and empties the
+     * list of operations.
+     *
+     * @throws AMQException declared for callers; nothing in this method body throws it
+     */
+    public void rollback() throws AMQException
+    {
+        for (TxnOp op : _ops)
+        {
+            op.rollback();
+        }
+        _ops.clear();
+    }
+
+    /**
+     * Adds an operation to this transaction.
+     * @param op the operation to add
+     */
+    public void enlist(TxnOp op)
+    {
+        _ops.add(op);
+    }
+
+    /**
+     * Removes a previously enlisted operation from this transaction.
+     * @param op the operation to remove
+     */
+    public void cancel(TxnOp op)
+    {
+        _ops.remove(op);
+    }
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnOp.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnOp.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnOp.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnOp.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * Abstraction of a single operation within a transaction. Instances are enlisted in,
+ * and driven by, the TxnBuffer class.
+ */
+public interface TxnOp
+{
+    /**
+     * Do the part of the operation that updates persistent state.
+     * @throws AMQException if the operation cannot be prepared
+     */
+    void prepare() throws AMQException;
+
+    /**
+     * Complete the operation started by prepare. In-memory state can now be updated
+     * and network transfers can be made.
+     */
+    void commit();
+
+    /**
+     * This is not the same as rollback. Unfortunately the use of an in-memory
+     * reference count as a locking mechanism, and as a test for whether a message
+     * should be deleted, means that as things are, handling an acknowledgement
+     * unavoidably alters both memory and persistent state on prepare. This method is
+     * needed to 'compensate', i.e. undo the in-memory change, if the persistent update
+     * of a later operation fails.
+     */
+    void undoPrepare();
+
+    /**
+     * Rolls back the operation.
+     */
+    void rollback();
+}
Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * A {@link ConcurrentLinkedQueue} whose {@link #size()} does not count the elements.
+ * NOTE(review): presumably this avoids the element-by-element traversal performed by
+ * ConcurrentLinkedQueue.size() - confirm with the callers that only need to know
+ * whether the queue is empty.
+ */
+public class ConcurrentLinkedQueueNoSize<E> extends ConcurrentLinkedQueue<E>
+{
+    /**
+     * @return 0 if the queue is empty, otherwise 1 - never the real number of elements
+     */
+    @Override
+    public int size()
+    {
+        return isEmpty() ? 0 : 1;
+    }
+}