You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/08/14 19:15:08 UTC

svn commit: r1157566 [21/23] - in /qpid: branches/rg-amqp-1-0-sandbox/qpid/java/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/...

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,303 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
+import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
+import org.apache.qpid.server.message.MessageMetaData_1_0;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class ReceivingLink_1_0 implements ReceivingLinkListener, Link_1_0, DeliveryStateHandler
+{
+    private VirtualHost _vhost;
+
+    private ReceivingDestination _destination;
+    private SectionDecoderImpl _sectionDecoder;
+    private volatile ReceivingLinkAttachment _attachment;
+
+
+    private ArrayList<Transfer> _incompleteMessage;
+    private TerminusDurability _durability;
+
+    private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
+    private boolean _resumedMessage;
+    private Binary _messageDeliveryTag;
+    private ReceiverSettleMode _receivingSettlementMode;
+
+
+    public ReceivingLink_1_0(ReceivingLinkAttachment receivingLinkAttachment, VirtualHost vhost,
+                             ReceivingDestination destination)
+    {
+        _vhost = vhost;
+        _destination = destination;
+        _attachment = receivingLinkAttachment;
+        receivingLinkAttachment.setDeliveryStateHandler(this);
+
+        _durability = ((Target)receivingLinkAttachment.getTarget()).getDurable();
+
+        _sectionDecoder = new SectionDecoderImpl(receivingLinkAttachment.getEndpoint().getSession().getConnection().getDescribedTypeRegistry());
+
+
+    }
+
+    public void messageTransfer(Transfer xfr)
+    {
+        // TODO - cope with fragmented messages
+
+        List<ByteBuffer> fragments = null;
+
+
+
+        if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
+        {
+            _incompleteMessage = new ArrayList<Transfer>();
+            _incompleteMessage.add(xfr);
+            _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
+            _messageDeliveryTag = xfr.getDeliveryTag();
+            return;
+        }
+        else if(_incompleteMessage != null)
+        {
+            _incompleteMessage.add(xfr);
+
+            if(Boolean.TRUE.equals(xfr.getMore()))
+            {
+                return;
+            }
+
+            fragments = new ArrayList<ByteBuffer>(_incompleteMessage.size());
+            for(Transfer t : _incompleteMessage)
+            {
+                fragments.add(t.getPayload());
+            }
+            _incompleteMessage=null;
+
+        }
+        else
+        {
+            _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
+            _messageDeliveryTag = xfr.getDeliveryTag();
+            fragments = Collections.singletonList(xfr.getPayload());
+        }
+
+        if(_resumedMessage)
+        {
+            if(_unsettledMap.containsKey(_messageDeliveryTag))
+            {
+                Outcome outcome = _unsettledMap.get(_messageDeliveryTag);
+                boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
+                getEndpoint().updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
+                if(settled)
+                {
+                    _unsettledMap.remove(_messageDeliveryTag);
+                }
+            }
+            else
+            {
+                System.err.println("UNEXPECTED!!");
+                System.err.println("Delivery Tag: " + _messageDeliveryTag);
+                System.err.println("_unsettledMap: " + _unsettledMap);
+
+            }
+        }
+        else
+        {
+            MessageMetaData_1_0 mmd = null;
+            List<ByteBuffer> immutableSections = new ArrayList<ByteBuffer>(3);
+            mmd = new MessageMetaData_1_0(fragments.toArray(new ByteBuffer[fragments.size()]),
+                    _sectionDecoder,
+                    immutableSections);
+
+            StoredMessage<MessageMetaData_1_0> storedMessage = _vhost.getMessageStore().addMessage(mmd);
+
+            boolean skipping = true;
+            int offset = 0;
+
+            for(ByteBuffer bareMessageBuf : immutableSections)
+            {
+                storedMessage.addContent(offset, bareMessageBuf.duplicate());
+                offset += bareMessageBuf.remaining();
+            }
+
+            storedMessage.flushToStore();
+
+            Message_1_0 message = new Message_1_0(storedMessage, fragments);
+
+
+            Binary transactionId = null;
+            org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState();
+            if(xfrState != null)
+            {
+                if(xfrState instanceof TransactionalState)
+                {
+                    transactionId = ((TransactionalState)xfrState).getTxnId();
+                }
+            }
+
+            ServerTransaction transaction = null;
+            if(transactionId != null)
+            {
+                transaction = getSession().getTransaction(transactionId);
+            }
+            else
+            {
+                Session_1_0 session = getSession();
+                transaction = session != null ? session.getTransaction(null) : new AutoCommitTransaction(_vhost.getTransactionLog());
+            }
+
+            Outcome outcome = _destination.send(message, transaction);
+
+            DeliveryState resultantState;
+
+            if(transactionId == null)
+            {
+                resultantState = (DeliveryState) outcome;
+            }
+            else
+            {
+                TransactionalState transactionalState = new TransactionalState();
+                transactionalState.setOutcome(outcome);
+                transactionalState.setTxnId(transactionId);
+                resultantState = transactionalState;
+
+            }
+
+
+            boolean settled = transaction instanceof AutoCommitTransaction && ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
+
+            final Binary deliveryTag = xfr.getDeliveryTag();
+
+            if(!settled)
+            {
+                _unsettledMap.put(deliveryTag, outcome);
+            }
+
+            getEndpoint().updateDisposition(deliveryTag, resultantState, settled);
+
+            if(!(transaction instanceof AutoCommitTransaction))
+            {
+                ServerTransaction.Action a;
+                transaction.addPostTransactionAction(new ServerTransaction.Action()
+                {
+                    public void postCommit()
+                    {
+                        getEndpoint().updateDisposition(deliveryTag, null, true);
+                    }
+
+                    public void onRollback()
+                    {
+                        getEndpoint().updateDisposition(deliveryTag, null, true);
+                    }
+                });
+            }
+        }
+    }
+
+    private ReceiverSettleMode getReceivingSettlementMode()
+    {
+        return _receivingSettlementMode;
+    }
+
+    public void remoteDetached(LinkEndpoint endpoint, Detach detach)
+    {
+        //TODO
+        // if not durable or close
+        if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
+           (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+        {
+            endpoint.detach();
+        }
+        else if(detach == null || detach.getError() != null)
+        {
+            _attachment = null;
+        }
+    }
+
+    public void start()
+    {
+        getEndpoint().setLinkCredit(UnsignedInteger.valueOf(_destination.getCredit()));
+        getEndpoint().setCreditWindow();
+    }
+
+    public ReceivingLinkEndpoint getEndpoint()
+    {
+        return _attachment.getEndpoint();
+    }
+
+
+    public Session_1_0 getSession()
+    {
+        ReceivingLinkAttachment attachment = _attachment;
+        return attachment == null ? null : attachment.getSession();
+    }
+
+    public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+    {
+        if(Boolean.TRUE.equals(settled))
+        {
+            _unsettledMap.remove(deliveryTag);
+        }
+    }
+
+    public void setLinkAttachment(ReceivingLinkAttachment linkAttachment)
+    {
+        _attachment = linkAttachment;
+        _receivingSettlementMode = linkAttachment.getEndpoint().getReceivingSettlementMode();
+        ReceivingLinkEndpoint endpoint = linkAttachment.getEndpoint();
+        Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
+
+        Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap);
+        for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
+        {
+            Binary deliveryTag = entry.getKey();
+            if(!initialUnsettledMap.containsKey(deliveryTag))
+            {
+                _unsettledMap.remove(deliveryTag);
+            }
+        }
+
+    }
+
+    public Map getUnsettledOutcomeMap()
+    {
+        return _unsettledMap;
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingDestination.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+
+public interface SendingDestination extends Destination
+{
+
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,44 @@
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Source;
+
+public class SendingLinkAttachment
+{
+    private final Session_1_0         _session;
+    private final SendingLinkEndpoint _endpoint;
+
+    public SendingLinkAttachment(final Session_1_0 session, final SendingLinkEndpoint endpoint)
+    {
+        _session = session;
+        _endpoint = endpoint;
+    }
+
+    public Session_1_0 getSession()
+    {
+        return _session;
+    }
+
+    public SendingLinkEndpoint getEndpoint()
+    {
+        return _endpoint;
+    }
+
+    public Source getSource()
+    {
+        return getEndpoint().getSource();
+    }
+
+    public void setDeliveryStateHandler(final DeliveryStateHandler handler)
+    {
+        getEndpoint().setDeliveryStateHandler(handler);
+    }
+
+    public void updateDisposition(final Binary deliveryTag, final DeliveryState state, final boolean settled)
+    {
+        getEndpoint().updateDisposition(deliveryTag, state, settled);
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,367 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
+{
+    private VirtualHost _vhost;
+    private SendingDestination _destination;
+
+    private Subscription_1_0 _subscription;
+    private boolean _draining;
+    private final Map<Binary, QueueEntry> _unsettledMap =
+            new HashMap<Binary, QueueEntry>();
+
+    private final ConcurrentHashMap<Binary, UnsettledAction> _unsettledActionMap =
+            new ConcurrentHashMap<Binary, UnsettledAction>();
+    private volatile SendingLinkAttachment _linkAttachment;
+    private TerminusDurability _durability;
+    private List<QueueEntry> _resumeFullTransfers = new ArrayList<QueueEntry>();
+    private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
+    private Runnable _closeAction;
+
+    public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
+                           final VirtualHost vhost,
+                           final SendingDestination destination)
+    {
+        _vhost = vhost;
+        _destination = destination;
+        _linkAttachment = linkAttachment;
+        _durability = ((Source)linkAttachment.getSource()).getDurable();
+        linkAttachment.setDeliveryStateHandler(this);
+
+        if(destination instanceof QueueDestination)
+        {
+            AMQQueue queue = ((QueueDestination) _destination).getQueue();
+            if(queue.getArguments().containsKey("topic"))
+            {
+                ((Source)linkAttachment.getSource()).setDistributionMode(StdDistMode.COPY);
+            }
+            _subscription = new Subscription_1_0(this, (QueueDestination)destination);
+            try
+            {
+
+                queue.registerSubscription(_subscription, false);
+            }
+            catch (AMQException e)
+            {
+                e.printStackTrace();  //TODO
+            }
+        }
+
+    }
+
+    public void resume(SendingLinkAttachment linkAttachment)
+    {
+        _linkAttachment = linkAttachment;
+
+    }
+
+    public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+    {
+        //TODO
+        // if not durable or close
+        if(!TerminusDurability.UNSETTLED_STATE.equals(_durability) ||
+           (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+        {
+
+            try
+            {
+                ((QueueDestination)_destination).getQueue().unregisterSubscription(_subscription);
+            }
+            catch (AMQException e)
+            {
+                e.printStackTrace();  //TODO
+            }
+
+            DeliveryState state = new Released();
+
+            for(UnsettledAction action : _unsettledActionMap.values())
+            {
+
+                action.process(state,Boolean.TRUE);
+            }
+            _unsettledActionMap.clear();
+
+            endpoint.detach();
+            if(_closeAction != null)
+            {
+                _closeAction.run();
+            }
+        }
+        else if(detach == null || detach.getError() != null)
+        {
+            _linkAttachment = null;
+            _subscription.flowStateChanged();
+        }
+    }
+
+    public void start()
+    {
+        //TODO
+    }
+
+    public SendingLinkEndpoint getEndpoint()
+    {
+        return _linkAttachment == null ? null : _linkAttachment.getEndpoint() ;
+    }
+
+    public void flowStateChanged()
+    {
+        if(Boolean.TRUE.equals(getEndpoint().getDrain())
+                && getEndpoint().getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0)
+        {
+            _draining = true;
+        }
+
+        while(!_resumeAcceptedTransfers.isEmpty() && getEndpoint().hasCreditToSend())
+        {
+            Accepted accepted = new Accepted();
+            synchronized(getLock())
+            {
+
+                Transfer xfr = new Transfer();
+                Binary dt = _resumeAcceptedTransfers.remove(0);
+                xfr.setDeliveryTag(dt);
+                xfr.setState(accepted);
+                xfr.setResume(Boolean.TRUE);
+                getEndpoint().transfer(xfr);
+            }
+
+        }
+        if(_resumeAcceptedTransfers.isEmpty())
+        {
+            _subscription.flowStateChanged();
+        }
+
+    }
+
+    public boolean isDraining()
+    {
+        return false;  //TODO
+    }
+
+    public boolean drained()
+    {
+        if(getEndpoint() != null)
+        {
+            synchronized(getEndpoint().getLock())
+            {
+                if(_draining)
+                {
+                    //TODO
+                    getEndpoint().drained();
+                    _draining = false;
+                    return true;
+                }
+                else
+                {
+                    return false;
+                }
+            }
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    public void addUnsettled(Binary tag, UnsettledAction unsettledAction, QueueEntry queueEntry)
+    {
+        _unsettledActionMap.put(tag,unsettledAction);
+        if(getTransactionId() == null)
+        {
+            _unsettledMap.put(tag, queueEntry);
+        }
+    }
+
+    public void removeUnsettled(Binary tag)
+    {
+        _unsettledActionMap.remove(tag);
+    }
+
+    public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+    {
+        UnsettledAction action = _unsettledActionMap.get(deliveryTag);
+        boolean localSettle = false;
+        if(action != null)
+        {
+            localSettle = action.process(state, settled);
+            if(localSettle && !Boolean.TRUE.equals(settled))
+            {
+                _linkAttachment.updateDisposition(deliveryTag, state, true);
+            }
+        }
+        if(Boolean.TRUE.equals(settled) || localSettle)
+        {
+            _unsettledActionMap.remove(deliveryTag);
+            _unsettledMap.remove(deliveryTag);
+        }
+    }
+
+    ServerTransaction getTransaction(Binary transactionId)
+    {
+        return _linkAttachment.getSession().getTransaction(transactionId);
+    }
+
+    public Binary getTransactionId()
+    {
+        return getEndpoint().getTransactionId();
+    }
+
+    public synchronized Object getLock()
+    {
+        return _linkAttachment == null ? this : getEndpoint().getLock();
+    }
+
+    public boolean isDetached()
+    {
+        return _linkAttachment == null || getEndpoint().isDetached();
+    }
+
+    public boolean isAttached()
+    {
+        return _linkAttachment != null && getEndpoint().isAttached();
+    }
+
+    public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment)
+    {
+
+        if(_subscription.isActive())
+        {
+            _subscription.suspend();
+        }
+
+        _linkAttachment = linkAttachment;
+
+        SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
+        endpoint.setDeliveryStateHandler(this);
+        Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
+        Map<Binary, QueueEntry> unsettledCopy = new HashMap<Binary, QueueEntry>(_unsettledMap);
+        _resumeAcceptedTransfers.clear();
+        _resumeFullTransfers.clear();
+        for(Map.Entry<Binary, QueueEntry> entry : unsettledCopy.entrySet())
+        {
+            Binary deliveryTag = entry.getKey();
+            final QueueEntry queueEntry = entry.getValue();
+            if(initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
+            {
+                queueEntry.setRedelivered();
+                queueEntry.release();
+                _unsettledMap.remove(deliveryTag);
+            }
+            else if(initialUnsettledMap != null && (initialUnsettledMap.get(deliveryTag) instanceof Outcome))
+            {
+                Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
+
+                if(outcome instanceof Accepted)
+                {
+                    AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getTransactionLog());
+                    if(_subscription.acquires())
+                    {
+                        txn.dequeue(Collections.singleton(queueEntry),
+                                new ServerTransaction.Action()
+                                {
+                                    public void postCommit()
+                                    {
+                                        queueEntry.discard();
+                                    }
+
+                                    public void onRollback()
+                                    {
+                                        //To change body of implemented methods use File | Settings | File Templates.
+                                    }
+                                });
+                    }
+                }
+                else if(outcome instanceof Released)
+                {
+                    AutoCommitTransaction txn = new AutoCommitTransaction(_vhost.getTransactionLog());
+                    if(_subscription.acquires())
+                    {
+                        txn.dequeue(Collections.singleton(queueEntry),
+                                new ServerTransaction.Action()
+                                {
+                                    public void postCommit()
+                                    {
+                                        queueEntry.release();
+                                    }
+
+                                    public void onRollback()
+                                    {
+                                        //To change body of implemented methods use File | Settings | File Templates.
+                                    }
+                                });
+                    }
+                }
+                //_unsettledMap.remove(deliveryTag);
+                initialUnsettledMap.remove(deliveryTag);
+                _resumeAcceptedTransfers.add(deliveryTag);
+            }
+            else
+            {
+                _resumeFullTransfers.add(queueEntry);
+                // exists in receivers map, but not yet got an outcome ... should resend with resume = true
+            }
+            // TODO - else
+        }
+
+    }
+
+    public Map getUnsettledOutcomeMap()
+    {
+        Map<Binary, QueueEntry> unsettled = new HashMap<Binary, QueueEntry>(_unsettledMap);
+
+        for(Map.Entry<Binary, QueueEntry> entry : unsettled.entrySet())
+        {
+            entry.setValue(null);
+        }
+
+        return unsettled;
+    }
+
+    public void setCloseAction(Runnable action)
+    {
+        _closeAction = action;
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,406 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.LifetimePolicy;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.transaction.Coordinator;
+import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.*;
+
+public class Session_1_0 implements SessionEventListener
+{
+    private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
+    private IApplicationRegistry _appRegistry;
+    private VirtualHost _vhost;
+    private AutoCommitTransaction _transaction;
+
+    private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
+            new LinkedHashMap<Integer, ServerTransaction>();
+    private final Connection_1_0 _connection;
+
+
+    public Session_1_0(VirtualHost vhost, IApplicationRegistry appRegistry, final Connection_1_0 connection)
+    {
+        _appRegistry = appRegistry;
+        _vhost = vhost;
+        _transaction = new AutoCommitTransaction(vhost.getMessageStore());
+        _connection = connection;
+
+    }
+
+    public void remoteLinkCreation(final LinkEndpoint endpoint)
+    {
+
+
+        Destination destination;
+        Link_1_0 link = null;
+        Error error = null;
+
+        final LinkRegistry linkRegistry = _vhost.getLinkRegistry(endpoint.getSession().getConnection().getRemoteContainerId());
+
+
+        if(endpoint.getRole() == Role.SENDER)
+        {
+
+            SendingLink_1_0 previousLink = linkRegistry.getDurableSendingLink(endpoint.getName());
+
+            if(previousLink == null)
+            {
+
+                Target target = (Target) endpoint.getTarget();
+                Source source = (Source) endpoint.getSource();
+
+
+                if(source != null)
+                {
+                    if(Boolean.TRUE.equals(source.getDynamic()))
+                    {
+                        AMQQueue tempQueue = createTemporaryQueue(source.getDynamicNodeProperties());
+                        source.setAddress(tempQueue.getName());
+                    }
+                    String addr = source.getAddress();
+                    AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
+                    if(queue != null)
+                    {
+
+                        destination = new QueueDestination(queue);
+                    }
+                    else
+                    {
+                        endpoint.setSource(null);
+                        destination = null;
+                    }
+
+                }
+                else
+                {
+                    destination = null;
+                }
+
+                if(destination != null)
+                {
+                    final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
+                    final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
+                                                                            _vhost,
+                                                                            (SendingDestination) destination
+                    );
+                    sendingLinkEndpoint.setLinkEventListener(sendingLink);
+                    link = sendingLink;
+                    if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
+                    {
+                        linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
+                        sendingLink.setCloseAction(new Runnable() {
+
+                            public void run()
+                            {
+                                linkRegistry.unregisterSendingLink(endpoint.getName());
+                            }
+                        });
+                    }
+                }
+            }
+            else
+            {
+                SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
+                previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
+                sendingLinkEndpoint.setLinkEventListener(previousLink);
+                link = previousLink;
+                endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+            }
+        }
+        else
+        {
+            if(endpoint.getTarget() instanceof Coordinator)
+            {
+                Coordinator coordinator = (Coordinator) endpoint.getTarget();
+                TxnCapability[] capabilities = coordinator.getCapabilities();
+                boolean localTxn = false;
+                boolean multiplePerSession = false;
+                if(capabilities != null)
+                {
+                    for(TxnCapability capability : capabilities)
+                    {
+                        if(capability.equals(TxnCapability.LOCAL_TXN))
+                        {
+                            localTxn = true;
+                        }
+                        else if(capability.equals(TxnCapability.MULTI_TXNS_PER_SSN))
+                        {
+                            multiplePerSession = true;
+                        }
+                        else
+                        {
+                            error = new Error();
+                            error.setCondition(AmqpError.NOT_IMPLEMENTED);
+                            error.setDescription("Unsupported capability: " + capability);
+                            break;
+                        }
+                    }
+                }
+
+       /*         if(!localTxn)
+                {
+                    capabilities.add(TxnCapabilities.LOCAL_TXN);
+                }*/
+
+                final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+                final TxnCoordinatorLink_1_0 coordinatorLink =
+                        new TxnCoordinatorLink_1_0(_vhost, this, receivingLinkEndpoint, _openTransactions);
+                receivingLinkEndpoint.setLinkEventListener(coordinatorLink);
+                link = coordinatorLink;
+
+
+            }
+            else
+            {
+
+                ReceivingLink_1_0 previousLink = linkRegistry.getDurableReceivingLink(endpoint.getName());
+
+                if(previousLink == null)
+                {
+
+                    Target target = (Target) endpoint.getTarget();
+
+                    if(target != null)
+                    {
+                        if(Boolean.TRUE.equals(target.getDynamic()))
+                        {
+
+                            AMQQueue tempQueue = createTemporaryQueue(target.getDynamicNodeProperties());
+                            target.setAddress(tempQueue.getName());
+                        }
+
+                        String addr = target.getAddress();
+                        Exchange exchg = _vhost.getExchangeRegistry().getExchange(addr);
+                        if(exchg != null)
+                        {
+                            destination = new ExchangeDestination(exchg);
+                        }
+                        else
+                        {
+                            AMQQueue queue = _vhost.getQueueRegistry().getQueue(addr);
+                            if(queue != null)
+                            {
+
+                                destination = new QueueDestination(queue);
+                            }
+                            else
+                            {
+                                endpoint.setTarget(null);
+                                destination = null;
+                            }
+
+                        }
+
+
+                    }
+                    else
+                    {
+                        destination = null;
+                    }
+                    if(destination != null)
+                    {
+                        final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+                        final ReceivingLink_1_0 receivingLink = new ReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint), _vhost,
+                                (ReceivingDestination) destination);
+                        receivingLinkEndpoint.setLinkEventListener(receivingLink);
+                        link = receivingLink;
+                        if(TerminusDurability.UNSETTLED_STATE.equals(target.getDurable()))
+                        {
+                            linkRegistry.registerReceivingLink(endpoint.getName(), receivingLink);
+                        }
+                    }
+                }
+                else
+                {
+                    ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
+                    previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
+                    receivingLinkEndpoint.setLinkEventListener(previousLink);
+                    link = previousLink;
+                    endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+
+                }
+            }
+        }
+
+        endpoint.attach();
+
+        if(link == null)
+        {
+            endpoint.detach(error);
+        }
+        else
+        {
+            link.start();
+        }
+    }
+
+
+    private AMQQueue createTemporaryQueue(Map properties)
+    {
+        final String queueName = UUID.randomUUID().toString();
+        AMQQueue queue = null;
+        try
+        {
+            LifetimePolicy lifetimePolicy = properties == null
+                                            ? null
+                                            : (LifetimePolicy) properties.get(LIFETIME_POLICY);
+
+            final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl(queueName,
+                                                                                   false, // durable
+                                                                                   null, // owner
+                                                                                   false, // autodelete
+                                                                                   false, // exclusive
+                                                                                   _vhost,
+                                                                                   properties);
+
+
+
+            if (lifetimePolicy == null || lifetimePolicy instanceof DeleteOnClose)
+            {
+                final Connection_1_0.Task deleteQueueTask =
+                        new Connection_1_0.Task()
+                        {
+                            public void doTask(Connection_1_0 session)
+                            {
+                                if (_vhost.getQueueRegistry().getQueue(queueName) == tempQueue)
+                                {
+                                    try
+                                    {
+                                        tempQueue.delete();
+                                    }
+                                    catch (AMQException e)
+                                    {
+                                        e.printStackTrace();  //TODO.
+                                    }
+                                }
+                            }
+                        };
+
+                _connection.addConnectionCloseTask(deleteQueueTask);
+
+                queue.addQueueDeleteTask(new AMQQueue.Task()
+                {
+                    public void doTask(AMQQueue queue)
+                    {
+                        _connection.removeConnectionCloseTask(deleteQueueTask);
+                    }
+
+
+                });
+            }
+            else if(lifetimePolicy instanceof DeleteOnNoLinks)
+            {
+
+            }
+            else if(lifetimePolicy instanceof DeleteOnNoMessages)
+            {
+
+            }
+            else if(lifetimePolicy instanceof DeleteOnNoLinksOrMessages)
+            {
+
+            }
+        }
+        catch (AMQSecurityException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+
+        return queue;
+    }
+
+    public ServerTransaction getTransaction(Binary transactionId)
+    {
+        // TODO should treat invalid id differently to null
+        ServerTransaction transaction = _openTransactions.get(binaryToInteger(transactionId));
+        return transaction == null ? _transaction : transaction;
+    }
+
+    public void remoteEnd(End end)
+    {
+        Iterator<Map.Entry<Integer, ServerTransaction>> iter = _openTransactions.entrySet().iterator();
+        while(iter.hasNext())
+        {
+            Map.Entry<Integer, ServerTransaction> entry = iter.next();
+            entry.getValue().rollback();
+            iter.remove();
+        }
+
+    }
+
+    Integer binaryToInteger(final Binary txnId)
+    {
+        if(txnId == null)
+        {
+            return null;
+        }
+
+        if(txnId.getLength() > 4)
+            throw new IllegalArgumentException();
+
+        int id = 0;
+        byte[] data = txnId.getArray();
+        for(int i = 0; i < txnId.getLength(); i++)
+        {
+            id <<= 8;
+            id += data[i+txnId.getArrayOffset()];
+        }
+
+        return id;
+
+    }
+
+    Binary integerToBinary(final int txnId)
+    {
+        byte[] data = new byte[4];
+        data[3] = (byte) (txnId & 0xff);
+        data[2] = (byte) ((txnId & 0xff00) >> 8);
+        data[1] = (byte) ((txnId & 0xff0000) >> 16);
+        data[0] = (byte) ((txnId & 0xff000000) >> 24);
+        return new Binary(data);
+
+    }
+
+    public void forceEnd()
+    {
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,487 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.Outcome;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.Released;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.txn.ServerTransaction;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+class Subscription_1_0 implements Subscription
+{
+    private SendingLink_1_0 _link;
+
+    private AMQQueue _queue;
+
+    private final AtomicReference<State> _state = new AtomicReference<State>(State.SUSPENDED);
+
+    private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
+    private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
+    private final long _id;
+    private final boolean _acquires;
+    private AMQQueue.Context _queueContext;
+    private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
+    private ReentrantLock _stateChangeLock = new ReentrantLock();
+
+    private long _deliveryTag = 0L;
+    private StateListener _stateListener;
+
+    private Binary _transactionId;
+
+    public Subscription_1_0(final SendingLink_1_0 link, final QueueDestination destination)
+    {
+        _link = link;
+        _queue = destination.getQueue();
+        _id = getEndpoint().getLocalHandle().longValue();
+        _acquires = !StdDistMode.COPY.equals(((Source) (getEndpoint().getSource())).getDistributionMode());
+    }
+
+    private SendingLinkEndpoint getEndpoint()
+    {
+        return _link.getEndpoint();
+    }
+
+    public LogActor getLogActor()
+    {
+        return null;  //TODO
+    }
+
+    public boolean isTransient()
+    {
+        return true;  //TODO
+    }
+
+    public AMQQueue getQueue()
+    {
+        return _queue;
+    }
+
+    public QueueEntry.SubscriptionAcquiredState getOwningState()
+    {
+        return _owningState;
+    }
+
+    public QueueEntry.SubscriptionAssignedState getAssignedState()
+    {
+        return _assignedState;
+    }
+
+    public void setQueue(final AMQQueue queue, final boolean exclusive)
+    {
+        //TODO
+    }
+
+    public void setNoLocal(final boolean noLocal)
+    {
+        //TODO
+    }
+
+    public long getSubscriptionID()
+    {
+        return _id;
+    }
+
+    public boolean isSuspended()
+    {
+        final boolean isSuspended = !isActive();// || !getEndpoint().hasCreditToSend();
+        return isSuspended;
+    }
+
+    public boolean hasInterest(final QueueEntry msg)
+    {
+        return true;  //TODO - filters
+    }
+
+    public boolean isClosed()
+    {
+        return !getEndpoint().isAttached();
+    }
+
+    public boolean acquires()
+    {
+        return _acquires;
+    }
+
+    public boolean seesRequeues()
+    {
+        // TODO
+        return acquires();
+    }
+
+    public void close()
+    {
+        getEndpoint().detach();
+    }
+
+    public void send(final QueueEntry queueEntry) throws AMQException
+    {
+        //TODO
+        ServerMessage serverMessage = queueEntry.getMessage();
+        if(serverMessage instanceof Message_1_0)
+        {
+            Message_1_0 message = (Message_1_0) serverMessage;
+            Transfer transfer = new Transfer();
+            //TODO
+
+
+            List<ByteBuffer> fragments = message.getFragments();
+            ByteBuffer payload;
+            if(fragments.size() == 1)
+            {
+                payload = fragments.get(0);
+            }
+            else
+            {
+                int size = 0;
+                for(ByteBuffer fragment : fragments)
+                {
+                    size += fragment.remaining();
+                }
+
+                payload = ByteBuffer.allocate(size);
+
+                for(ByteBuffer fragment : fragments)
+                {
+                    payload.put(fragment.duplicate());
+                }
+
+                payload.flip();
+            }
+
+            transfer.setPayload(payload);
+            byte[] data = new byte[8];
+            ByteBuffer.wrap(data).putLong(_deliveryTag++);
+            final Binary tag = new Binary(data);
+
+            transfer.setDeliveryTag(tag);
+
+            synchronized(_link.getLock())
+            {
+                if(_link.isAttached())
+                {
+                    if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
+                    {
+                        transfer.setSettled(true);
+                    }
+                    else
+                    {
+                        UnsettledAction action = _acquires
+                                                 ? new DispositionAction(tag, queueEntry)
+                                                 : new DoNothingAction(tag, queueEntry);
+
+                        _link.addUnsettled(tag, action, queueEntry);
+                    }
+
+                    if(_transactionId != null)
+                    {
+                        TransactionalState state = new TransactionalState();
+                        state.setTxnId(_transactionId);
+                        transfer.setState(state);
+                    }
+                    // TODO - need to deal with failure here
+                    if(_acquires && _transactionId != null)
+                    {
+                        ServerTransaction txn = _link.getTransaction(_transactionId);
+                        if(txn != null)
+                        {
+                            txn.addPostTransactionAction(new ServerTransaction.Action(){
+
+                                public void postCommit()
+                                {
+                                    //To change body of implemented methods use File | Settings | File Templates.
+                                }
+
+                                public void onRollback()
+                                {
+                                    if(queueEntry.isAcquiredBy(Subscription_1_0.this))
+                                    {
+                                        queueEntry.release();
+                                        _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
+
+
+                                    }
+                                }
+                            });
+                        }
+
+                    }
+
+                    getEndpoint().transfer(transfer);
+                }
+                else
+                {
+                    queueEntry.release();
+                }
+            }
+        }
+
+    }
+
+    public void queueDeleted(final AMQQueue queue)
+    {
+        //TODO
+        getEndpoint().setSource(null);
+        getEndpoint().detach();
+    }
+
+    public boolean wouldSuspend(final QueueEntry msg)
+    {
+        final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
+        if(!hasCredit && getState() == State.ACTIVE)
+        {
+            if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+            {
+                _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+            }
+        }
+
+        return !hasCredit;
+    }
+
+    public void suspend()
+    {
+        if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+        {
+            _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+        }
+    }
+
+    public void getSendLock()
+    {
+        _stateChangeLock.lock();
+    }
+
+    public void releaseSendLock()
+    {
+        _stateChangeLock.unlock();
+    }
+
+
+    public void onDequeue(final QueueEntry queueEntry)
+    {
+        //TODO
+    }
+
+    public void restoreCredit(final QueueEntry queueEntry)
+    {
+        //TODO
+    }
+
+    public void setStateListener(final StateListener listener)
+    {
+        _stateListener = listener;
+    }
+
+    public State getState()
+    {
+        return _state.get();
+    }
+
+    public AMQQueue.Context getQueueContext()
+    {
+        return _queueContext;
+    }
+
+    public void setQueueContext(AMQQueue.Context queueContext)
+    {
+        _queueContext = queueContext;
+    }
+
+
+    public boolean isActive()
+    {
+        return getState() == State.ACTIVE;
+    }
+
+    public void set(String key, Object value)
+    {
+        _properties.put(key, value);
+    }
+
+    public Object get(String key)
+    {
+        return _properties.get(key);
+    }
+
+    public boolean isSessionTransactional()
+    {
+        return false;  //TODO
+    }
+
+    public void queueEmpty()
+    {
+        if(_link.drained())
+        {
+            if(_state.compareAndSet(State.ACTIVE, State.SUSPENDED))
+            {
+                _stateListener.stateChange(this, State.ACTIVE, State.SUSPENDED);
+            }
+        }
+    }
+
+    public void flowStateChanged()
+    {
+        if(isSuspended())
+        {
+            if(_state.compareAndSet(State.SUSPENDED, State.ACTIVE))
+            {
+                _stateListener.stateChange(this, State.SUSPENDED, State.ACTIVE);
+            }
+            _transactionId = _link.getTransactionId();
+        }
+    }
+
+    private class DispositionAction implements UnsettledAction
+    {
+
+        private final QueueEntry _queueEntry;
+        private final Binary _deliveryTag;
+
+        public DispositionAction(Binary tag, QueueEntry queueEntry)
+        {
+            _deliveryTag = tag;
+            _queueEntry = queueEntry;
+        }
+
+        public boolean process(DeliveryState state, Boolean settled)
+        {
+
+            Binary transactionId = null;
+            final Outcome outcome;
+            // If disposition is settled this overrides the txn?
+            if(state instanceof TransactionalState)
+            {
+                transactionId = ((TransactionalState)state).getTxnId();
+                outcome = ((TransactionalState)state).getOutcome();
+            }
+            else if (state instanceof Outcome)
+            {
+                outcome = (Outcome) state;
+            }
+            else
+            {
+                outcome = null;
+            }
+
+
+            ServerTransaction txn = _link.getTransaction(transactionId);
+
+            if(outcome instanceof Accepted)
+            {
+                txn.dequeue(_queueEntry.getQueue(), _queueEntry.getMessage(),
+                        new ServerTransaction.Action()
+                        {
+
+                            public void postCommit()
+                            {
+                                if(_queueEntry.isAcquiredBy(Subscription_1_0.this))
+                                {
+                                    _queueEntry.discard();
+                                }
+                            }
+
+                            public void onRollback()
+                            {
+
+                            }
+                        });
+                txn.addPostTransactionAction(new ServerTransaction.Action()
+                    {
+                        public void postCommit()
+                        {
+                            //_link.getEndpoint().settle(_deliveryTag);
+                            _link.getEndpoint().updateDisposition(_deliveryTag, (DeliveryState)outcome, true);
+                            _link.getEndpoint().sendFlowConditional();
+                        }
+
+                        public void onRollback()
+                        {
+                        }
+                    });
+            }
+            else if(outcome instanceof Released)
+            {
+                txn.addPostTransactionAction(new ServerTransaction.Action()
+                {
+                    public void postCommit()
+                    {
+                        _queueEntry.release();
+                        _link.getEndpoint().settle(_deliveryTag);
+                    }
+
+                    public void onRollback()
+                    {
+                        _link.getEndpoint().settle(_deliveryTag);
+                    }
+                });
+            }
+
+            return (transactionId == null && outcome != null);
+        }
+    }
+
+    private class DoNothingAction implements UnsettledAction
+    {
+        public DoNothingAction(final Binary tag,
+                               final QueueEntry queueEntry)
+        {
+        }
+
+        public boolean process(final DeliveryState state, final Boolean settled)
+        {
+            Binary transactionId = null;
+            Outcome outcome = null;
+            // If disposition is settled this overrides the txn?
+            if(state instanceof TransactionalState)
+            {
+                transactionId = ((TransactionalState)state).getTxnId();
+                outcome = ((TransactionalState)state).getOutcome();
+            }
+            else if (state instanceof Outcome)
+            {
+                outcome = (Outcome) state;
+            }
+            return true;
+        }
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorLink_1_0.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.transaction.Declare;
+import org.apache.qpid.amqp_1_0.type.transaction.Declared;
+import org.apache.qpid.amqp_1_0.type.transaction.Discharge;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class TxnCoordinatorLink_1_0 implements ReceivingLinkListener, Link_1_0
+{
+    private VirtualHost _vhost;
+    private ReceivingLinkEndpoint _endpoint;
+
+    private ArrayList<Transfer> _incompleteMessage;
+    private SectionDecoder _sectionDecoder;
+    private LinkedHashMap<Integer, ServerTransaction> _openTransactions;
+    private Session_1_0 _session;
+
+
+    public TxnCoordinatorLink_1_0(VirtualHost vhost,
+                                  Session_1_0 session_1_0, ReceivingLinkEndpoint endpoint,
+                                  LinkedHashMap<Integer, ServerTransaction> openTransactions)
+    {
+        _vhost = vhost;
+        _session = session_1_0;
+        _endpoint  = endpoint;
+        _sectionDecoder = new SectionDecoderImpl(endpoint.getSession().getConnection().getDescribedTypeRegistry());
+        _openTransactions = openTransactions;
+    }
+
+    public void messageTransfer(Transfer xfr)
+    {
+        // TODO - cope with fragmented messages
+
+        ByteBuffer payload = null;
+
+
+        if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
+        {
+            _incompleteMessage = new ArrayList<Transfer>();
+            _incompleteMessage.add(xfr);
+            return;
+        }
+        else if(_incompleteMessage != null)
+        {
+            _incompleteMessage.add(xfr);
+            if(Boolean.TRUE.equals(xfr.getMore()))
+            {
+                return;
+            }
+
+            int size = 0;
+            for(Transfer t : _incompleteMessage)
+            {
+                size += t.getPayload().limit();
+            }
+            payload = ByteBuffer.allocate(size);
+            for(Transfer t : _incompleteMessage)
+            {
+                payload.put(t.getPayload().duplicate());
+            }
+            payload.flip();
+            _incompleteMessage=null;
+
+        }
+        else
+        {
+            payload = xfr.getPayload();
+        }
+
+
+        // Only interested int he amqp-value section that holds the message to the co-ordinator
+        try
+        {
+            List<Section> sections = _sectionDecoder.parseAll(payload);
+
+            for(Section section : sections)
+            {
+                if(section instanceof AmqpValue)
+                {
+                    Object command = ((AmqpValue) section).getValue();
+
+                    if(command instanceof Declare)
+                    {
+                        Integer txnId = Integer.valueOf(0);
+                        Iterator<Integer> existingTxn  = _openTransactions.keySet().iterator();
+                        while(existingTxn.hasNext())
+                        {
+                            txnId = existingTxn.next();
+                        }
+                        txnId = Integer.valueOf(txnId.intValue() + 1);
+
+                        _openTransactions.put(txnId, new LocalTransaction(_vhost.getTransactionLog()));
+
+                        Declared state = new Declared();
+
+
+
+                        state.setTxnId(_session.integerToBinary(txnId));
+                        _endpoint.updateDisposition(xfr.getDeliveryTag(), state, true);
+
+                    }
+                    else if(command instanceof Discharge)
+                    {
+                        Discharge discharge = (Discharge) command;
+
+                        DeliveryState state = xfr.getState();
+                        discharge(_session.binaryToInteger(discharge.getTxnId()), discharge.getFail());
+                        _endpoint.updateDisposition(xfr.getDeliveryTag(), new Accepted(), true);
+
+                    }
+                }
+            }
+
+        }
+        catch (AmqpErrorException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
+
+    }
+
+    public void remoteDetached(LinkEndpoint endpoint, Detach detach)
+    {
+        //TODO
+        endpoint.detach();
+    }
+
+    private Error discharge(Integer transactionId, boolean fail)
+    {
+        Error error = null;
+        ServerTransaction txn = _openTransactions.get(transactionId);
+        if(txn != null)
+        {
+            if(fail)
+            {
+                txn.rollback();
+            }
+            else
+            {
+                txn.commit();
+            }
+            _openTransactions.remove(transactionId);
+        }
+        else
+        {
+            error = new Error();
+            error.setCondition(AmqpError.NOT_FOUND);
+            error.setDescription("Unkown transactionId" + transactionId);
+        }
+        return error;
+    }
+
+
+
+    public void start()
+    {
+        _endpoint.setLinkCredit(UnsignedInteger.ONE);
+        _endpoint.setCreditWindow();
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,8 @@
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.type.DeliveryState;
+
+public interface UnsettledAction
+{
+    boolean process(DeliveryState state, Boolean settled);
+}

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Sun Aug 14 17:14:51 2011
@@ -245,6 +245,15 @@ public class AMQQueueFactory
             }
         }
 
+        if(config.isTopic())
+        {
+            if(arguments == null)
+            {
+                arguments = new HashMap<String,Object>();
+            }
+            arguments.put("topic", Boolean.TRUE);
+        }
+
         AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments);
         q.configure(config);
         return q;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordFilePrincipalDatabase.java Sun Aug 14 17:14:51 2011
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
 import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 import org.apache.qpid.server.security.auth.sasl.amqplain.AmqPlainInitialiser;
+import org.apache.qpid.server.security.auth.sasl.anonymous.AnonymousInitialiser;
 import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5Initialiser;
 import org.apache.qpid.server.security.auth.sasl.plain.PlainInitialiser;
 
@@ -78,6 +79,13 @@ public class PlainPasswordFilePrincipalD
         AmqPlainInitialiser amqplain = new AmqPlainInitialiser();
         amqplain.initialise(this);
 
+
+
+        // Accept AMQPlain incomming and compare it to the file.
+        AnonymousInitialiser anonymous = new AnonymousInitialiser();
+        anonymous.initialise(this);
+
+
         // Accept Plain incomming and compare it to the file.
         PlainInitialiser plain = new PlainInitialiser();
         plain.initialise(this);
@@ -89,6 +97,7 @@ public class PlainPasswordFilePrincipalD
         _saslServers.put(amqplain.getMechanismName(), amqplain);
         _saslServers.put(plain.getMechanismName(), plain);
         _saslServers.put(cram.getMechanismName(), cram);
+        _saslServers.put(anonymous.getMechanismName(), anonymous);
     }
 
     public void setPasswordFile(String passwordFile) throws IOException

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousInitialiser.java Sun Aug 14 17:14:51 2011
@@ -20,18 +20,53 @@
  */
 package org.apache.qpid.server.security.auth.sasl.anonymous;
 
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.SaslServerFactory;
 
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
+import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
 import org.apache.qpid.server.security.auth.sasl.UsernamePasswordInitialiser;
-import org.apache.qpid.server.security.auth.sasl.amqplain.AmqPlainSaslServerFactory;
 
-public class AnonymousInitialiser extends UsernamePasswordInitialiser
+import java.io.IOException;
+import java.util.Map;
+
+public class AnonymousInitialiser implements AuthenticationProviderInitialiser
 {
     public String getMechanismName()
     {
         return "ANONYMOUS";
     }
 
+    public void initialise(String baseConfigPath, Configuration configuration, Map<String, PrincipalDatabase> principalDatabases) throws Exception
+    {
+    }
+
+    public void initialise(PrincipalDatabase db)
+    {
+    }
+
+    public CallbackHandler getCallbackHandler()
+    {
+        return new CallbackHandler()
+        {
+
+            public Callback[] _callbacks;
+
+            public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
+            {
+                _callbacks =callbacks;
+            }
+        };
+    }
+
+    public Map<String, ?> getProperties()
+    {
+        return null;
+    }
+
     public Class<? extends SaslServerFactory> getServerFactoryClassForJCARegistration()
     {
         return AnonymousSaslServerFactory.class;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java Sun Aug 14 17:14:51 2011
@@ -22,13 +22,16 @@ package org.apache.qpid.server.store;
 
 import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.message.MessageMetaData_1_0;
 
 import java.nio.ByteBuffer;
 
 public enum MessageMetaDataType
 {
     META_DATA_0_8  {   public Factory<MessageMetaData> getFactory() { return MessageMetaData.FACTORY; } },
-    META_DATA_0_10 {   public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } };
+    META_DATA_0_10 {   public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } },
+    META_DATA_1_0 {   public Factory<MessageMetaData_1_0> getFactory() { return MessageMetaData_1_0.FACTORY; } };
+
 
 
     public static interface Factory<M extends StorableMessageMetaData>

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java Sun Aug 14 17:14:51 2011
@@ -30,7 +30,5 @@ public interface StorableMessageMetaData
 
     int writeToBuffer(int offsetInMetaData, ByteBuffer dest);
 
-    int getContentSize();
-
     boolean isPersistent();
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Sun Aug 14 17:14:51 2011
@@ -26,15 +26,13 @@ import java.nio.ByteBuffer;
 public class StoredMemoryMessage implements StoredMessage
 {
     private final long _messageNumber;
-    private final ByteBuffer _content;
+    private ByteBuffer _content;
     private final StorableMessageMetaData _metaData;
 
     public StoredMemoryMessage(long messageNumber, StorableMessageMetaData metaData)
     {
         _messageNumber = messageNumber;
         _metaData = metaData;
-        _content = ByteBuffer.allocate(metaData.getContentSize());
-
     }
 
     public long getMessageNumber()
@@ -45,6 +43,16 @@ public class StoredMemoryMessage impleme
     public void addContent(int offsetInMessage, ByteBuffer src)
     {
         src = src.duplicate();
+        if(_content == null || offsetInMessage + src.remaining() > _content.capacity())
+        {
+            ByteBuffer newContent = ByteBuffer.allocate(offsetInMessage+src.remaining());
+            if(_content != null)
+            {
+                newContent.duplicate().put(_content.array());
+            }
+            _content = newContent;
+        }
+
         ByteBuffer dst = _content.duplicate();
         dst.position(offsetInMessage);
         dst.put(src);

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Sun Aug 14 17:14:51 2011
@@ -55,16 +55,12 @@ public interface Subscription
 
     void setNoLocal(boolean noLocal);
 
-    AMQShortString getConsumerTag();
-
     long getSubscriptionID();
 
     boolean isSuspended();
 
     boolean hasInterest(QueueEntry msg);
 
-    boolean isAutoClose();
-
     boolean isClosed();
 
     boolean acquires();
@@ -99,11 +95,11 @@ public interface Subscription
 
     boolean isActive();
 
-    void confirmAutoClose();
-
     public void set(String key, Object value);
 
     public Object get(String key);
 
     boolean isSessionTransactional();
+
+    void queueEmpty() throws AMQException;
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Sun Aug 14 17:14:51 2011
@@ -341,7 +341,7 @@ public abstract class SubscriptionImpl i
     {
         return getQueue().getConfigStore();
     }
-    
+
     public Long getDelivered()
     {
         return _deliveredCount.get();
@@ -777,9 +777,19 @@ public abstract class SubscriptionImpl i
     {
         return _channel.isTransactional();
     }
-    
+
     public long getCreateTime()
     {
         return _createTime;
     }
+
+    public void queueEmpty() throws AMQException
+    {
+        if (isAutoClose())
+        {
+            _queue.unregisterSubscription(this);
+
+            confirmAutoClose();
+        }
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Sun Aug 14 17:14:51 2011
@@ -103,7 +103,7 @@ public class Subscription_0_10 implement
 
                                                 public void stateChange(Subscription sub, State oldState, State newState)
                                                 {
-                                                    CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));    
+                                                    CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
                                                 }
                                             };
     private AMQQueue _queue;
@@ -189,12 +189,7 @@ public class Subscription_0_10 implement
             CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
                     filterLogString.length() > 0));
         }
- 
-    }
 
-    public AMQShortString getConsumerTag()
-    {
-        return new AMQShortString(_destination);
     }
 
     public boolean isSuspended()
@@ -234,12 +229,6 @@ public class Subscription_0_10 implement
         return (_filters == null) || _filters.allAllow(entry);
     }
 
-    public boolean isAutoClose()
-    {
-        // no such thing in 0-10
-        return false;
-    }
-
     public boolean isClosed()
     {
         return getState() == State.CLOSED;
@@ -292,7 +281,7 @@ public class Subscription_0_10 implement
     {
         return getQueue().getConfigStore();
     }
-    
+
     public Long getDelivered()
     {
         return _deliveredCount.get();
@@ -708,11 +697,6 @@ public class Subscription_0_10 implement
         return getState() == State.ACTIVE;
     }
 
-    public void confirmAutoClose()
-    {
-        //No such thing in 0-10
-    }
-
     public void set(String key, Object value)
     {
         _properties.put(key, value);
@@ -901,6 +885,10 @@ public class Subscription_0_10 implement
         return _session.isTransactional();
     }
 
+    public void queueEmpty()
+    {
+    }
+
     public long getCreateTime()
     {
         return _createTime;
@@ -908,7 +896,7 @@ public class Subscription_0_10 implement
 
     public String toLogString()
     {
-        String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(), 
+        String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
                   _queue.getNameShortString());
         String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "("
                 // queueString is "vh(/{0})/qu({1}) " so need to trim

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Sun Aug 14 17:14:51 2011
@@ -26,6 +26,7 @@ import org.apache.qpid.server.federation
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.configuration.VirtualHostConfig;
 import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.exchange.ExchangeFactory;
@@ -99,4 +100,6 @@ public interface VirtualHost extends Dur
     ConfigStore getConfigStore();
 
     void removeBrokerConnection(BrokerLink brokerLink);
+
+    LinkRegistry getLinkRegistry(String remoteContainerId);
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java Sun Aug 14 17:14:51 2011
@@ -34,7 +34,7 @@ public class MockStoredMessage implement
 {
     private long _messageId;
     private MessageMetaData _metaData;
-    private final ByteBuffer _content;
+    private ByteBuffer _content;
 
 
     public MockStoredMessage(long messageId)
@@ -46,8 +46,6 @@ public class MockStoredMessage implement
     {
         _messageId = messageId;
         _metaData = new MessageMetaData(info, chb, 0);
-        _content = ByteBuffer.allocate(_metaData.getContentSize());
-
     }
 
     public MessageMetaData getMetaData()
@@ -63,6 +61,16 @@ public class MockStoredMessage implement
     public void addContent(int offsetInMessage, ByteBuffer src)
     {
         src = src.duplicate();
+        if(_content == null || offsetInMessage + src.remaining() > _content.capacity())
+        {
+            ByteBuffer newContent = ByteBuffer.allocate(offsetInMessage+src.remaining());
+            if(_content != null)
+            {
+                newContent.duplicate().put(_content.array());
+            }
+            _content = newContent;
+        }
+
         ByteBuffer dst = _content.duplicate();
         dst.position(offsetInMessage);
         dst.put(src);

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Sun Aug 14 17:14:51 2011
@@ -129,11 +129,6 @@ public class MockSubscription implements
         return true;
     }
 
-    public void confirmAutoClose()
-    {
-
-    }
-
     public void set(String key, Object value)
     {
     }
@@ -143,11 +138,6 @@ public class MockSubscription implements
         return null;
     }
 
-    public boolean isAutoClose()
-    {
-        return false;
-    }
-
     public boolean isBrowser()
     {
         return false;
@@ -214,7 +204,7 @@ public class MockSubscription implements
     }
 
     public void setNoLocal(boolean noLocal)
-    {        
+    {
     }
 
     public void setStateListener(StateListener listener)
@@ -241,4 +231,9 @@ public class MockSubscription implements
     {
         return false;
     }
+
+    public void queueEmpty() throws AMQException
+    {
+        //TODO
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps?rev=1157566&r1=1157565&r2=1157566&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/build.deps Sun Aug 14 17:14:51 2011
@@ -102,6 +102,7 @@ client.libs=${geronimo-jms}
 tools.libs=${commons-configuration.libs} ${log4j}
 broker.libs=${commons-cli} ${commons-logging} ${log4j} ${slf4j-log4j} \
     ${xalan} ${felix.libs} ${derby-db} ${commons-configuration.libs}
+amqp-1-0-client.libs=${geronimo-jms} ${commons-cli}
 
 broker-plugins.libs=${felix.libs} ${log4j} ${commons-configuration.libs} 
 management-client.libs=${jsp.libs} ${log4j} ${slf4j-log4j} ${slf4j-api} \



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org