You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/08/14 19:15:08 UTC

svn commit: r1157566 [11/23] - in /qpid: branches/rg-amqp-1-0-sandbox/qpid/java/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/ branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/...

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,538 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class LinkEndpoint<T extends LinkEventListener>
+{
+
+    private T _linkEventListener;
+    private DeliveryStateHandler _deliveryStateHandler;
+    private Object _flowTransactionId;
+    private SenderSettleMode _sendingSettlementMode;
+    private ReceiverSettleMode _receivingSettlementMode;
+    private Map _initialUnsettledMap;
+    private Map _localUnsettled;
+    private UnsignedInteger _lastSentCreditLimit;
+
+    private enum State
+    {
+        DETACHED,
+        ATTACH_SENT,
+        ATTACH_RECVD,
+        ATTACHED,
+        DETACH_SENT,
+        DETACH_RECVD
+    };
+
+    private final String _name;
+
+    private SessionEndpoint _session;
+
+
+    private volatile State _state = State.DETACHED;
+
+    private Source _source;
+    private Target _target;
+    private UnsignedInteger _deliveryCount;
+    private UnsignedInteger _linkCredit;
+    private UnsignedInteger _available;
+    private Boolean _drain;
+    private UnsignedInteger _localHandle;
+    private UnsignedLong _maxMessageSize;
+
+    private Map<Binary,Delivery> _unsettledTransfers = new HashMap<Binary,Delivery>();
+
+    LinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
+    {
+        _name = name;
+        _session = sessionEndpoint;
+        _linkCredit = UnsignedInteger.valueOf(0);
+        _drain = Boolean.FALSE;
+        _localUnsettled = unsettled;
+
+    }
+
+    LinkEndpoint(final SessionEndpoint sessionEndpoint,final Attach attach)
+    {
+        _session = sessionEndpoint;
+
+        _name = attach.getName();
+        _initialUnsettledMap = attach.getUnsettled();
+
+        _state = State.ATTACH_RECVD;
+    }
+
+    public String getName()
+    {
+        return _name;
+    }
+
+    public abstract Role getRole();
+
+    public Source getSource()
+    {
+        return _source;
+    }
+
+    public void setSource(final Source source)
+    {
+        _source = source;
+    }
+
+    public Target getTarget()
+    {
+        return _target;
+    }
+
+    public void setTarget(final Target target)
+    {
+        _target = target;
+    }
+
+    public void setDeliveryCount(final UnsignedInteger deliveryCount)
+    {
+        _deliveryCount = deliveryCount;
+    }
+
+    public void setLinkCredit(final UnsignedInteger linkCredit)
+    {
+        _linkCredit = linkCredit;
+    }
+
+    public void setAvailable(final UnsignedInteger available)
+    {
+        _available = available;
+    }
+
+    public void setDrain(final Boolean drain)
+    {
+        _drain = drain;
+    }
+
+    public UnsignedInteger getDeliveryCount()
+    {
+        return _deliveryCount;
+    }
+
+    public UnsignedInteger getAvailable()
+    {
+        return _available;
+    }
+
+    public Boolean getDrain()
+    {
+        return _drain;
+    }
+
+    public UnsignedInteger getLinkCredit()
+    {
+        return _linkCredit;
+    }
+
+    public void remoteDetached(Detach detach)
+    {
+        synchronized (getLock())
+        {
+            switch(_state)
+            {
+                case DETACH_SENT:
+                    _state = State.DETACHED;
+                    break;
+                case ATTACHED:
+                    _state = State.DETACH_RECVD;
+                    _linkEventListener.remoteDetached(this, detach);
+                    break;
+            }
+            getLock().notifyAll();
+        }
+    }
+
+    public void receiveTransfer(final Transfer transfer, final Delivery delivery)
+    {
+        // TODO
+    }
+
+    public void settledByPeer(final Binary deliveryTag)
+    {
+        // TODO
+    }
+
+    public void receiveFlow(final Flow flow)
+    {
+    }
+
+    public void addUnsettled(final Delivery unsettled)
+    {
+        synchronized(getLock())
+        {
+            _unsettledTransfers.put(unsettled.getDeliveryTag(), unsettled);
+            getLock().notifyAll();
+        }
+    }
+
+    public void receiveDeliveryState(final Delivery unsettled,
+                                     final DeliveryState state,
+                                     final Boolean settled)
+    {
+        // TODO
+        synchronized(getLock())
+        {
+            if(_deliveryStateHandler != null)
+            {
+               _deliveryStateHandler.handle(unsettled.getDeliveryTag(), state, settled);
+            }
+
+            if(settled)
+            {
+                settle(unsettled.getDeliveryTag());
+            }
+
+            getLock().notifyAll();
+        }
+
+    }
+
+    public void settle(final Binary deliveryTag)
+    {
+        Delivery delivery = _unsettledTransfers.remove(deliveryTag);
+        if(delivery != null)
+        {
+            getSession().settle(getRole(),delivery.getDeliveryId());
+        }
+
+    }
+
+    public int getUnsettledCount()
+    {
+        synchronized(getLock())
+        {
+            return _unsettledTransfers.size();
+        }
+    }
+
+    public void setLocalHandle(final UnsignedInteger localHandle)
+    {
+        _localHandle = localHandle;
+    }
+
+    public void receiveAttach(final Attach attach)
+    {
+        synchronized(getLock())
+        {
+            switch(_state)
+            {
+                case ATTACH_SENT:
+                {
+
+                    _state = State.ATTACHED;
+                    getLock().notifyAll();
+
+                    _initialUnsettledMap = attach.getUnsettled();
+                    /*  TODO - don't yet handle:
+
+                        attach.getUnsettled();
+                        attach.getProperties();
+                        attach.getDurable();
+                        attach.getExpiryPolicy();
+                        attach.getTimeout();
+                     */
+
+                    break;
+                }
+
+                case DETACHED:
+                {
+                    _state = State.ATTACHED;
+                    getLock().notifyAll();
+                }
+
+
+            }
+
+            if(attach.getRole() == Role.SENDER)
+            {
+                _source = attach.getSource();
+            }
+            else
+            {
+                _target = attach.getTarget();
+            }
+
+            if(getRole() == Role.SENDER)
+            {
+                _maxMessageSize = attach.getMaxMessageSize();
+            }
+
+        }
+    }
+
+    public synchronized boolean isAttached()
+    {
+        return _state == State.ATTACHED;
+    }
+
+    public synchronized boolean isDetached()
+    {
+        return _state == State.DETACHED;
+    }
+
+    public SessionEndpoint getSession()
+    {
+        return _session;
+    }
+
+    public UnsignedInteger getLocalHandle()
+    {
+        return _localHandle;
+    }
+
+    public Object getLock()
+    {
+        return _session.getLock();
+    }
+
+    public void attach()
+    {
+        synchronized(getLock())
+        {
+            Attach attachToSend = new Attach();
+            attachToSend.setName(getName());
+            attachToSend.setRole(getRole());
+            attachToSend.setHandle(getLocalHandle());
+            attachToSend.setSource(getSource());
+            attachToSend.setTarget(getTarget());
+            attachToSend.setSndSettleMode(getSendingSettlementMode());
+            attachToSend.setRcvSettleMode(getReceivingSettlementMode());
+            attachToSend.setUnsettled(_localUnsettled);
+
+            if(getRole() == Role.SENDER)
+            {
+                attachToSend.setInitialDeliveryCount(_deliveryCount);
+            }
+
+            switch(_state)
+            {
+                case DETACHED:
+                    _state = State.ATTACH_SENT;
+                    break;
+                case ATTACH_RECVD:
+                    _state = State.ATTACHED;
+                    break;
+                default:
+                    // TODO ERROR
+            }
+
+            getSession().sendAttach(attachToSend);
+
+            getLock().notifyAll();
+
+        }
+
+    }
+
+
+    public void detach()
+    {
+        detach(null, false);
+    }
+
+    public void close()
+    {
+        detach(null, true);
+    }
+
+    public void close(Error error)
+    {
+        detach(error, true);
+    }
+
+    public void detach(Error error)
+    {
+        detach(error, false);
+    }
+
+    private void detach(Error error, boolean close)
+    {
+        synchronized(getLock())
+        {
+            //TODO
+            switch(_state)
+            {
+                case ATTACHED:
+                    _state = State.DETACH_SENT;
+                    break;
+                case DETACH_RECVD:
+                    _state = State.DETACHED;
+                    break;
+                default:
+                    return;
+            }
+
+            Detach detach = new Detach();
+            detach.setHandle(getLocalHandle());
+            if(close)
+                detach.setClosed(close);
+            detach.setError(error);
+
+            getSession().sendDetach(detach);
+
+            getLock().notifyAll();
+        }
+
+    }
+
+
+
+
+    public void setTransactionId(final Object txnId)
+    {
+        _flowTransactionId = txnId;
+    }
+
+    public void sendFlowConditional()
+    {
+        if(_lastSentCreditLimit != null)
+        {
+            UnsignedInteger clientsCredit = _lastSentCreditLimit.subtract(_deliveryCount);
+            int i = _linkCredit.subtract(clientsCredit).compareTo(clientsCredit);
+            if(i >=0)
+            {
+                sendFlow(_flowTransactionId != null);
+            }
+            else
+            {
+                getSession().sendFlowConditional();
+            }
+        }
+        else
+        {
+            sendFlow(_flowTransactionId != null);
+        }
+    }
+
+
+    public void sendFlow()
+    {
+        sendFlow(_flowTransactionId != null);
+    }
+
+    public void sendFlow(boolean setTransactionId)
+    {
+        if(_state == State.ATTACHED || _state == State.ATTACH_SENT)
+        {
+            Flow flow = new Flow();
+            flow.setLinkCredit(_linkCredit);
+            flow.setDeliveryCount(_deliveryCount);
+            _lastSentCreditLimit = _linkCredit.add(_deliveryCount);
+            flow.setAvailable(_available);
+            flow.setDrain(_drain);
+            if(setTransactionId)
+            {
+                flow.setProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), _flowTransactionId));
+            }
+            flow.setHandle(getLocalHandle());
+            getSession().sendFlow(flow);
+        }
+    }
+
+    public T getLinkEventListener()
+    {
+        return _linkEventListener;
+    }
+
+    public void setLinkEventListener(final T linkEventListener)
+    {
+        synchronized(getLock())
+        {
+            _linkEventListener = linkEventListener;
+        }
+    }
+
+    public DeliveryStateHandler getDeliveryStateHandler()
+    {
+        return _deliveryStateHandler;
+    }
+
+    public void setDeliveryStateHandler(final DeliveryStateHandler deliveryStateHandler)
+    {
+        synchronized(getLock())
+        {
+            _deliveryStateHandler = deliveryStateHandler;
+        }
+    }
+
+    public void setSendingSettlementMode(SenderSettleMode sendingSettlementMode)
+    {
+        _sendingSettlementMode = sendingSettlementMode;
+    }
+
+    public SenderSettleMode getSendingSettlementMode()
+    {
+        return _sendingSettlementMode;
+    }
+
+    public ReceiverSettleMode getReceivingSettlementMode()
+    {
+        return _receivingSettlementMode;
+    }
+
+    public void setReceivingSettlementMode(ReceiverSettleMode receivingSettlementMode)
+    {
+        _receivingSettlementMode = receivingSettlementMode;
+    }
+
+    public Map getInitialUnsettledMap()
+    {
+        return _initialUnsettledMap;
+    }
+
+    public void setLocalUnsettled(Map unsettled)
+    {
+        _localUnsettled = unsettled;
+    }
+
+    @Override public String toString()
+    {
+        return "LinkEndpoint{" +
+               "_name='" + _name + '\'' +
+               ", _session=" + _session +
+               ", _state=" + _state +
+               ", _role=" + getRole() +
+               ", _source=" + _source +
+               ", _target=" + _target +
+               ", _transferCount=" + _deliveryCount +
+               ", _linkCredit=" + _linkCredit +
+               ", _available=" + _available +
+               ", _drain=" + _drain +
+               ", _localHandle=" + _localHandle +
+               ", _maxMessageSize=" + _maxMessageSize +
+               '}';
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEventListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEventListener.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEventListener.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEventListener.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
+
+public interface LinkEventListener
+{
+
+
+    void remoteDetached(final LinkEndpoint endpoint, Detach detach);
+
+
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Node.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Node.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Node.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Node.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.transport;
+
+public class Node
+{
+    
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ProtocolHeaderTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ProtocolHeaderTransport.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ProtocolHeaderTransport.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ProtocolHeaderTransport.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class ProtocolHeaderTransport implements BytesTransport
+{
+    private final Object _inputLock = new Object();
+    private final Object _outputLock = new Object();
+
+    private volatile boolean _inputOpen;
+    private volatile boolean _outputOpen;
+
+    private final Map<Binary,BytesTransport> _headersMap;
+
+    private BytesTransport _delegate;
+    private ByteBuffer _bytesToSend;
+
+    private byte[] _header = new byte[8];
+    private int _received;
+    private ByteBuffer _outputBuffer;
+
+    public ProtocolHeaderTransport(Map<Binary, BytesTransport> validHeaders)
+    {
+        _headersMap = validHeaders;
+
+        // if only one valid header then we can send, else we have to wait
+    }
+
+    public boolean isOpenForInput()
+    {
+        return _inputOpen;
+    }
+
+    public void inputClosed()
+    {
+        synchronized(_inputLock)
+        {
+            _inputOpen = false;
+            _inputLock.notifyAll();
+        }
+    }
+
+    public void processBytes(final ByteBuffer buf)
+    {
+        if(_delegate != null)
+        {
+            _delegate.processBytes(buf);
+        }
+        else
+        {
+
+            while( _received < 8 && buf.hasRemaining())
+            {
+                _header[_received++] = buf.get();
+            }
+            if(_received == 8)
+            {
+                Binary header = new Binary(_header);
+                _delegate = _headersMap.get(header);
+                if(_delegate != null)
+                {
+                    _delegate.processBytes(ByteBuffer.wrap(_header));
+                    _delegate.processBytes(buf);
+                }
+                else
+                {
+                    inputClosed();
+                    _outputBuffer = _headersMap.keySet().iterator().next().asByteBuffer();
+                }
+            }
+        }
+    }
+
+    public void setInputStateChangeListener(final StateChangeListener listener)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+
+    public void getNextBytes(final BytesProcessor processor)
+    {
+        if(_bytesToSend != null && _bytesToSend.hasRemaining())
+        {
+            processor.processBytes(_bytesToSend);
+        }
+        else if(_delegate != null)
+        {
+            _delegate.getNextBytes(processor);
+        }
+
+    }
+
+    public void outputClosed()
+    {
+        synchronized (_outputLock)
+        {
+            _outputOpen = false;
+            _outputLock.notifyAll();
+        }
+    }
+
+    public boolean isOpenForOutput()
+    {
+        return _outputOpen;
+    }
+
+    public void setOutputStateChangeListener(final StateChangeListener listener)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,434 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+
+import java.util.*;
+
+public class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLinkListener>
+{
+
+
+    private static class TransientState
+    {
+
+        UnsignedInteger _deliveryId;
+        int _credit = 1;
+        boolean _settled;
+
+        private TransientState(final UnsignedInteger transferId)
+        {
+            _deliveryId = transferId;
+        }
+
+        void incrementCredit()
+        {
+            _credit++;
+        }
+
+        public int getCredit()
+        {
+            return _credit;
+        }
+
+        public UnsignedInteger getDeliveryId()
+        {
+            return _deliveryId;
+        }
+
+        public boolean isSettled()
+        {
+            return _settled;
+        }
+
+        public void setSettled(boolean settled)
+        {
+            _settled = settled;
+        }
+    }
+
+    private Map<Binary, Object> _unsettledMap = new LinkedHashMap<Binary, Object>();
+    private Map<Binary, TransientState> _unsettledIds = new LinkedHashMap<Binary, TransientState>();
+    private boolean _creditWindow;
+    private boolean _remoteDrain;
+    private UnsignedInteger _remoteTransferCount;
+    private UnsignedInteger _drainLimit;
+
+
+    public ReceivingLinkEndpoint(final SessionEndpoint session, String name)
+    {
+        this(session,name,null);
+    }
+
+    public ReceivingLinkEndpoint(final SessionEndpoint session, String name, Map<Binary, Outcome> unsettledMap)
+    {
+        super(session, name, unsettledMap);
+        setDeliveryCount(UnsignedInteger.valueOf(0));
+        setLinkEventListener(ReceivingLinkListener.DEFAULT);
+    }
+
+    public ReceivingLinkEndpoint(final SessionEndpoint session, final Attach attach)
+    {
+        super(session, attach);
+        setDeliveryCount(attach.getInitialDeliveryCount());
+        setLinkEventListener(ReceivingLinkListener.DEFAULT);
+        setSendingSettlementMode(attach.getSndSettleMode());
+        setReceivingSettlementMode(attach.getRcvSettleMode());
+    }
+
+
+    @Override public Role getRole()
+    {
+        return Role.RECEIVER;
+    }
+
+    @Override
+    public void receiveTransfer(final Transfer transfer, final Delivery delivery)
+    {
+        synchronized (getLock())
+        {
+            TransientState transientState;
+            boolean existingState = _unsettledMap.containsKey(transfer.getDeliveryTag());
+            _unsettledMap.put(transfer.getDeliveryTag(), transfer.getState());
+            if(!existingState)
+            {
+                transientState = new TransientState(transfer.getDeliveryId());
+                if(Boolean.TRUE.equals(transfer.getSettled()))
+                {
+                    transientState.setSettled(true);
+                }
+                _unsettledIds.put(transfer.getDeliveryTag(), transientState);
+                setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
+                setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE));
+
+            }
+            else
+            {
+                transientState = _unsettledIds.get(transfer.getDeliveryTag());
+                transientState.incrementCredit();
+                if(Boolean.TRUE.equals(transfer.getSettled()))
+                {
+                    transientState.setSettled(true);
+                }
+            }
+
+            if(transientState.isSettled())
+            {
+                _unsettledMap.remove(transfer.getDeliveryTag());
+            }
+            getLinkEventListener().messageTransfer(transfer);
+
+
+            getLock().notifyAll();
+        }
+    }
+
+    @Override public void receiveFlow(final Flow flow)
+    {
+        synchronized (getLock())
+        {
+            super.receiveFlow(flow);
+            _remoteDrain = Boolean.TRUE.equals((Boolean)flow.getDrain());
+            setAvailable(flow.getAvailable());
+            _remoteTransferCount = flow.getDeliveryCount();
+            getLock().notifyAll();
+        }
+    }
+
+
+    public boolean isDrained()
+    {
+        return getDrain() && getDeliveryCount().equals(getDrainLimit());
+    }
+
+    @Override
+    public void settledByPeer(final Binary deliveryTag)
+    {
+        synchronized (getLock())
+        {
+            // TODO XXX : need to do anything about the window here?
+            if(settled(deliveryTag) && _creditWindow)
+            {
+                sendFlowConditional();
+            }
+        }
+    }
+
+    public boolean settled(final Binary deliveryTag)
+    {
+        synchronized(getLock())
+        {
+            boolean deleted;
+            if(deleted = (_unsettledIds.remove(deliveryTag) != null))
+            {
+                _unsettledMap.remove(deliveryTag);
+
+                getLock().notifyAll();
+            }
+
+            return deleted;
+        }
+    }
+
+    public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
+    {
+        synchronized(getLock())
+        {
+            if(_unsettledMap.containsKey(deliveryTag))
+            {
+                boolean outcomeUpdate = false;
+                Outcome outcome=null;
+                if(state instanceof Outcome)
+                {
+                    outcome = (Outcome)state;
+                }
+                else if(state instanceof TransactionalState)
+                {
+                    // TODO? Is this correct
+                    outcome = ((TransactionalState)state).getOutcome();
+                }
+
+                if(outcome != null)
+                {
+                    Object oldOutcome = _unsettledMap.put(deliveryTag, outcome);
+                    outcomeUpdate = !outcome.equals(oldOutcome);
+                }
+
+
+
+
+                TransientState transientState = _unsettledIds.get(deliveryTag);
+                if(outcomeUpdate || settled)
+                {
+
+                    final UnsignedInteger transferId = transientState.getDeliveryId();
+
+                    getSession().updateDisposition(getRole(), transferId, transferId, state, settled);
+                }
+
+
+                if(settled)
+                {
+
+                    if(settled(deliveryTag))
+                    {
+                        if(_creditWindow)
+                        {
+                            setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+                            sendFlowConditional();
+                        }
+                        else
+                        {
+                            getSession().sendFlowConditional();
+                        }
+                    }
+                }
+                getLock().notifyAll();
+            }
+            else
+            {
+                TransientState transientState = _unsettledIds.get(deliveryTag);
+                if(_creditWindow)
+                {
+                    setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+                    sendFlowConditional();
+                }
+
+            }
+        }
+
+    }
+
+
+    public void setCreditWindow()
+    {
+        setCreditWindow(true);
+    }
+    public void setCreditWindow(boolean window)
+    {
+
+        _creditWindow = window;
+        sendFlowConditional();
+
+    }
+
+    public void drain()
+    {
+        synchronized (getLock())
+        {
+            setDrain(true);
+            _creditWindow = false;
+            _drainLimit = getDeliveryCount().add(getLinkCredit());
+            sendFlow();
+            getLock().notifyAll();
+        }
+    }
+
+    @Override
+    public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled)
+    {
+        super.receiveDeliveryState(unsettled, state, settled);
+        if(_creditWindow)
+        {
+            if(Boolean.TRUE.equals(settled))
+            {
+                setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+                sendFlowConditional();
+            }
+        }
+    }
+
+    public void requestTransactionalSend(Object txnId)
+    {
+        synchronized (getLock())
+        {
+            setDrain(true);
+            _creditWindow = false;
+            setTransactionId(txnId);
+            sendFlow();
+            getLock().notifyAll();
+        }
+    }
+
+    private void sendFlow(final Object transactionId)
+    {
+        sendFlow();
+    }
+
+
+    public void clearDrain()
+    {
+        synchronized (getLock())
+        {
+            setDrain(false);
+            sendFlow();
+            getLock().notifyAll();
+        }
+    }
+
+    public void updateAllDisposition(Binary deliveryTag, Outcome outcome, boolean settled)
+    {
+        synchronized(getLock())
+        {
+            if(!_unsettledIds.isEmpty())
+            {
+                Binary firstTag = _unsettledIds.keySet().iterator().next();
+                Binary lastTag = deliveryTag;
+                updateDispositions(firstTag, lastTag, (DeliveryState) outcome, settled);
+            }
+        }
+    }
+
+    private void updateDispositions(Binary firstTag, Binary lastTag, DeliveryState state, boolean settled)
+    {
+        SortedMap<UnsignedInteger, UnsignedInteger> ranges = new TreeMap<UnsignedInteger,UnsignedInteger>();
+
+        synchronized(getLock())
+        {
+
+            Iterator<Binary> iter = _unsettledIds.keySet().iterator();
+            List<Binary> tagsToUpdate = new ArrayList<Binary>();
+            Binary tag = null;
+
+            while(iter.hasNext() && !(tag = iter.next()).equals(firstTag));
+
+            if(firstTag.equals(tag))
+            {
+                tagsToUpdate.add(tag);
+
+                UnsignedInteger deliveryId = _unsettledIds.get(firstTag).getDeliveryId();
+
+                UnsignedInteger first = deliveryId;
+                UnsignedInteger last = first;
+
+                while(iter.hasNext())
+                {
+                    tag = iter.next();
+                    tagsToUpdate.add(tag);
+
+                    deliveryId = _unsettledIds.get(firstTag).getDeliveryId();
+
+                    if(deliveryId.equals(last.add(UnsignedInteger.ONE)))
+                    {
+                        last = deliveryId;
+                    }
+                    else
+                    {
+                        ranges.put(first,last);
+                        first = last = deliveryId;
+                    }
+
+                    if(tag.equals(lastTag))
+                    {
+                        break;
+                    }
+                }
+
+                ranges.put(first,last);
+            }
+
+            if(settled)
+            {
+
+                for(Binary deliveryTag : tagsToUpdate)
+                {
+                    if(settled(deliveryTag) && _creditWindow)
+                    {
+                        setLinkCredit(getLinkCredit().add(UnsignedInteger.valueOf(1)));
+                    }
+                }
+                sendFlowConditional();
+            }
+
+
+
+            for(Map.Entry<UnsignedInteger,UnsignedInteger> range : ranges.entrySet())
+            {
+                getSession().updateDisposition(getRole(), range.getKey(), range.getValue(), state, settled);
+            }
+
+
+            getLock().notifyAll();
+        }
+
+    }
+
+    @Override
+    public void settle(Binary deliveryTag)
+    {
+        super.settle(deliveryTag);
+        if(_creditWindow)
+        {
+             sendFlowConditional();
+        }
+
+    }
+
+    public UnsignedInteger getDrainLimit()
+    {
+        return _drainLimit;
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkListener.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkListener.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkListener.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+public interface ReceivingLinkListener extends LinkEventListener
+{
+    void messageTransfer(Transfer xfr);
+
+    class DefaultLinkEventListener implements org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener
+    {
+        public void messageTransfer(final Transfer xfr)
+        {
+
+        }
+
+        public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+        {
+            endpoint.detach();
+        }
+    }
+
+    public static final org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener DEFAULT = new DefaultLinkEventListener();
+
+
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingSessionHalfEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingSessionHalfEndpoint.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingSessionHalfEndpoint.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingSessionHalfEndpoint.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+public class ReceivingSessionHalfEndpoint extends SessionHalfEndpoint
+{
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpoint.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpoint.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpoint.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.security.SaslChallenge;
+import org.apache.qpid.amqp_1_0.type.security.SaslInit;
+import org.apache.qpid.amqp_1_0.type.security.SaslMechanisms;
+import org.apache.qpid.amqp_1_0.type.security.SaslOutcome;
+import org.apache.qpid.amqp_1_0.type.security.SaslResponse;
+
+public interface SASLEndpoint
+{
+    void receiveSaslInit(SaslInit saslInit);
+
+    void receiveSaslMechanisms(SaslMechanisms saslMechanisms);
+
+    void receiveSaslChallenge(SaslChallenge saslChallenge);
+
+    void receiveSaslResponse(SaslResponse saslResponse);
+
+    void receiveSaslOutcome(SaslOutcome saslOutcome);
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpointImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpointImpl.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpointImpl.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpointImpl.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructorRegistry;
+import org.apache.qpid.amqp_1_0.codec.ValueWriter;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.amqp_1_0.type.security.SaslChallenge;
+import org.apache.qpid.amqp_1_0.type.security.SaslInit;
+import org.apache.qpid.amqp_1_0.type.security.SaslMechanisms;
+import org.apache.qpid.amqp_1_0.type.security.SaslOutcome;
+import org.apache.qpid.amqp_1_0.type.security.SaslResponse;
+
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.PrintWriter;
+import java.util.Arrays;
+
+
+public class SASLEndpointImpl
+        implements DescribedTypeConstructorRegistry.Source, ValueWriter.Registry.Source, SASLEndpoint
+{
+    private static final short SASL_CONTROL_CHANNEL = (short) 0;
+
+    private static final byte[] EMPTY_CHALLENGE = new byte[0];
+
+    private FrameTransport _transport;
+
+
+    private static enum State
+    {
+        BEGIN_SERVER,
+        BEGIN_CLIENT,
+        SENT_MECHANISMS,
+        SENT_INIT,
+        SENT_REPSONSE,
+        SENT_CHALLENGE,
+        SENT_OUTCOME
+    };
+
+
+    public PrintWriter _out;
+
+
+    private State _state;
+
+    private SaslClient _saslClient;
+    private SaslServer _saslServer;
+
+    private boolean _isReadable;
+    private boolean _isWritable;
+    private boolean _closedForInput;
+    private boolean _closedForOutput;
+
+    private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance().registerSecurityLayer();
+
+    private FrameOutputHandler _frameOutputHandler;
+
+    private byte _majorVersion;
+    private byte _minorVersion;
+    private byte _revision;
+    private UnsignedInteger _handleMax = UnsignedInteger.MAX_VALUE;
+    private ConnectionEventListener _connectionEventListener = ConnectionEventListener.DEFAULT;
+    private Symbol[] _mechanisms;
+    private Symbol _mechanism;
+
+
+    private SASLEndpointImpl(FrameTransport transport, State initialState, Symbol... mechanisms)
+    {
+        _transport = transport;
+        _state = initialState;
+        _mechanisms = mechanisms;
+    }
+
+
+    public void setFrameOutputHandler(final FrameOutputHandler frameOutputHandler)
+    {
+        _frameOutputHandler = frameOutputHandler;
+        if(_state == State.BEGIN_SERVER)
+        {
+            sendMechanisms();
+        }
+    }
+
+    private synchronized void sendMechanisms()
+    {
+        SaslMechanisms saslMechanisms = new SaslMechanisms();
+
+        saslMechanisms.setSaslServerMechanisms(_mechanisms);
+
+        _state = State.SENT_MECHANISMS;
+
+        send(saslMechanisms);
+    }
+
+    public boolean isReadable()
+    {
+        return _isReadable;
+    }
+
+    public boolean isWritable()
+    {
+        return _isWritable;
+    }
+
+
+    public synchronized void send(SaslFrameBody body)
+    {
+        if(!_closedForOutput)
+        {
+            if(_out != null)
+            {
+                _out.println("SEND : " + body);
+                _out.flush();
+            }
+            //_frameOutputHandler.send(new SASLFrame(body));
+        }
+    }
+
+
+
+    public void invalidHeaderReceived()
+    {
+        // TODO
+        _closedForInput = true;
+    }
+
+    public synchronized boolean closedForInput()
+    {
+        return _closedForInput;
+    }
+
+    public synchronized void protocolHeaderReceived(final byte major, final byte minorVersion, final byte revision)
+    {
+        _majorVersion = major;
+        _minorVersion = minorVersion;
+        _revision = revision;
+    }
+
+
+    public synchronized void receive(final short channel, final Object frame)
+    {
+        if(_out != null)
+        {
+            _out.println( "RECV["+channel+"] : " + frame);
+            _out.flush();
+        }
+        if(frame instanceof SaslFrameBody)
+        {
+            ((SaslFrameBody)frame).invoke(this);
+        }
+        else
+        {
+            // TODO
+        }
+    }
+
+    public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
+    {
+        return _describedTypeRegistry;
+    }
+
+    public synchronized void setClosedForOutput(boolean b)
+    {
+        _closedForOutput = true;
+        notifyAll();
+    }
+
+    public synchronized boolean closedForOutput()
+    {
+        return _closedForOutput;
+    }
+
+
+    public Object getLock()
+    {
+        return this;
+    }
+
+
+    public byte getMajorVersion()
+    {
+        return _majorVersion;
+    }
+
+
+    public byte getMinorVersion()
+    {
+        return _minorVersion;
+    }
+
+    public byte getRevision()
+    {
+        return _revision;
+    }
+
+
+    public void receiveSaslInit(final SaslInit saslInit)
+    {
+        _mechanism = saslInit.getMechanism();
+        try
+        {
+            _saslServer = Sasl.createSaslServer(_mechanism.toString(), "AMQP", "localhost", null,  createServerCallbackHandler(_mechanism));
+        }
+        catch (SaslException e)
+        {
+            e.printStackTrace();  //TODO
+        }
+    }
+
+    private CallbackHandler createServerCallbackHandler(final Symbol mechanism)
+    {
+        return null;  //TODO
+    }
+
+    public synchronized void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+    {
+        Symbol[] serverMechanisms = saslMechanisms.getSaslServerMechanisms();
+        for(Symbol mechanism : _mechanisms)
+        {
+            if(Arrays.asList(serverMechanisms).contains(mechanism))
+            {
+                _mechanism = mechanism;
+                break;
+            }
+        }
+        // TODO - no matching mechanism
+        try
+        {
+            _saslClient = Sasl.createSaslClient(new String[] { _mechanism.toString() }, null, "AMQP", "localhost", null,
+                                createClientCallbackHandler(_mechanism));
+            SaslInit init = new SaslInit();
+            init.setMechanism(_mechanism);
+            init.setInitialResponse(_saslClient.hasInitialResponse() ? new Binary(_saslClient.evaluateChallenge(EMPTY_CHALLENGE)) : null);
+            send(init);
+        }
+        catch (SaslException e)
+        {
+            e.printStackTrace();  //TODO
+        }
+    }
+
+    private CallbackHandler createClientCallbackHandler(final Symbol mechanism)
+    {
+        return null;  //TODO
+    }
+
+    public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+    {
+        //TODO
+    }
+
+    public void receiveSaslResponse(final SaslResponse saslResponse)
+    {
+        //TODO
+    }
+
+
+    public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+    {
+        //TODO
+    }
+
+
+
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLFrameTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLFrameTransport.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLFrameTransport.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLFrameTransport.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.framing.SASLFrame;
+
+public class SASLFrameTransport implements FrameTransport<SASLFrame>
+{
+    private final Object _inputLock = new Object();
+    private final Object _outputLock = new Object();
+
+    private volatile boolean _inputOpen;
+    private volatile boolean _outputOpen;
+
+    public boolean isOpenForInput()
+    {
+        return _inputOpen;
+    }
+
+    public void closeForInput()
+    {
+        synchronized(_inputLock)
+        {
+            _inputOpen = false;
+            _inputLock.notifyAll();
+        }
+    }
+    public void processIncomingFrame(final SASLFrame frame)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public SASLFrame getNextFrame()
+    {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void closeForOutput()
+    {
+        synchronized (_outputLock)
+        {
+            _outputOpen = false;
+            _outputLock.notifyAll();
+        }
+    }
+
+    public boolean isOpenForOutput()
+    {
+        return _outputOpen;
+    }
+
+    public void setInputStateChangeListener(final StateChangeListener listener)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void setOutputStateChangeListener(final StateChangeListener listener)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLTransport.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLTransport.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLTransport.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+
+import java.nio.ByteBuffer;
+
+public class SASLTransport implements BytesTransport
+{
+    private final Object _inputLock = new Object();
+    private final Object _outputLock = new Object();
+
+    private volatile boolean _inputOpen;
+    private volatile boolean _outputOpen;
+
+    private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
+                                                                                    .registerSecurityLayer();
+
+    public boolean isOpenForInput()
+    {
+        return _inputOpen;
+    }
+
+    public void inputClosed()
+    {
+        synchronized(_inputLock)
+        {
+            _inputOpen = false;
+            _inputLock.notifyAll();
+        }
+    }
+
+    public void processBytes(final ByteBuffer buf)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void setInputStateChangeListener(final StateChangeListener listener)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void getNextBytes(final BytesProcessor processor)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void outputClosed()
+    {
+        synchronized (_outputLock)
+        {
+            _outputOpen = false;
+            _outputLock.notifyAll();
+        }
+    }
+
+    public boolean isOpenForOutput()
+    {
+        return _outputOpen;
+    }
+
+    public void setOutputStateChangeListener(final StateChangeListener listener)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.transport.Attach;
+import org.apache.qpid.amqp_1_0.type.transport.Flow;
+import org.apache.qpid.amqp_1_0.type.transport.Role;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener>
+{
+
+    private UnsignedInteger _lastDeliveryId;
+    private Binary _lastDeliveryTag;
+    private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<Binary, UnsignedInteger>();
+    private Binary _transactionId;
+
+    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name)
+    {
+        this(sessionEndpoint, name, null);
+    }
+
+    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
+    {
+        super(sessionEndpoint, name, unsettled);
+        init();
+    }
+
+    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, final Attach attach)
+    {
+        super(sessionEndpoint, attach);
+        setSendingSettlementMode(attach.getSndSettleMode());
+        setReceivingSettlementMode(attach.getRcvSettleMode());
+        init();
+    }
+
+    private void init()
+    {
+        setDeliveryCount(UnsignedInteger.valueOf(0));
+        setAvailable(UnsignedInteger.valueOf(0));
+        setLinkEventListener(SendingLinkListener.DEFAULT);
+    }
+
+    @Override public Role getRole()
+    {
+        return Role.SENDER;
+    }
+
+    public boolean transfer(final Transfer xfr)
+    {
+        SessionEndpoint s = getSession();
+        int transferCount;
+        transferCount = _lastDeliveryTag == null ? 1 : 1;
+        xfr.setMessageFormat(UnsignedInteger.ZERO);
+        synchronized(getLock())
+        {
+
+            final int currentCredit = getLinkCredit().intValue() - transferCount;
+
+            if(currentCredit < 0)
+            {
+                return false;
+            }
+            else
+            {
+                setLinkCredit(UnsignedInteger.valueOf((int)currentCredit));
+            }
+
+            setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + transferCount)));
+
+            xfr.setHandle(getLocalHandle());
+
+            s.sendTransfer(xfr, this, !xfr.getDeliveryTag().equals(_lastDeliveryTag));
+
+            if(!Boolean.TRUE.equals(xfr.getSettled()))
+            {
+                _unsettledMap.put(xfr.getDeliveryTag(), xfr.getDeliveryId());
+            }
+        }
+
+        if(Boolean.TRUE.equals(xfr.getMore()))
+        {
+            _lastDeliveryTag = xfr.getDeliveryTag();
+        }
+        else
+        {
+            _lastDeliveryTag = null;
+        }
+
+        return true;
+    }
+
+
+    public void drained()
+    {
+        synchronized (getLock())
+        {
+            setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
+            setLinkCredit(UnsignedInteger.ZERO);
+            sendFlow();
+        }
+    }
+
+    @Override
+    public void receiveFlow(final Flow flow)
+    {
+        super.receiveFlow(flow);    //To change body of overridden methods use File | Settings | File Templates.
+        UnsignedInteger t = flow.getDeliveryCount();
+        UnsignedInteger c = flow.getLinkCredit();
+        setDrain(flow.getDrain());
+
+        Map options;
+        if((options = flow.getProperties()) != null)
+        {
+             _transactionId = (Binary) options.get(Symbol.valueOf("txn-id"));
+        }
+
+        if(t == null)
+        {
+            setLinkCredit(c);
+        }
+        else
+        {
+            UnsignedInteger limit = t.add(c);
+            if(limit.compareTo(getDeliveryCount())<=0)
+            {
+                setLinkCredit(UnsignedInteger.valueOf(0));
+            }
+            else
+            {
+                setLinkCredit(limit.subtract(getDeliveryCount()));
+            }
+        }
+        getLinkEventListener().flowStateChanged();
+
+    }
+
+    public boolean hasCreditToSend()
+    {
+        UnsignedInteger linkCredit = getLinkCredit();
+        return linkCredit != null && (linkCredit.compareTo(UnsignedInteger.valueOf(0)) > 0)
+               && getSession().hasCreditToSend();
+    }
+
+    public void receiveDeliveryState(final Delivery unsettled,
+                                               final DeliveryState state,
+                                               final Boolean settled)
+    {
+        super.receiveDeliveryState(unsettled, state, settled);
+        if(settled)
+        {
+            _unsettledMap.remove(unsettled.getDeliveryTag());
+        }
+    }
+
+    public UnsignedInteger getLastDeliveryId()
+    {
+        return _lastDeliveryId;
+    }
+
+    public void setLastDeliveryId(final UnsignedInteger deliveryId)
+    {
+        _lastDeliveryId = deliveryId;
+    }
+
+    public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
+    {
+        synchronized(getLock())
+        {
+            UnsignedInteger deliveryId;
+            if(settled && (deliveryId = _unsettledMap.remove(deliveryTag))!=null)
+            {
+                settle(deliveryTag);
+                getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled);
+            }
+
+        }
+    }
+
+    public Binary getTransactionId()
+    {
+        return _transactionId;
+    }
+
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkListener.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkListener.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkListener.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+public interface SendingLinkListener extends LinkEventListener
+{
+    void flowStateChanged();
+
+    class DefaultLinkEventListener implements org.apache.qpid.amqp_1_0.transport.SendingLinkListener
+    {
+
+        public void remoteDetached(final LinkEndpoint endpoint, final org.apache.qpid.amqp_1_0.type.transport.Detach detach)
+        {
+            endpoint.detach();
+        }
+
+        public void flowStateChanged()
+        {
+
+        }
+    }
+
+    public static final org.apache.qpid.amqp_1_0.transport.SendingLinkListener DEFAULT = new DefaultLinkEventListener();
+
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingSessionHalfEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingSessionHalfEndpoint.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingSessionHalfEndpoint.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingSessionHalfEndpoint.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+public class SendingSessionHalfEndpoint extends SessionHalfEndpoint
+{
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
+{
+    private int _seqNo;
+
+    public SequenceNumber(int seqNo)
+    {
+        _seqNo = seqNo;
+    }
+
+    public SequenceNumber incr()
+    {
+        _seqNo++;
+        return this;
+    }
+
+    public SequenceNumber decr()
+    {
+        _seqNo--;
+        return this;
+    }
+
+    public static SequenceNumber add(SequenceNumber a, int i)
+    {
+        return a.clone().add(i);
+    }
+
+    public static SequenceNumber subtract(SequenceNumber a, int i)
+    {
+        return a.clone().add(-i);
+    }
+
+    private SequenceNumber add(int i)
+    {
+        _seqNo+=i;
+        return this;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        SequenceNumber that = (SequenceNumber) o;
+
+        if (_seqNo != that._seqNo)
+        {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return _seqNo;
+    }
+
+    public int compareTo(SequenceNumber o)
+    {
+        return _seqNo - o._seqNo;
+    }
+
+    @Override
+    public SequenceNumber clone()
+    {
+        return new SequenceNumber(_seqNo);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "SN{" + _seqNo + '}';
+    }
+
+    public int intValue()
+    {
+        return _seqNo;
+    }
+}

Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionAttachment.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionAttachment.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionAttachment.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionAttachment.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+public class SessionAttachment
+{
+    private final SessionEndpoint _sessionEndpoint;
+    private final ConnectionEndpoint _connectionEndpoint;
+    private final short _channel;
+
+    public SessionAttachment(SessionEndpoint sessionEndpoint, ConnectionEndpoint connectionEndpoint, short channel)
+    {
+        _sessionEndpoint = sessionEndpoint;
+        _connectionEndpoint = connectionEndpoint;
+        _channel = channel;
+    }
+
+    public SessionEndpoint getSessionEndpoint()
+    {
+        return _sessionEndpoint;
+    }
+
+    public ConnectionEndpoint getConnectionEndpoint()
+    {
+        return _connectionEndpoint;
+    }
+
+    public short getChannel()
+    {
+        return _channel;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        SessionAttachment that = (SessionAttachment) o;
+
+        if (_channel != that._channel)
+        {
+            return false;
+        }
+        if (_connectionEndpoint != null
+            ? !_connectionEndpoint.equals(that._connectionEndpoint)
+            : that._connectionEndpoint != null)
+        {
+            return false;
+        }
+        if (_sessionEndpoint != null ? !_sessionEndpoint.equals(that._sessionEndpoint) : that._sessionEndpoint != null)
+        {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = _sessionEndpoint != null ? _sessionEndpoint.hashCode() : 0;
+        result = 31 * result + (_connectionEndpoint != null ? _connectionEndpoint.hashCode() : 0);
+        result = 31 * result + (int) _channel;
+        return result;
+    }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org