You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/10/27 17:27:30 UTC

svn commit: r468414 [7/7] - in /incubator/activemq/sandbox/qpid/src/main/java/org/apache: activemq/amqp/ activemq/amqp/broker/ activemq/amqp/command/ activemq/amqp/transport/ activemq/amqp/wireformat/ activemq/amqp/wireformat/v8_0/ activemq/qpid/broker...

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,180 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Holds the set of subscriptions for a queue and manages the round-robin
+ * selection of the subscription that receives the next delivery.
+ *
+ * The subscriptions are kept in a CopyOnWriteArrayList, so every iteration in this class runs over a
+ * snapshot of the list. The round-robin cursor is a plain int field that is read and updated without
+ * synchronisation; see nextSubscriber(AMQMessage) for how a stale cursor value is tolerated.
+ */
+class SubscriptionSet implements WeightedSubscriptionManager
+{
+    private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
+
+    /**
+     * List of registered subscribers
+     */
+    private List<Subscription> _subscriptions = new CopyOnWriteArrayList<Subscription>();
+
+    /**
+     * Used to control the round robin delivery of content. This is the list index at which the
+     * next scan for an unsuspended subscription starts.
+     */
+    private int _currentSubscriber;
+
+    /**
+     * Accessor for unit tests.
+     * @return the current value of the round-robin cursor
+     */
+    int getCurrentSubscriber()
+    {
+        return _currentSubscriber;
+    }
+
+    /**
+     * Registers a subscription by appending it to the end of the list.
+     * @param subscription the subscription to add
+     */
+    public void addSubscriber(Subscription subscription)
+    {
+        _subscriptions.add(subscription);
+    }
+
+    /**
+     * Remove the subscription, returning it if it was found. When no match is found and debug
+     * logging is enabled, the currently registered subscriptions are dumped to the log.
+     * @param subscription the subscription to remove
+     * @return the removed subscription, or null if no match was found
+     */
+    public Subscription removeSubscriber(Subscription subscription)
+    {
+        boolean isRemoved = _subscriptions.remove(subscription); // TODO: possibly need O(1) operation here.
+        if (isRemoved)
+        {
+            return subscription;
+        }
+        else
+        {
+            debugDumpSubscription(subscription);
+            return null;
+        }
+    }
+
+    /**
+     * Logs, at debug level only, the subscription that could not be found followed by every
+     * subscription currently registered.
+     * @param subscription the subscription that was not found
+     */
+    private void debugDumpSubscription(Subscription subscription)
+    {
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Subscription " + subscription + " not found. Dumping subscriptions:");
+            for (Subscription s : _subscriptions)
+            {
+                _log.debug("Subscription: " + s);
+            }
+            _log.debug("Subscription dump complete");
+        }
+    }
+
+    /**
+     * Return the next unsuspended subscription or null if not found.
+     *
+     * Performance note:
+     * This method can scan all items twice when looking for a subscription that is not
+     * suspended. The worst case occurs when all subscriptions are suspended. However, it does this
+     * without synchronisation and subscriptions may be added and removed concurrently. Also note that because of
+     * race conditions and when subscriptions are removed between calls to nextSubscriber, the
+     * IndexOutOfBoundsException also causes the scan to start at the beginning.
+     *
+     * @param msg not used by this implementation
+     * @return the next unsuspended subscription, or null if there are no subscriptions or all are suspended
+     */
+    public Subscription nextSubscriber(AMQMessage msg)
+    {
+        if (_subscriptions.isEmpty())
+        {
+            return null;
+        }
+
+        try {
+            final Subscription result = nextSubscriber();
+            if (result == null) {
+                // Reached the end of the list without a match: wrap around and scan once more from the start.
+                _currentSubscriber = 0;
+                return nextSubscriber();
+            } else {
+                return result;
+            }
+        } catch (IndexOutOfBoundsException e) {
+            // The cursor pointed past the end of the list (the list shrank since the cursor was
+            // last advanced), so restart the scan from the beginning.
+            _currentSubscriber = 0;
+            return nextSubscriber();
+        }
+    }
+
+    /**
+     * Scans from the current cursor position to the end of the list. The cursor is advanced and
+     * subscriberScanned() is invoked for every subscription examined, including the one returned.
+     *
+     * @return the first unsuspended subscription found, or null if the end of the list is reached
+     * @throws IndexOutOfBoundsException if the cursor is greater than the size of the list
+     */
+    private Subscription nextSubscriber()
+    {
+        final ListIterator<Subscription> iterator = _subscriptions.listIterator(_currentSubscriber);
+        while (iterator.hasNext()) {
+            Subscription subscription = iterator.next();
+            ++_currentSubscriber;
+            subscriberScanned();
+            if (!subscription.isSuspended()) {
+                return subscription;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Hook called once for each subscription examined during a scan. Does nothing here;
+     * overridden in test classes.
+     */
+    protected void subscriberScanned()
+    {
+    }
+
+    /**
+     * @return true if no subscriptions are registered
+     */
+    public boolean isEmpty()
+    {
+        return _subscriptions.isEmpty();
+    }
+
+    /**
+     * @return true if at least one registered subscription is not suspended
+     */
+    public boolean hasActiveSubscribers()
+    {
+        for (Subscription s : _subscriptions)
+        {
+            if (!s.isSuspended()) return true;
+        }
+        return false;
+    }
+
+    /**
+     * @return the number of registered subscriptions that are not suspended
+     */
+    public int getWeight()
+    {
+        int count = 0;
+        for (Subscription s : _subscriptions)
+        {
+            if (!s.isSuspended()) count++;
+        }
+        return count;
+    }
+
+    /**
+     * Notification that a queue has been deleted. This is called so that the subscription can inform the
+     * channel, which in turn can update its list of unacknowledged messages.
+     * The notification is forwarded to every registered subscription.
+     * @param queue the queue that has been deleted
+     */
+    public void queueDeleted(AMQQueue queue)
+    {
+        for (Subscription s : _subscriptions)
+        {
+            s.queueDeleted(queue);
+        }
+    }
+
+    /**
+     * @return the number of registered subscriptions, suspended or not
+     */
+    int size() {
+        return _subscriptions.size();
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/WeightedSubscriptionManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/WeightedSubscriptionManager.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/WeightedSubscriptionManager.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/queue/WeightedSubscriptionManager.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,23 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+/**
+ * A SubscriptionManager that can additionally report a weight.
+ */
+public interface WeightedSubscriptionManager extends SubscriptionManager
+{
+    /**
+     * Returns the weight of this manager. SubscriptionSet implements this as the number of
+     * its subscriptions that are not suspended.
+     *
+     * @return the weight
+     */
+    public int getWeight();        
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationManager.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,30 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+/**
+ * Abstraction over the SASL authentication performed by the broker: it advertises mechanisms,
+ * creates SaslServer instances and evaluates client responses.
+ */
+public interface AuthenticationManager
+{
+    /**
+     * Returns the supported mechanisms as a single string.
+     * NOTE(review): presumably a space-separated list of SASL mechanism names; the only
+     * implementation visible alongside this interface returns "PLAIN" - confirm the format.
+     *
+     * @return the supported mechanism name(s)
+     */
+    String getMechanisms();
+
+    /**
+     * Creates a SaslServer for the requested mechanism.
+     *
+     * @param mechanism the name of the mechanism requested
+     * @param localFQDN presumably the fully qualified domain name of this server - TODO confirm
+     * @return the SaslServer to use for the exchange
+     * @throws SaslException if the server cannot be created
+     */
+    SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException;
+
+    /**
+     * Evaluates a response against the given SaslServer and reports the outcome.
+     *
+     * @param server the SaslServer handling the exchange
+     * @param response the response bytes to evaluate
+     * @return the result, carrying a status and optionally a challenge
+     */
+    AuthenticationResult authenticate(SaslServer server, byte[] response);
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+/**
+ * Simple holder for the outcome of an authentication step: a status and, optionally,
+ * challenge bytes.
+ */
+public class AuthenticationResult
+{
+    /**
+     * The possible outcomes of an authentication step.
+     */
+    public enum AuthenticationStatus
+    {
+        SUCCESS,
+        CONTINUE,
+        ERROR
+    }
+
+    /** Challenge bytes associated with this result; null when none were supplied. */
+    public byte[] challenge;
+
+    /** Outcome of the authentication step. */
+    public AuthenticationStatus status;
+
+    /**
+     * Creates a result that carries no challenge.
+     *
+     * @param status the outcome of the authentication step
+     */
+    public AuthenticationResult(AuthenticationStatus status)
+    {
+        this(null, status);
+    }
+
+    /**
+     * Creates a result with a challenge.
+     *
+     * @param challenge the challenge bytes; stored by reference, not copied
+     * @param status the outcome of the authentication step
+     */
+    public AuthenticationResult(byte[] challenge, AuthenticationStatus status)
+    {
+        this.challenge = challenge;
+        this.status = status;
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/security/auth/NullAuthenticationManager.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,82 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth;
+
+import org.apache.qpid.server.security.auth.AuthenticationManager;
+import org.apache.qpid.server.security.auth.AuthenticationResult;
+
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+/**
+ * An AuthenticationManager that performs no real authentication: it advertises the PLAIN
+ * mechanism, hands out a SaslServer that is complete from the start, and reports every
+ * authentication attempt as successful.
+ */
+public class NullAuthenticationManager implements AuthenticationManager
+{
+    private static final String MECHANISM = "PLAIN";
+
+    private static final String AUTHORIZATION_ID = "guest";
+
+    /**
+     * @return the single mechanism name advertised by this manager
+     */
+    public String getMechanisms()
+    {
+        return MECHANISM;
+    }
+
+    /**
+     * Creates a SaslServer that ignores both arguments and never challenges the client.
+     *
+     * @param mechanism ignored
+     * @param localFQDN ignored
+     * @return a new SaslServer that is already complete
+     */
+    public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
+    {
+        return new AcceptAllSaslServer();
+    }
+
+    /**
+     * Reports success regardless of the arguments.
+     *
+     * @param server ignored
+     * @param response ignored
+     * @return a result with status SUCCESS and no challenge
+     */
+    public AuthenticationResult authenticate(SaslServer server, byte[] response)
+    {
+        final AuthenticationResult.AuthenticationStatus outcome = AuthenticationResult.AuthenticationStatus.SUCCESS;
+        return new AuthenticationResult(outcome);
+    }
+
+    /**
+     * SaslServer that accepts anything: always complete, always authorised as "guest",
+     * and answering every byte-array operation with a fresh empty array.
+     */
+    private static final class AcceptAllSaslServer implements SaslServer
+    {
+        public String getMechanismName()
+        {
+            return MECHANISM;
+        }
+
+        public boolean isComplete()
+        {
+            return true;
+        }
+
+        public String getAuthorizationID()
+        {
+            return AUTHORIZATION_ID;
+        }
+
+        public byte[] evaluateResponse(byte[] response) throws SaslException
+        {
+            return new byte[0];
+        }
+
+        public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException
+        {
+            return new byte[0];
+        }
+
+        public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
+        {
+            return new byte[0];
+        }
+
+        public Object getNegotiatedProperty(String propName)
+        {
+            return null;
+        }
+
+        public void dispose() throws SaslException
+        {
+            // nothing to release
+        }
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,120 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * A simple message store that stores the messages in a threadsafe structure in memory.
+ *
+ * Only message storage and id allocation are implemented. The queue, enqueue/dequeue and
+ * transaction operations are no-ops, and inTran() always reports false.
+ */
+public class MemoryMessageStore implements MessageStore
+{
+    private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
+
+    private static final int DEFAULT_HASHTABLE_CAPACITY = 50000;
+
+    /**
+     * Messages keyed by message id. Set to null by close().
+     */
+    protected ConcurrentMap<Long, AMQMessage> _messageMap =  new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY);
+
+    /**
+     * Source of new message ids; the first id handed out is 1.
+     */
+    private final AtomicLong _messageId = new AtomicLong(1);
+
+    /**
+     * Discards all stored messages and drops the map. Calling close() again is harmless, but
+     * put, removeMessage and getMessage must not be used after the store has been closed.
+     */
+    public void close() throws Exception
+    {
+        if (_messageMap != null)
+        {
+            _messageMap.clear();
+            _messageMap = null;
+        }
+    }
+
+    /**
+     * Stores the message under its own message id, replacing any message already stored with that id.
+     * @param msg the message to store
+     */
+    public void put(AMQMessage msg)
+    {
+        _messageMap.put(msg.getMessageId(), msg);
+    }
+
+    /**
+     * Removes the message with the given id; does nothing if no such message is stored.
+     * @param messageId id of the message to remove
+     */
+    public void removeMessage(long messageId)
+    {
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Removing message with id " + messageId);
+        }
+        _messageMap.remove(messageId);
+    }
+
+    /**
+     * No-op: this store keeps no queue state.
+     */
+    public void createQueue(AMQQueue queue) throws AMQException
+    {
+    }
+
+    /**
+     * No-op: this store keeps no queue state.
+     */
+    public void removeQueue(String name) throws AMQException
+    {
+    }
+
+    /**
+     * No-op: this store does not record which queue a message is on.
+     */
+    public void enqueueMessage(String name, long messageId) throws AMQException
+    {
+    }
+
+    /**
+     * No-op: this store does not record which queue a message is on.
+     */
+    public void dequeueMessage(String name, long messageId) throws AMQException
+    {
+    }
+
+    /**
+     * No-op: this store has no transactions.
+     */
+    public void beginTran() throws AMQException
+    {
+    }
+
+    /**
+     * No-op: this store has no transactions.
+     */
+    public void commitTran() throws AMQException
+    {
+    }
+
+    /**
+     * No-op: this store has no transactions.
+     */
+    public void abortTran() throws AMQException
+    {
+    }
+
+    /**
+     * @return always false, since this store has no transactions
+     */
+    public boolean inTran()
+    {
+        return false;
+    }
+
+    /**
+     * Nothing is persisted by this store, so there are never any queues to recreate.
+     * @return a new, empty, modifiable list (never null, so callers can iterate it directly)
+     */
+    public List<AMQQueue> createQueues() throws AMQException
+    {
+        return new java.util.ArrayList<AMQQueue>();
+    }
+
+    /**
+     * @return the next id from a counter that starts at 1 and increases by one per call
+     */
+    public long getNewMessageId()
+    {
+        return _messageId.getAndIncrement();
+    }
+
+    /**
+     * Looks up a stored message.
+     * @param messageId id of the message to fetch
+     * @return the message, or null if no message with that id is stored
+     */
+    public AMQMessage getMessage(long messageId)
+    {
+        return _messageMap.get(messageId);
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MessageStore.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MessageStore.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/store/MessageStore.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.List;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * Contract for the broker's message store: message storage, queue bookkeeping, a simple
+ * begin/commit/abort transaction API and allocation of message ids.
+ */
+public interface MessageStore
+{
+
+    /**
+     * Called to close and cleanup any resources used by the message store.
+     * @throws Exception
+     */
+    void close() throws Exception;
+
+    /**
+     * Stores a message.
+     * @param msg the message to store
+     * @throws AMQException if the message cannot be stored
+     */
+    void put(AMQMessage msg) throws AMQException;
+
+    /**
+     * Removes a message from the store.
+     * @param messageId id of the message to remove
+     * @throws AMQException if the message cannot be removed
+     */
+    void removeMessage(long messageId) throws AMQException;
+
+    /**
+     * NOTE(review): presumably records the queue so that createQueues() can recreate it - confirm
+     * against a persistent implementation.
+     * @param queue the queue being created
+     * @throws AMQException
+     */
+    void createQueue(AMQQueue queue) throws AMQException;
+
+    /**
+     * NOTE(review): presumably removes the stored record of the named queue - confirm.
+     * @param name name of the queue
+     * @throws AMQException
+     */
+    void removeQueue(String name) throws AMQException;
+
+    /**
+     * NOTE(review): presumably records that the message has been placed on the named queue - confirm.
+     * @param name name of the queue
+     * @param messageId id of the message
+     * @throws AMQException
+     */
+    void enqueueMessage(String name, long messageId) throws AMQException;
+
+    /**
+     * NOTE(review): presumably records that the message has been taken off the named queue - confirm.
+     * @param name name of the queue
+     * @param messageId id of the message
+     * @throws AMQException
+     */
+    void dequeueMessage(String name, long messageId) throws AMQException;
+
+    /**
+     * Begins a store transaction. TxnBuffer.commit() calls this before preparing its operations.
+     * @throws AMQException
+     */
+    void beginTran() throws AMQException;
+
+    /**
+     * Commits the store transaction. TxnBuffer.commit() calls this once all of its operations
+     * have prepared successfully.
+     * @throws AMQException
+     */
+    void commitTran() throws AMQException;
+
+    /**
+     * Aborts the store transaction. TxnBuffer.commit() calls this when an operation fails to prepare.
+     * @throws AMQException
+     */
+    void abortTran() throws AMQException;
+
+    /**
+     * @return presumably true while a transaction begun with beginTran() is still open - TODO confirm
+     */
+    boolean inTran();
+
+    /**
+     * Recreate all queues that were persisted, including re-enqueuing of existing messages
+     * @return the recreated queues
+     * @throws AMQException
+     */
+    List<AMQQueue> createQueues() throws AMQException;
+
+    /**
+     * Return a valid, currently unused message id.
+     * @return a message id
+     */
+    long getNewMessageId();
+}
+
+

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,120 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Holds a list of TxnOp instances representing transactional
+ * operations, and drives them through prepare/commit or rollback.
+ *
+ * The list of operations is a plain ArrayList and no synchronisation is performed here.
+ */
+public class TxnBuffer
+{
+    private boolean _containsPersistentChanges = false;
+    private final MessageStore _store;
+    private final List<TxnOp> _ops = new ArrayList<TxnOp>();
+    private static final Logger _log = Logger.getLogger(TxnBuffer.class);
+
+    /**
+     * @param store the store whose transaction brackets a commit that contains persistent changes
+     */
+    public TxnBuffer(MessageStore store)
+    {
+        _store = store;
+    }
+
+    /**
+     * Marks this buffer as containing persistent changes, so that commit() wraps the
+     * operations in a store transaction. The flag is never reset.
+     */
+    public void containsPersistentChanges()
+    {
+        _containsPersistentChanges = true;
+    }
+
+    /**
+     * Prepares every enlisted operation and, if all of them prepare successfully, commits them.
+     * When the buffer contains persistent changes the work is bracketed by a store transaction,
+     * which is committed on success and aborted on failure.
+     *
+     * A prepare failure is not reported to the caller: the operations already prepared are
+     * compensated, the failure is logged, and the buffer is emptied. If a store call throws,
+     * the exception propagates and the enlisted operations are left in place.
+     *
+     * @throws AMQException if the store fails to begin, commit or abort its transaction
+     */
+    public void commit() throws AMQException
+    {
+        if (_containsPersistentChanges)
+        {
+            _log.debug("Begin Transaction.");
+            _store.beginTran();
+            if (prepare())
+            {
+                _log.debug("Transaction Succeeded");
+                _store.commitTran();
+                commitOps();
+            }
+            else
+            {
+                _log.debug("Transaction Failed");
+                _store.abortTran();
+            }
+        }
+        else if (prepare())
+        {
+            commitOps();
+        }
+        _ops.clear();
+    }
+
+    /**
+     * Invokes commit() on every enlisted operation, in enlistment order.
+     */
+    private void commitOps()
+    {
+        for (TxnOp op : _ops)
+        {
+            op.commit();
+        }
+    }
+
+    /**
+     * Prepares the enlisted operations in order. If one of them throws, the failure is logged and
+     * undoPrepare() is called on each operation that had already been prepared.
+     *
+     * @return true if every operation prepared successfully, false otherwise
+     */
+    private boolean prepare()
+    {
+        for (int i = 0; i < _ops.size(); i++)
+        {
+            TxnOp op = _ops.get(i);
+            try
+            {
+                op.prepare();
+            }
+            catch (Exception e)
+            {
+                // The caller only sees a boolean, so record the cause here rather than losing it.
+                _log.error("Failed to prepare transactional operation; undoing previously prepared operations", e);
+                //compensate previously prepared ops
+                for (int j = 0; j < i; j++)
+                {
+                    _ops.get(j).undoPrepare();
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Rolls back every enlisted operation, in enlistment order, and empties the buffer.
+     */
+    public void rollback() throws AMQException
+    {
+        for (TxnOp op : _ops)
+        {
+            op.rollback();
+        }
+        _ops.clear();
+    }
+
+    /**
+     * Adds an operation to the end of the buffer.
+     * @param op the operation to enlist
+     */
+    public void enlist(TxnOp op)
+    {
+        _ops.add(op);
+    }
+
+    /**
+     * Removes a previously enlisted operation; does nothing if it is not in the buffer.
+     * @param op the operation to cancel
+     */
+    public void cancel(TxnOp op)
+    {
+        _ops.remove(op);
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnOp.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnOp.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnOp.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/txn/TxnOp.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * This provides the abstraction of an individual operation within a
+ * transaction. It is used by the TxnBuffer class, which calls prepare()
+ * on every operation before calling commit() on any of them.
+ */
+public interface TxnOp
+{
+    /**
+     * Do the part of the operation that updates persistent state.
+     *
+     * @throws AMQException if the operation cannot be prepared
+     */
+    public void prepare() throws AMQException;
+    /**
+     * Complete the operation started by prepare. Can now update in
+     * memory state or make network transfers.
+     */
+    public void commit();
+    /**
+     * This is not the same as rollback. Unfortunately the use of an
+     * in memory reference count as a locking mechanism and a test for
+     * whether a message should be deleted means that as things are,
+     * handling an acknowledgement unavoidably alters both memory and
+     * persistent state on prepare. This is needed to 'compensate' or
+     * undo the in-memory change if the persistent update of later ops
+     * fails.
+     */
+    public void undoPrepare();
+    /**
+     * Rolls back the operation.
+     */
+    public void rollback();
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * A ConcurrentLinkedQueue whose size() does not count the elements: it only reports
+ * whether the queue is empty (0) or not (1).
+ */
+public class ConcurrentLinkedQueueNoSize<E> extends ConcurrentLinkedQueue<E>
+{
+    /**
+     * @return 0 if the queue is empty, otherwise 1 regardless of how many elements it holds
+     */
+    public int size()
+    {
+        return isEmpty() ? 0 : 1;
+    }
+}