You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/08/14 19:15:08 UTC
svn commit: r1157566 [11/23] - in /qpid:
branches/rg-amqp-1-0-sandbox/qpid/java/
branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/
branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/
branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/...
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,538 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class LinkEndpoint<T extends LinkEventListener>
+{
+
+ private T _linkEventListener; // notified from remoteDetached() when the peer detaches an attached link
+ private DeliveryStateHandler _deliveryStateHandler; // optional; invoked from receiveDeliveryState()
+ private Object _flowTransactionId; // when non-null, sent as the "txn-id" flow property by sendFlow()
+ private SenderSettleMode _sendingSettlementMode; // copied into the outgoing attach by attach()
+ private ReceiverSettleMode _receivingSettlementMode; // copied into the outgoing attach by attach()
+ private Map _initialUnsettledMap; // unsettled map taken from the peer's attach (attach.getUnsettled())
+ private Map _localUnsettled; // unsettled map placed on the outgoing attach by attach()
+ private UnsignedInteger _lastSentCreditLimit; // _linkCredit + _deliveryCount as of the last sendFlow(boolean)
+
+ private enum State
+ {
+ DETACHED, // initial state for locally created endpoints; also reached when a detach exchange completes
+ ATTACH_SENT, // attach() was called while DETACHED
+ ATTACH_RECVD, // endpoint was constructed from a peer attach
+ ATTACHED, // both directions attached
+ DETACH_SENT, // detach was sent while ATTACHED
+ DETACH_RECVD // peer detach was received while ATTACHED
+ };
+
+ private final String _name;
+
+ private SessionEndpoint _session; // also supplies the lock object, see getLock()
+
+
+ private volatile State _state = State.DETACHED;
+
+ private Source _source;
+ private Target _target;
+ private UnsignedInteger _deliveryCount;
+ private UnsignedInteger _linkCredit;
+ private UnsignedInteger _available;
+ private Boolean _drain;
+ private UnsignedInteger _localHandle;
+ private UnsignedLong _maxMessageSize; // taken from the peer's attach in receiveAttach() when this end is the sender
+
+ private Map<Binary,Delivery> _unsettledTransfers = new HashMap<Binary,Delivery>(); // keyed by delivery tag
+
+    /**
+     * Creates an endpoint for a link that is initiated locally. The endpoint
+     * starts in the DETACHED state with zero link credit and drain switched off.
+     *
+     * @param sessionEndpoint the session the link belongs to
+     * @param name            the link name
+     * @param unsettled       the local unsettled map to advertise when attaching
+     */
+    LinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
+    {
+        _session = sessionEndpoint;
+        _name = name;
+        _localUnsettled = unsettled;
+        _drain = Boolean.FALSE;
+        _linkCredit = UnsignedInteger.valueOf(0);
+    }
+
+    /**
+     * Creates an endpoint in response to an attach received from the peer.
+     * The link name and the initial unsettled map are taken from that attach
+     * and the endpoint starts in the ATTACH_RECVD state.
+     *
+     * @param sessionEndpoint the session the link belongs to
+     * @param attach          the attach frame received from the peer
+     */
+    LinkEndpoint(final SessionEndpoint sessionEndpoint,final Attach attach)
+    {
+        _state = State.ATTACH_RECVD;
+        _session = sessionEndpoint;
+        _initialUnsettledMap = attach.getUnsettled();
+        _name = attach.getName();
+    }
+
+    /** @return the name of this link */
+    public String getName()
+    {
+        return this._name;
+    }
+
+    /** @return the role this endpoint plays on the link */
+    public abstract Role getRole();
+
+    /** @return the source currently associated with the link */
+    public Source getSource()
+    {
+        return this._source;
+    }
+
+    /** @param source the source to associate with the link */
+    public void setSource(final Source source)
+    {
+        this._source = source;
+    }
+
+    /** @return the target currently associated with the link */
+    public Target getTarget()
+    {
+        return this._target;
+    }
+
+    /** @param target the target to associate with the link */
+    public void setTarget(final Target target)
+    {
+        this._target = target;
+    }
+
+    /** @return the current delivery count */
+    public UnsignedInteger getDeliveryCount()
+    {
+        return this._deliveryCount;
+    }
+
+    /** @param deliveryCount the new delivery count */
+    public void setDeliveryCount(final UnsignedInteger deliveryCount)
+    {
+        this._deliveryCount = deliveryCount;
+    }
+
+    /** @return the current link credit */
+    public UnsignedInteger getLinkCredit()
+    {
+        return this._linkCredit;
+    }
+
+    /** @param linkCredit the new link credit */
+    public void setLinkCredit(final UnsignedInteger linkCredit)
+    {
+        this._linkCredit = linkCredit;
+    }
+
+    /** @return the last recorded "available" value */
+    public UnsignedInteger getAvailable()
+    {
+        return this._available;
+    }
+
+    /** @param available the new "available" value */
+    public void setAvailable(final UnsignedInteger available)
+    {
+        this._available = available;
+    }
+
+    /** @return the drain flag; may be {@code null} if it was never set */
+    public Boolean getDrain()
+    {
+        return this._drain;
+    }
+
+    /** @param drain the new drain flag */
+    public void setDrain(final Boolean drain)
+    {
+        this._drain = drain;
+    }
+
+    /**
+     * Handles a detach received from the peer. If a detach had already been
+     * sent locally the link becomes DETACHED; if the link was ATTACHED it moves
+     * to DETACH_RECVD and the link event listener is informed. In every case
+     * threads waiting on the lock are woken.
+     *
+     * @param detach the detach frame received from the peer
+     */
+    public void remoteDetached(Detach detach)
+    {
+        synchronized (getLock())
+        {
+            final State stateOnEntry = _state;
+            if(stateOnEntry == State.DETACH_SENT)
+            {
+                _state = State.DETACHED;
+            }
+            else if(stateOnEntry == State.ATTACHED)
+            {
+                _state = State.DETACH_RECVD;
+                _linkEventListener.remoteDetached(this, detach);
+            }
+            getLock().notifyAll();
+        }
+    }
+
+ public void receiveTransfer(final Transfer transfer, final Delivery delivery)
+ {
+ // TODO - no base behaviour yet; ReceivingLinkEndpoint overrides this to track the transfer
+ }
+
+ public void settledByPeer(final Binary deliveryTag)
+ {
+ // TODO - no base behaviour yet; ReceivingLinkEndpoint overrides this
+ }
+
+ public void receiveFlow(final Flow flow)
+ { // intentionally empty here; ReceivingLinkEndpoint overrides this and calls super first
+ }
+
+    /**
+     * Records a delivery as unsettled, keyed by its delivery tag, and wakes
+     * any threads waiting on the lock.
+     *
+     * @param unsettled the delivery to track
+     */
+    public void addUnsettled(final Delivery unsettled)
+    {
+        final Object lock = getLock();
+        synchronized(lock)
+        {
+            final Binary tag = unsettled.getDeliveryTag();
+            _unsettledTransfers.put(tag, unsettled);
+            lock.notifyAll();
+        }
+    }
+
+    /**
+     * Processes a delivery state update received from the peer. The registered
+     * {@link DeliveryStateHandler}, if any, is informed first; if the peer has
+     * marked the delivery as settled it is then settled locally. Threads
+     * waiting on the lock are woken afterwards.
+     *
+     * @param unsettled the delivery the update refers to
+     * @param state     the delivery state reported by the peer
+     * @param settled   whether the peer settled the delivery; {@code null} is
+     *                  treated as "not settled"
+     */
+    public void receiveDeliveryState(final Delivery unsettled,
+                                     final DeliveryState state,
+                                     final Boolean settled)
+    {
+        // TODO
+        synchronized(getLock())
+        {
+            if(_deliveryStateHandler != null)
+            {
+                _deliveryStateHandler.handle(unsettled.getDeliveryTag(), state, settled);
+            }
+
+            // 'settled' is a boxed Boolean and may be null; a bare if(settled)
+            // would throw a NullPointerException while unboxing.
+            if(Boolean.TRUE.equals(settled))
+            {
+                settle(unsettled.getDeliveryTag());
+            }
+
+            getLock().notifyAll();
+        }
+
+    }
+
+ public void settle(final Binary deliveryTag)
+ {
+ Delivery delivery = _unsettledTransfers.remove(deliveryTag);
+ if(delivery != null)
+ {
+ getSession().settle(getRole(),delivery.getDeliveryId());
+ }
+
+ }
+
+    /** @return the number of deliveries currently tracked as unsettled */
+    public int getUnsettledCount()
+    {
+        final int count;
+        synchronized(getLock())
+        {
+            count = _unsettledTransfers.size();
+        }
+        return count;
+    }
+
+    /** @param localHandle the handle used for this link in outgoing frames */
+    public void setLocalHandle(final UnsignedInteger localHandle)
+    {
+        this._localHandle = localHandle;
+    }
+
+    /**
+     * Handles an attach received from the peer. From ATTACH_SENT or DETACHED
+     * the link becomes ATTACHED and waiting threads are woken; in the
+     * ATTACH_SENT case the peer's unsettled map is also recorded. The source
+     * (when the peer is the sender) or target (otherwise) is then copied from
+     * the attach, and when this end is the sender the peer's maximum message
+     * size is remembered.
+     *
+     * @param attach the attach frame received from the peer
+     */
+    public void receiveAttach(final Attach attach)
+    {
+        synchronized(getLock())
+        {
+            final State stateOnEntry = _state;
+            if(stateOnEntry == State.ATTACH_SENT || stateOnEntry == State.DETACHED)
+            {
+                _state = State.ATTACHED;
+                getLock().notifyAll();
+
+                if(stateOnEntry == State.ATTACH_SENT)
+                {
+                    _initialUnsettledMap = attach.getUnsettled();
+                    /* TODO - don't yet handle:
+
+                        attach.getUnsettled();
+                        attach.getProperties();
+                        attach.getDurable();
+                        attach.getExpiryPolicy();
+                        attach.getTimeout();
+                     */
+                }
+            }
+
+            final boolean peerIsSender = attach.getRole() == Role.SENDER;
+            if(peerIsSender)
+            {
+                _source = attach.getSource();
+            }
+            else
+            {
+                _target = attach.getTarget();
+            }
+
+            if(getRole() == Role.SENDER)
+            {
+                _maxMessageSize = attach.getMaxMessageSize();
+            }
+        }
+    }
+
+    /** @return {@code true} if the link is currently in the ATTACHED state */
+    public synchronized boolean isAttached()
+    {
+        return State.ATTACHED == _state;
+    }
+
+    /** @return {@code true} if the link is currently in the DETACHED state */
+    public synchronized boolean isDetached()
+    {
+        return State.DETACHED == _state;
+    }
+
+    /** @return the session this link belongs to */
+    public SessionEndpoint getSession()
+    {
+        return this._session;
+    }
+
+    /** @return the handle used for this link in outgoing frames */
+    public UnsignedInteger getLocalHandle()
+    {
+        return this._localHandle;
+    }
+
+    /** @return the lock object guarding link state, obtained from the session */
+    public Object getLock()
+    {
+        final SessionEndpoint owner = _session;
+        return owner.getLock();
+    }
+
+    /**
+     * Builds an attach frame from the current link settings and sends it via
+     * the session. DETACHED moves to ATTACH_SENT and ATTACH_RECVD moves to
+     * ATTACHED; in any other state the state is left untouched but the frame
+     * is still sent. Waiting threads are woken afterwards.
+     */
+    public void attach()
+    {
+        synchronized(getLock())
+        {
+            final Role role = getRole();
+
+            final Attach frame = new Attach();
+            frame.setName(getName());
+            frame.setRole(role);
+            frame.setHandle(getLocalHandle());
+            frame.setSource(getSource());
+            frame.setTarget(getTarget());
+            frame.setSndSettleMode(getSendingSettlementMode());
+            frame.setRcvSettleMode(getReceivingSettlementMode());
+            frame.setUnsettled(_localUnsettled);
+
+            // only a sender advertises an initial delivery count
+            if(role == Role.SENDER)
+            {
+                frame.setInitialDeliveryCount(_deliveryCount);
+            }
+
+            if(_state == State.DETACHED)
+            {
+                _state = State.ATTACH_SENT;
+            }
+            else if(_state == State.ATTACH_RECVD)
+            {
+                _state = State.ATTACHED;
+            }
+            // else: TODO ERROR
+
+            getSession().sendAttach(frame);
+
+            getLock().notifyAll();
+        }
+    }
+
+
+ public void detach()
+ {
+ detach(null, false);
+ }
+
+ public void close()
+ {
+ detach(null, true);
+ }
+
+ public void close(Error error)
+ {
+ detach(error, true);
+ }
+
+ public void detach(Error error)
+ {
+ detach(error, false);
+ }
+
+ private void detach(Error error, boolean close)
+ {
+ synchronized(getLock())
+ {
+ //TODO
+ switch(_state)
+ {
+ case ATTACHED:
+ _state = State.DETACH_SENT;
+ break;
+ case DETACH_RECVD:
+ _state = State.DETACHED;
+ break;
+ default:
+ return;
+ }
+
+ Detach detach = new Detach();
+ detach.setHandle(getLocalHandle());
+ if(close)
+ detach.setClosed(close);
+ detach.setError(error);
+
+ getSession().sendDetach(detach);
+
+ getLock().notifyAll();
+ }
+
+ }
+
+
+
+
+    /** @param txnId the transaction id to attach to subsequent flow frames, or {@code null} for none */
+    public void setTransactionId(final Object txnId)
+    {
+        this._flowTransactionId = txnId;
+    }
+
+    /**
+     * Sends a link flow frame if no flow has been sent yet, or if the link
+     * credit minus the credit the peer was last told about is at least as
+     * large as that credit; otherwise defers to the session's own conditional
+     * flow handling.
+     */
+    public void sendFlowConditional()
+    {
+        final boolean withTxn = _flowTransactionId != null;
+
+        if(_lastSentCreditLimit == null)
+        {
+            sendFlow(withTxn);
+            return;
+        }
+
+        final UnsignedInteger clientsCredit = _lastSentCreditLimit.subtract(_deliveryCount);
+        final UnsignedInteger creditGrowth = _linkCredit.subtract(clientsCredit);
+        if(creditGrowth.compareTo(clientsCredit) >= 0)
+        {
+            sendFlow(withTxn);
+        }
+        else
+        {
+            getSession().sendFlowConditional();
+        }
+    }
+
+
+    /** Sends a flow frame, including the transaction id when one has been set. */
+    public void sendFlow()
+    {
+        final boolean withTxn = _flowTransactionId != null;
+        sendFlow(withTxn);
+    }
+
+    /**
+     * Sends a flow frame carrying the current credit, delivery count,
+     * available and drain values. Nothing is sent unless the link is ATTACHED
+     * or ATTACH_SENT. The credit limit implied by the frame is remembered for
+     * {@link #sendFlowConditional()}.
+     *
+     * @param setTransactionId whether to add the "txn-id" property to the frame
+     */
+    public void sendFlow(boolean setTransactionId)
+    {
+        if(_state != State.ATTACHED && _state != State.ATTACH_SENT)
+        {
+            return;
+        }
+
+        final Flow frame = new Flow();
+        frame.setLinkCredit(_linkCredit);
+        frame.setDeliveryCount(_deliveryCount);
+        _lastSentCreditLimit = _linkCredit.add(_deliveryCount);
+        frame.setAvailable(_available);
+        frame.setDrain(_drain);
+        if(setTransactionId)
+        {
+            frame.setProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), _flowTransactionId));
+        }
+        frame.setHandle(getLocalHandle());
+        getSession().sendFlow(frame);
+    }
+
+    /** @return the listener informed of link events */
+    public T getLinkEventListener()
+    {
+        return this._linkEventListener;
+    }
+
+    /** @param linkEventListener the listener to inform of link events; set while holding the lock */
+    public void setLinkEventListener(final T linkEventListener)
+    {
+        synchronized(getLock())
+        {
+            this._linkEventListener = linkEventListener;
+        }
+    }
+
+    /** @return the handler informed of delivery state updates, or {@code null} */
+    public DeliveryStateHandler getDeliveryStateHandler()
+    {
+        return this._deliveryStateHandler;
+    }
+
+    /** @param deliveryStateHandler the handler to inform of delivery state updates; set while holding the lock */
+    public void setDeliveryStateHandler(final DeliveryStateHandler deliveryStateHandler)
+    {
+        synchronized(getLock())
+        {
+            this._deliveryStateHandler = deliveryStateHandler;
+        }
+    }
+
+    /** @return the sender settle mode used when attaching */
+    public SenderSettleMode getSendingSettlementMode()
+    {
+        return this._sendingSettlementMode;
+    }
+
+    /** @param sendingSettlementMode the sender settle mode to use when attaching */
+    public void setSendingSettlementMode(SenderSettleMode sendingSettlementMode)
+    {
+        this._sendingSettlementMode = sendingSettlementMode;
+    }
+
+    /** @return the receiver settle mode used when attaching */
+    public ReceiverSettleMode getReceivingSettlementMode()
+    {
+        return this._receivingSettlementMode;
+    }
+
+    /** @param receivingSettlementMode the receiver settle mode to use when attaching */
+    public void setReceivingSettlementMode(ReceiverSettleMode receivingSettlementMode)
+    {
+        this._receivingSettlementMode = receivingSettlementMode;
+    }
+
+    /** @return the unsettled map taken from the peer's attach */
+    public Map getInitialUnsettledMap()
+    {
+        return this._initialUnsettledMap;
+    }
+
+    /** @param unsettled the unsettled map to advertise on the next outgoing attach */
+    public void setLocalUnsettled(Map unsettled)
+    {
+        this._localUnsettled = unsettled;
+    }
+
+    /** @return a diagnostic description listing the main link settings */
+    @Override public String toString()
+    {
+        final StringBuilder text = new StringBuilder("LinkEndpoint{");
+        text.append("_name='").append(_name).append('\'');
+        text.append(", _session=").append(_session);
+        text.append(", _state=").append(_state);
+        text.append(", _role=").append(getRole());
+        text.append(", _source=").append(_source);
+        text.append(", _target=").append(_target);
+        text.append(", _transferCount=").append(_deliveryCount);
+        text.append(", _linkCredit=").append(_linkCredit);
+        text.append(", _available=").append(_available);
+        text.append(", _drain=").append(_drain);
+        text.append(", _localHandle=").append(_localHandle);
+        text.append(", _maxMessageSize=").append(_maxMessageSize);
+        text.append('}');
+        return text.toString();
+    }
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEventListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEventListener.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEventListener.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEventListener.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.transport.Detach;
+
+/**
+ * Callback interface through which a {@link LinkEndpoint} reports link-level
+ * events to its owner.
+ */
+public interface LinkEventListener
+{
+    /**
+     * Called when the peer detaches a link that was attached.
+     *
+     * @param endpoint the endpoint whose link was detached
+     * @param detach   the detach frame received from the peer
+     */
+    void remoteDetached(LinkEndpoint endpoint, Detach detach);
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Node.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Node.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Node.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Node.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.transport;
+
+public class Node // placeholder type: declares no members in this revision
+{
+
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ProtocolHeaderTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ProtocolHeaderTransport.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ProtocolHeaderTransport.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ProtocolHeaderTransport.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+
+import org.apache.qpid.amqp_1_0.type.Binary;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * A {@link BytesTransport} that reads the first eight bytes of the incoming
+ * stream as a protocol header and then hands the connection over to the
+ * transport registered for that header.
+ *
+ * If the header is not one of the registered headers, input is closed and one
+ * of the registered headers is queued so that {@link #getNextBytes} offers it
+ * to the peer.
+ *
+ * NOTE(review): nothing in this class ever sets the input/output "open" flags
+ * to true, so {@link #isOpenForInput()} and {@link #isOpenForOutput()} always
+ * report false - confirm whether that is intended.
+ */
+public class ProtocolHeaderTransport implements BytesTransport
+{
+    private static final int HEADER_LENGTH = 8;
+
+    private final Object _inputLock = new Object();
+    private final Object _outputLock = new Object();
+
+    private volatile boolean _inputOpen;
+    private volatile boolean _outputOpen;
+
+    /** Maps each accepted protocol header to the transport that handles it. */
+    private final Map<Binary,BytesTransport> _headersMap;
+
+    /** The transport selected by the received header; null until a known header has arrived. */
+    private BytesTransport _delegate;
+    /** Bytes this transport itself wants to send (a supported header after a mismatch). */
+    private ByteBuffer _bytesToSend;
+
+    private byte[] _header = new byte[HEADER_LENGTH];
+    private int _received;
+
+    /**
+     * @param validHeaders the accepted protocol headers and the transport to
+     *                     delegate to for each of them
+     */
+    public ProtocolHeaderTransport(Map<Binary, BytesTransport> validHeaders)
+    {
+        _headersMap = validHeaders;
+
+        // if only one valid header then we can send, else we have to wait
+    }
+
+    public boolean isOpenForInput()
+    {
+        return _inputOpen;
+    }
+
+    public void inputClosed()
+    {
+        synchronized(_inputLock)
+        {
+            _inputOpen = false;
+            _inputLock.notifyAll();
+        }
+    }
+
+    /**
+     * Consumes incoming bytes. Until the eight header bytes have been
+     * collected they are buffered here; once a known header is complete it is
+     * replayed to the selected delegate, followed by the rest of the buffer.
+     * After that all input goes straight to the delegate.
+     *
+     * @param buf the bytes received from the peer
+     */
+    public void processBytes(final ByteBuffer buf)
+    {
+        if(_delegate != null)
+        {
+            _delegate.processBytes(buf);
+            return;
+        }
+
+        while(_received < HEADER_LENGTH && buf.hasRemaining())
+        {
+            _header[_received++] = buf.get();
+        }
+
+        if(_received == HEADER_LENGTH)
+        {
+            Binary header = new Binary(_header);
+            _delegate = _headersMap.get(header);
+            if(_delegate != null)
+            {
+                _delegate.processBytes(ByteBuffer.wrap(_header));
+                _delegate.processBytes(buf);
+            }
+            else
+            {
+                inputClosed();
+                // Queue a header we do support in the buffer that
+                // getNextBytes() actually reads. Previously it was stored in a
+                // field nothing consumed, so it was never sent. Only do this
+                // once, and only if there is a header to offer.
+                if(_bytesToSend == null && !_headersMap.isEmpty())
+                {
+                    _bytesToSend = _headersMap.keySet().iterator().next().asByteBuffer();
+                }
+            }
+        }
+    }
+
+    public void setInputStateChangeListener(final StateChangeListener listener)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+
+    /**
+     * Offers pending output to the processor: first any bytes queued by this
+     * transport itself, otherwise whatever the delegate (if any) has to send.
+     *
+     * @param processor the consumer of outgoing bytes
+     */
+    public void getNextBytes(final BytesProcessor processor)
+    {
+        if(_bytesToSend != null && _bytesToSend.hasRemaining())
+        {
+            processor.processBytes(_bytesToSend);
+        }
+        else if(_delegate != null)
+        {
+            _delegate.getNextBytes(processor);
+        }
+
+    }
+
+    public void outputClosed()
+    {
+        synchronized (_outputLock)
+        {
+            _outputOpen = false;
+            _outputLock.notifyAll();
+        }
+    }
+
+    public boolean isOpenForOutput()
+    {
+        return _outputOpen;
+    }
+
+    public void setOutputStateChangeListener(final StateChangeListener listener)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,434 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+
+import java.util.*;
+
+public class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLinkListener>
+{
+
+
+    /**
+     * Per-delivery bookkeeping held while a delivery is unsettled: its
+     * delivery id, a counter that starts at one and is bumped for every
+     * further transfer frame seen for the same tag, and whether the sender
+     * has marked the delivery as settled.
+     */
+    private static class TransientState
+    {
+        private final UnsignedInteger _deliveryId;
+        private int _credit = 1;
+        private boolean _settled;
+
+        private TransientState(final UnsignedInteger transferId)
+        {
+            this._deliveryId = transferId;
+        }
+
+        void incrementCredit()
+        {
+            _credit += 1;
+        }
+
+        public int getCredit()
+        {
+            return this._credit;
+        }
+
+        public UnsignedInteger getDeliveryId()
+        {
+            return this._deliveryId;
+        }
+
+        public boolean isSettled()
+        {
+            return this._settled;
+        }
+
+        public void setSettled(boolean settled)
+        {
+            this._settled = settled;
+        }
+    }
+
+ private Map<Binary, Object> _unsettledMap = new LinkedHashMap<Binary, Object>(); // delivery tag -> last known state/outcome of the delivery
+ private Map<Binary, TransientState> _unsettledIds = new LinkedHashMap<Binary, TransientState>(); // delivery tag -> bookkeeping, in arrival order
+ private boolean _creditWindow; // when true, one unit of credit is restored as deliveries are settled
+ private boolean _remoteDrain; // drain flag from the most recent flow received
+ private UnsignedInteger _remoteTransferCount; // delivery count from the most recent flow received
+ private UnsignedInteger _drainLimit; // delivery count + link credit at the moment drain() was called
+
+
+    /**
+     * Creates a locally initiated receiving link with no unsettled map.
+     *
+     * @param session the owning session
+     * @param name    the link name
+     */
+    public ReceivingLinkEndpoint(final SessionEndpoint session, String name)
+    {
+        this(session, name, null);
+    }
+
+    /**
+     * Creates a locally initiated receiving link. The delivery count starts
+     * at zero and the default listener is installed.
+     *
+     * @param session      the owning session
+     * @param name         the link name
+     * @param unsettledMap the local unsettled map to advertise when attaching
+     */
+    public ReceivingLinkEndpoint(final SessionEndpoint session, String name, Map<Binary, Outcome> unsettledMap)
+    {
+        super(session, name, unsettledMap);
+        setLinkEventListener(ReceivingLinkListener.DEFAULT);
+        setDeliveryCount(UnsignedInteger.valueOf(0));
+    }
+
+    /**
+     * Creates a receiving link in response to a peer's attach, taking the
+     * initial delivery count and both settle modes from that attach and
+     * installing the default listener.
+     *
+     * @param session the owning session
+     * @param attach  the attach frame received from the peer
+     */
+    public ReceivingLinkEndpoint(final SessionEndpoint session, final Attach attach)
+    {
+        super(session, attach);
+        setLinkEventListener(ReceivingLinkListener.DEFAULT);
+        setSendingSettlementMode(attach.getSndSettleMode());
+        setReceivingSettlementMode(attach.getRcvSettleMode());
+        setDeliveryCount(attach.getInitialDeliveryCount());
+    }
+
+
+    /** @return always {@link Role#RECEIVER} */
+    @Override public Role getRole()
+    {
+        final Role role = Role.RECEIVER;
+        return role;
+    }
+
+    /**
+     * Handles a transfer frame from the peer. The delivery's state is recorded
+     * against its tag. For a tag not currently in the unsettled map a new
+     * bookkeeping entry is created, one unit of link credit is consumed and
+     * the delivery count advances; for a known tag the existing entry's
+     * counter is bumped. Deliveries the sender marked as settled are dropped
+     * from the unsettled map. The listener is then given the transfer and
+     * waiting threads are woken.
+     *
+     * @param transfer the transfer frame received
+     * @param delivery the delivery the frame belongs to (not used here)
+     */
+    @Override
+    public void receiveTransfer(final Transfer transfer, final Delivery delivery)
+    {
+        synchronized (getLock())
+        {
+            final Binary tag = transfer.getDeliveryTag();
+            final boolean alreadyKnown = _unsettledMap.containsKey(tag);
+            _unsettledMap.put(tag, transfer.getState());
+
+            final boolean settledBySender = Boolean.TRUE.equals(transfer.getSettled());
+            final TransientState bookkeeping;
+            if(alreadyKnown)
+            {
+                bookkeeping = _unsettledIds.get(tag);
+                bookkeeping.incrementCredit();
+                if(settledBySender)
+                {
+                    bookkeeping.setSettled(true);
+                }
+            }
+            else
+            {
+                bookkeeping = new TransientState(transfer.getDeliveryId());
+                if(settledBySender)
+                {
+                    bookkeeping.setSettled(true);
+                }
+                _unsettledIds.put(tag, bookkeeping);
+                setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
+                setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE));
+            }
+
+            if(bookkeeping.isSettled())
+            {
+                _unsettledMap.remove(tag);
+            }
+            getLinkEventListener().messageTransfer(transfer);
+
+            getLock().notifyAll();
+        }
+    }
+
+    /**
+     * Records the drain flag, available count and delivery count carried by a
+     * flow frame from the peer, then wakes waiting threads.
+     *
+     * @param flow the flow frame received
+     */
+    @Override public void receiveFlow(final Flow flow)
+    {
+        synchronized (getLock())
+        {
+            super.receiveFlow(flow);
+
+            final Boolean drainRequested = (Boolean) flow.getDrain();
+            _remoteDrain = Boolean.TRUE.equals(drainRequested);
+            setAvailable(flow.getAvailable());
+            _remoteTransferCount = flow.getDeliveryCount();
+
+            getLock().notifyAll();
+        }
+    }
+
+
+    /**
+     * Reports whether a drain has completed, i.e. drain mode is on and the
+     * delivery count has reached the limit recorded by {@link #drain()}.
+     *
+     * @return {@code true} if draining and the drain limit has been reached;
+     *         {@code false} otherwise, including when the drain flag, the
+     *         drain limit or the delivery count has not been set
+     */
+    public boolean isDrained()
+    {
+        // The drain flag is a boxed Boolean that the attach-based constructor
+        // leaves null, and the delivery count may be null as well; compare
+        // null-safely instead of unboxing/dereferencing them directly.
+        final UnsignedInteger limit = getDrainLimit();
+        return Boolean.TRUE.equals(getDrain())
+               && limit != null
+               && limit.equals(getDeliveryCount());
+    }
+
+    /**
+     * Called when the peer settles a delivery. The delivery is forgotten and,
+     * if it was being tracked and a credit window is in use, a conditional
+     * flow is sent.
+     *
+     * @param deliveryTag the tag of the delivery the peer settled
+     */
+    @Override
+    public void settledByPeer(final Binary deliveryTag)
+    {
+        synchronized (getLock())
+        {
+            // TODO XXX : need to do anything about the window here?
+            final boolean wasTracked = settled(deliveryTag);
+            if(wasTracked && _creditWindow)
+            {
+                sendFlowConditional();
+            }
+        }
+    }
+
+    /**
+     * Forgets a delivery. If the tag was being tracked, its entries are
+     * removed and waiting threads are woken.
+     *
+     * @param deliveryTag the tag of the delivery to forget
+     * @return {@code true} if the tag was being tracked
+     */
+    public boolean settled(final Binary deliveryTag)
+    {
+        synchronized(getLock())
+        {
+            final boolean removed = _unsettledIds.remove(deliveryTag) != null;
+            if(removed)
+            {
+                _unsettledMap.remove(deliveryTag);
+
+                getLock().notifyAll();
+            }
+            return removed;
+        }
+    }
+
+    /**
+     * Updates the local disposition of a single delivery.
+     *
+     * If the tag is in the unsettled map, any outcome carried by the state
+     * (directly, or inside a transactional state) is recorded; the session is
+     * asked to send a disposition when the outcome changed or the delivery is
+     * being settled. When settling, the delivery is forgotten and credit is
+     * replenished (credit-window mode) or a session-level conditional flow is
+     * requested. If the tag is not in the unsettled map, only the
+     * credit-window replenishment is performed.
+     *
+     * @param deliveryTag the tag of the delivery to update
+     * @param state       the new delivery state
+     * @param settled     whether the delivery is being settled
+     */
+    public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
+    {
+        synchronized(getLock())
+        {
+            if(_unsettledMap.containsKey(deliveryTag))
+            {
+                boolean outcomeUpdate = false;
+                Outcome outcome=null;
+                if(state instanceof Outcome)
+                {
+                    outcome = (Outcome)state;
+                }
+                else if(state instanceof TransactionalState)
+                {
+                    // TODO? Is this correct
+                    outcome = ((TransactionalState)state).getOutcome();
+                }
+
+                if(outcome != null)
+                {
+                    Object oldOutcome = _unsettledMap.put(deliveryTag, outcome);
+                    outcomeUpdate = !outcome.equals(oldOutcome);
+                }
+
+                TransientState transientState = _unsettledIds.get(deliveryTag);
+                if(outcomeUpdate || settled)
+                {
+                    // a single delivery: first and last id of the range are the same
+                    final UnsignedInteger transferId = transientState.getDeliveryId();
+
+                    getSession().updateDisposition(getRole(), transferId, transferId, state, settled);
+                }
+
+                if(settled && settled(deliveryTag))
+                {
+                    if(_creditWindow)
+                    {
+                        setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+                        sendFlowConditional();
+                    }
+                    else
+                    {
+                        getSession().sendFlowConditional();
+                    }
+                }
+                getLock().notifyAll();
+            }
+            else
+            {
+                // (the unused lookup of the transient state was removed here)
+                if(_creditWindow)
+                {
+                    setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+                    sendFlowConditional();
+                }
+
+            }
+        }
+
+    }
+
+
+    /** Switches credit-window mode on. */
+    public void setCreditWindow()
+    {
+        setCreditWindow(true);
+    }
+
+    /**
+     * Switches credit-window mode on or off and then sends a conditional flow.
+     *
+     * @param window {@code true} to replenish credit as deliveries settle
+     */
+    public void setCreditWindow(boolean window)
+    {
+        this._creditWindow = window;
+        sendFlowConditional();
+    }
+
+    /**
+     * Asks the sender to drain the link: sets the drain flag, leaves
+     * credit-window mode, records the delivery count at which the drain will
+     * be complete, sends a flow and wakes waiting threads.
+     */
+    public void drain()
+    {
+        synchronized (getLock())
+        {
+            setDrain(Boolean.TRUE);
+            _creditWindow = false;
+
+            final UnsignedInteger outstandingCredit = getLinkCredit();
+            _drainLimit = getDeliveryCount().add(outstandingCredit);
+
+            sendFlow();
+            getLock().notifyAll();
+        }
+    }
+
+    /**
+     * Applies the base-class handling and then, in credit-window mode,
+     * restores one unit of credit and sends a conditional flow when the peer
+     * has settled the delivery.
+     *
+     * @param unsettled the delivery the update refers to
+     * @param state     the delivery state reported by the peer
+     * @param settled   whether the peer settled the delivery
+     */
+    @Override
+    public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled)
+    {
+        super.receiveDeliveryState(unsettled, state, settled);
+
+        final boolean replenish = _creditWindow && Boolean.TRUE.equals(settled);
+        if(replenish)
+        {
+            setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+            sendFlowConditional();
+        }
+    }
+
+    /**
+     * Sets the drain flag, leaves credit-window mode, associates the given
+     * transaction id with subsequent flows, sends a flow and wakes waiting
+     * threads.
+     *
+     * @param txnId the transaction id to attach to flow frames
+     */
+    public void requestTransactionalSend(Object txnId)
+    {
+        synchronized (getLock())
+        {
+            setDrain(Boolean.TRUE);
+            _creditWindow = false;
+            setTransactionId(txnId);
+            sendFlow();
+            getLock().notifyAll();
+        }
+    }
+
+    /**
+     * Sends a flow. The argument is ignored; the transaction id used is the
+     * one previously stored via {@code setTransactionId}.
+     *
+     * @param transactionId unused
+     */
+    private void sendFlow(final Object transactionId)
+    {
+        sendFlow();
+    }
+
+
+    /** Clears the drain flag, sends a flow and wakes waiting threads. */
+    public void clearDrain()
+    {
+        synchronized (getLock())
+        {
+            setDrain(Boolean.FALSE);
+            sendFlow();
+            getLock().notifyAll();
+        }
+    }
+
+    /**
+     * Applies an outcome to every tracked delivery from the oldest one up to
+     * the given tag. Does nothing when no deliveries are being tracked.
+     *
+     * @param deliveryTag the tag of the last delivery to update
+     * @param outcome     the outcome to apply
+     * @param settled     whether the deliveries are being settled
+     */
+    public void updateAllDisposition(Binary deliveryTag, Outcome outcome, boolean settled)
+    {
+        synchronized(getLock())
+        {
+            if(_unsettledIds.isEmpty())
+            {
+                return;
+            }
+
+            final Binary oldestTag = _unsettledIds.keySet().iterator().next();
+            updateDispositions(oldestTag, deliveryTag, (DeliveryState) outcome, settled);
+        }
+    }
+
+    /**
+     * Applies a delivery state to every tracked delivery from {@code firstTag}
+     * to {@code lastTag} inclusive, in tracking order.
+     *
+     * The delivery ids of the affected deliveries are collapsed into
+     * contiguous ranges, and one disposition per range is sent through the
+     * session. When settling, each affected delivery is forgotten (restoring
+     * one unit of credit per delivery in credit-window mode) and a
+     * conditional flow is sent. If {@code lastTag} is not found, the walk
+     * continues to the newest tracked delivery.
+     *
+     * @param firstTag the tag of the first delivery to update
+     * @param lastTag  the tag of the last delivery to update
+     * @param state    the delivery state to apply
+     * @param settled  whether the deliveries are being settled
+     */
+    private void updateDispositions(Binary firstTag, Binary lastTag, DeliveryState state, boolean settled)
+    {
+        SortedMap<UnsignedInteger, UnsignedInteger> ranges = new TreeMap<UnsignedInteger,UnsignedInteger>();
+
+        synchronized(getLock())
+        {
+
+            Iterator<Binary> iter = _unsettledIds.keySet().iterator();
+            List<Binary> tagsToUpdate = new ArrayList<Binary>();
+            Binary tag = null;
+
+            // skip forward to the first tag of the range
+            while(iter.hasNext())
+            {
+                tag = iter.next();
+                if(tag.equals(firstTag))
+                {
+                    break;
+                }
+            }
+
+            if(firstTag.equals(tag))
+            {
+                tagsToUpdate.add(tag);
+
+                UnsignedInteger first = _unsettledIds.get(tag).getDeliveryId();
+                UnsignedInteger last = first;
+
+                // Stop before advancing once the last tag has been reached, so
+                // that a range whose first and last tag coincide covers just
+                // that one delivery rather than running to the end of the map.
+                while(!tag.equals(lastTag) && iter.hasNext())
+                {
+                    tag = iter.next();
+                    tagsToUpdate.add(tag);
+
+                    // look up the id of the delivery being visited (the
+                    // original looked up firstTag on every iteration)
+                    UnsignedInteger deliveryId = _unsettledIds.get(tag).getDeliveryId();
+
+                    if(deliveryId.equals(last.add(UnsignedInteger.ONE)))
+                    {
+                        // contiguous with the current range: extend it
+                        last = deliveryId;
+                    }
+                    else
+                    {
+                        // gap in the ids: close the current range, start a new one
+                        ranges.put(first,last);
+                        first = last = deliveryId;
+                    }
+                }
+
+                ranges.put(first,last);
+            }
+
+            if(settled)
+            {
+
+                for(Binary deliveryTag : tagsToUpdate)
+                {
+                    if(settled(deliveryTag) && _creditWindow)
+                    {
+                        setLinkCredit(getLinkCredit().add(UnsignedInteger.valueOf(1)));
+                    }
+                }
+                sendFlowConditional();
+            }
+
+
+
+            for(Map.Entry<UnsignedInteger,UnsignedInteger> range : ranges.entrySet())
+            {
+                getSession().updateDisposition(getRole(), range.getKey(), range.getValue(), state, settled);
+            }
+
+
+            getLock().notifyAll();
+        }
+
+    }
+
+    /**
+     * Settles the delivery via the base class and, in credit-window mode,
+     * sends a conditional flow afterwards.
+     *
+     * @param deliveryTag the tag of the delivery to settle
+     */
+    @Override
+    public void settle(Binary deliveryTag)
+    {
+        super.settle(deliveryTag);
+        if(!_creditWindow)
+        {
+            return;
+        }
+        sendFlowConditional();
+    }
+
+    /** @return the delivery count recorded by {@link #drain()} as the drain limit, or {@code null} if never set */
+    public UnsignedInteger getDrainLimit()
+    {
+        return this._drainLimit;
+    }
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkListener.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkListener.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkListener.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+/**
+ * Listener for events on a receiving link: incoming transfers in addition to
+ * the link-level events of {@link LinkEventListener}.
+ */
+public interface ReceivingLinkListener extends LinkEventListener
+{
+    /**
+     * Called for each transfer frame received on the link.
+     *
+     * @param xfr the transfer received
+     */
+    void messageTransfer(Transfer xfr);
+
+    /**
+     * Default behaviour: transfers are ignored, and a detach from the peer is
+     * answered by detaching the local endpoint.
+     */
+    class DefaultLinkEventListener implements ReceivingLinkListener
+    {
+        public void messageTransfer(final Transfer xfr)
+        {
+            // deliberately ignores the transfer
+        }
+
+        public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+        {
+            endpoint.detach();
+        }
+    }
+
+    /** Shared instance of the default listener. */
+    ReceivingLinkListener DEFAULT = new DefaultLinkEventListener();
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingSessionHalfEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingSessionHalfEndpoint.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingSessionHalfEndpoint.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingSessionHalfEndpoint.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+/**
+ * Receiving-side specialisation of {@link SessionHalfEndpoint}.
+ * Currently a placeholder: it declares no members of its own.
+ */
+public class ReceivingSessionHalfEndpoint extends SessionHalfEndpoint
+{
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpoint.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpoint.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpoint.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.security.SaslChallenge;
+import org.apache.qpid.amqp_1_0.type.security.SaslInit;
+import org.apache.qpid.amqp_1_0.type.security.SaslMechanisms;
+import org.apache.qpid.amqp_1_0.type.security.SaslOutcome;
+import org.apache.qpid.amqp_1_0.type.security.SaslResponse;
+
+/**
+ * Callback interface with one method per SASL frame body type.
+ * <p>
+ * The methods are listed in what is presumably the order of the SASL
+ * exchange (mechanisms, init, challenge, response, outcome) - confirm
+ * against the security-layer specification.
+ */
+public interface SASLEndpoint
+{
+    /**
+     * @param saslMechanisms the mechanisms body that was received
+     */
+    void receiveSaslMechanisms(SaslMechanisms saslMechanisms);
+
+    /**
+     * @param saslInit the init body that was received
+     */
+    void receiveSaslInit(SaslInit saslInit);
+
+    /**
+     * @param saslChallenge the challenge body that was received
+     */
+    void receiveSaslChallenge(SaslChallenge saslChallenge);
+
+    /**
+     * @param saslResponse the response body that was received
+     */
+    void receiveSaslResponse(SaslResponse saslResponse);
+
+    /**
+     * @param saslOutcome the outcome body that was received
+     */
+    void receiveSaslOutcome(SaslOutcome saslOutcome);
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpointImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpointImpl.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpointImpl.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpointImpl.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructorRegistry;
+import org.apache.qpid.amqp_1_0.codec.ValueWriter;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.amqp_1_0.type.security.SaslChallenge;
+import org.apache.qpid.amqp_1_0.type.security.SaslInit;
+import org.apache.qpid.amqp_1_0.type.security.SaslMechanisms;
+import org.apache.qpid.amqp_1_0.type.security.SaslOutcome;
+import org.apache.qpid.amqp_1_0.type.security.SaslResponse;
+
+
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import java.io.PrintWriter;
+import java.util.Arrays;
+
+
+/**
+ * Work-in-progress endpoint for the SASL security layer.
+ * <p>
+ * Incoming frames arrive through {@link #receive(short, Object)}; SASL frame
+ * bodies are dispatched back onto the {@code receiveSasl*} callbacks of this
+ * class.  So far only the opening of the exchange is implemented: an endpoint
+ * created in the {@code BEGIN_SERVER} state announces its mechanisms once a
+ * frame output handler is installed, and an endpoint receiving mechanisms
+ * picks one and answers with a {@code SaslInit}.  Challenge, response and
+ * outcome handling are still TODO, as is the actual write of outgoing frames.
+ * <p>
+ * Most state-changing methods synchronize on this object, which is also what
+ * {@link #getLock()} returns.
+ */
+public class SASLEndpointImpl
+        implements DescribedTypeConstructorRegistry.Source, ValueWriter.Registry.Source, SASLEndpoint
+{
+    private static final short SASL_CONTROL_CHANNEL = (short) 0;
+
+    private static final byte[] EMPTY_CHALLENGE = new byte[0];
+
+    private FrameTransport _transport;
+
+    /** Progress of the exchange as tracked by this endpoint. */
+    private static enum State
+    {
+        BEGIN_SERVER,
+        BEGIN_CLIENT,
+        SENT_MECHANISMS,
+        SENT_INIT,
+        SENT_RESPONSE,
+        SENT_CHALLENGE,
+        SENT_OUTCOME
+    }
+
+    /** Optional trace sink: when non-null, every body sent or frame received is printed to it. */
+    public PrintWriter _out;
+
+    private State _state;
+
+    private SaslClient _saslClient;
+    private SaslServer _saslServer;
+
+    private boolean _isReadable;
+    private boolean _isWritable;
+    private boolean _closedForInput;
+    private boolean _closedForOutput;
+
+    private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance().registerSecurityLayer();
+
+    private FrameOutputHandler _frameOutputHandler;
+
+    private byte _majorVersion;
+    private byte _minorVersion;
+    private byte _revision;
+    private UnsignedInteger _handleMax = UnsignedInteger.MAX_VALUE;
+    private ConnectionEventListener _connectionEventListener = ConnectionEventListener.DEFAULT;
+    /** Mechanisms this endpoint offers (server) or is willing to use (client), in preference order. */
+    private Symbol[] _mechanisms;
+    /** Mechanism chosen for the exchange; null until one has been received or selected. */
+    private Symbol _mechanism;
+
+
+    private SASLEndpointImpl(FrameTransport transport, State initialState, Symbol... mechanisms)
+    {
+        _transport = transport;
+        _state = initialState;
+        _mechanisms = mechanisms;
+    }
+
+    /**
+     * Installs the handler for outgoing frames.  An endpoint still in the
+     * {@code BEGIN_SERVER} state reacts by sending its mechanisms.
+     *
+     * @param frameOutputHandler the handler to use for outgoing frames
+     */
+    public void setFrameOutputHandler(final FrameOutputHandler frameOutputHandler)
+    {
+        _frameOutputHandler = frameOutputHandler;
+        if(_state == State.BEGIN_SERVER)
+        {
+            sendMechanisms();
+        }
+    }
+
+    /** Builds a {@code SaslMechanisms} body from the configured mechanisms and sends it. */
+    private synchronized void sendMechanisms()
+    {
+        SaslMechanisms saslMechanisms = new SaslMechanisms();
+
+        saslMechanisms.setSaslServerMechanisms(_mechanisms);
+
+        _state = State.SENT_MECHANISMS;
+
+        send(saslMechanisms);
+    }
+
+    public boolean isReadable()
+    {
+        return _isReadable;
+    }
+
+    public boolean isWritable()
+    {
+        return _isWritable;
+    }
+
+    /**
+     * Sends a SASL frame body unless output has been closed.  At present the
+     * body is only traced to {@link #_out}; the hand-off to the frame output
+     * handler is not wired up yet.
+     *
+     * @param body the body to send
+     */
+    public synchronized void send(SaslFrameBody body)
+    {
+        if(!_closedForOutput)
+        {
+            if(_out != null)
+            {
+                _out.println("SEND : " + body);
+                _out.flush();
+            }
+            //_frameOutputHandler.send(new SASLFrame(body));
+        }
+    }
+
+    /** Marks the endpoint as closed for input after an invalid protocol header. */
+    public void invalidHeaderReceived()
+    {
+        // TODO
+        _closedForInput = true;
+    }
+
+    public synchronized boolean closedForInput()
+    {
+        return _closedForInput;
+    }
+
+    /**
+     * Records the protocol version carried by the received header.
+     *
+     * @param major        major version
+     * @param minorVersion minor version
+     * @param revision     revision
+     */
+    public synchronized void protocolHeaderReceived(final byte major, final byte minorVersion, final byte revision)
+    {
+        _majorVersion = major;
+        _minorVersion = minorVersion;
+        _revision = revision;
+    }
+
+    /**
+     * Entry point for incoming frames.  SASL frame bodies are dispatched to
+     * the matching {@code receiveSasl*} method; anything else is ignored for
+     * now.
+     *
+     * @param channel channel the frame arrived on (only used for tracing)
+     * @param frame   the decoded frame body
+     */
+    public synchronized void receive(final short channel, final Object frame)
+    {
+        if(_out != null)
+        {
+            _out.println( "RECV["+channel+"] : " + frame);
+            _out.flush();
+        }
+        if(frame instanceof SaslFrameBody)
+        {
+            ((SaslFrameBody)frame).invoke(this);
+        }
+        else
+        {
+            // TODO
+        }
+    }
+
+    public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
+    {
+        return _describedTypeRegistry;
+    }
+
+    /**
+     * Sets whether the endpoint is closed for output and wakes any thread
+     * waiting on this object's monitor.
+     *
+     * @param b {@code true} to close output, {@code false} to mark it open;
+     *          earlier revisions ignored this argument and always closed
+     */
+    public synchronized void setClosedForOutput(boolean b)
+    {
+        _closedForOutput = b;
+        notifyAll();
+    }
+
+    public synchronized boolean closedForOutput()
+    {
+        return _closedForOutput;
+    }
+
+    /** @return the monitor guarding this endpoint's state, i.e. this object */
+    public Object getLock()
+    {
+        return this;
+    }
+
+    public byte getMajorVersion()
+    {
+        return _majorVersion;
+    }
+
+    public byte getMinorVersion()
+    {
+        return _minorVersion;
+    }
+
+    public byte getRevision()
+    {
+        return _revision;
+    }
+
+    /**
+     * Remembers the mechanism named in the init body and creates the
+     * corresponding {@link SaslServer}.
+     *
+     * @param saslInit the received init body
+     */
+    public void receiveSaslInit(final SaslInit saslInit)
+    {
+        _mechanism = saslInit.getMechanism();
+        try
+        {
+            _saslServer = Sasl.createSaslServer(_mechanism.toString(), "AMQP", "localhost", null, createServerCallbackHandler(_mechanism));
+        }
+        catch (SaslException e)
+        {
+            e.printStackTrace(); //TODO
+        }
+    }
+
+    private CallbackHandler createServerCallbackHandler(final Symbol mechanism)
+    {
+        return null; //TODO
+    }
+
+    /**
+     * Picks the first locally configured mechanism that the peer also offers,
+     * creates a {@link SaslClient} for it and sends the {@code SaslInit}.
+     *
+     * @param saslMechanisms the mechanisms body received from the peer
+     * @throws IllegalStateException if there is no mechanism in common, or no
+     *                               SASL client implementation exists for the
+     *                               chosen mechanism (previously this surfaced
+     *                               as an anonymous NullPointerException)
+     */
+    public synchronized void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+    {
+        final Symbol[] offered = saslMechanisms.getSaslServerMechanisms();
+        final Symbol agreed = selectMechanism(offered);
+        if(agreed == null)
+        {
+            // TODO - no matching mechanism: should end the exchange cleanly rather than throw
+            throw new IllegalStateException("No mutually supported SASL mechanism: offered="
+                                            + Arrays.toString(offered)
+                                            + ", supported=" + Arrays.toString(_mechanisms));
+        }
+        _mechanism = agreed;
+        try
+        {
+            _saslClient = Sasl.createSaslClient(new String[] { agreed.toString() }, null, "AMQP", "localhost", null,
+                                                createClientCallbackHandler(agreed));
+            if(_saslClient == null)
+            {
+                // createSaslClient returns null when no factory can produce a client
+                throw new IllegalStateException("No SASL client implementation available for mechanism " + agreed);
+            }
+            SaslInit init = new SaslInit();
+            init.setMechanism(agreed);
+            init.setInitialResponse(_saslClient.hasInitialResponse() ? new Binary(_saslClient.evaluateChallenge(EMPTY_CHALLENGE)) : null);
+            send(init);
+        }
+        catch (SaslException e)
+        {
+            e.printStackTrace(); //TODO
+        }
+    }
+
+    /**
+     * Returns the first entry of the local mechanism list that also appears
+     * in the offered list, or null when there is none (including when either
+     * list is missing).
+     */
+    private Symbol selectMechanism(final Symbol[] offered)
+    {
+        if(offered == null || _mechanisms == null)
+        {
+            return null;
+        }
+        final java.util.List<Symbol> offeredList = Arrays.asList(offered);
+        for(Symbol candidate : _mechanisms)
+        {
+            if(candidate != null && offeredList.contains(candidate))
+            {
+                return candidate;
+            }
+        }
+        return null;
+    }
+
+    private CallbackHandler createClientCallbackHandler(final Symbol mechanism)
+    {
+        return null; //TODO
+    }
+
+    public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+    {
+        //TODO
+    }
+
+    public void receiveSaslResponse(final SaslResponse saslResponse)
+    {
+        //TODO
+    }
+
+    public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+    {
+        //TODO
+    }
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLFrameTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLFrameTransport.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLFrameTransport.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLFrameTransport.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.framing.SASLFrame;
+
+/**
+ * Skeleton {@link FrameTransport} for {@link SASLFrame}s.
+ * <p>
+ * Only the open/closed bookkeeping exists; frame handling and state-change
+ * listeners are not implemented yet.  Nothing in this class ever sets the
+ * open flags to {@code true}, so both directions currently report closed.
+ */
+public class SASLFrameTransport implements FrameTransport<SASLFrame>
+{
+    private final Object _inputLock = new Object();
+    private final Object _outputLock = new Object();
+
+    private volatile boolean _inputOpen;
+    private volatile boolean _outputOpen;
+
+    // ---- input side ----
+
+    public boolean isOpenForInput()
+    {
+        return _inputOpen;
+    }
+
+    /** Marks input as closed and wakes every thread waiting on the input lock. */
+    public void closeForInput()
+    {
+        synchronized (_inputLock)
+        {
+            _inputOpen = false;
+            _inputLock.notifyAll();
+        }
+    }
+
+    public void processIncomingFrame(final SASLFrame frame)
+    {
+        // not implemented yet: incoming frames are dropped
+    }
+
+    public void setInputStateChangeListener(final StateChangeListener listener)
+    {
+        // not implemented yet: the listener is not retained
+    }
+
+    // ---- output side ----
+
+    public boolean isOpenForOutput()
+    {
+        return _outputOpen;
+    }
+
+    /** Marks output as closed and wakes every thread waiting on the output lock. */
+    public void closeForOutput()
+    {
+        synchronized (_outputLock)
+        {
+            _outputOpen = false;
+            _outputLock.notifyAll();
+        }
+    }
+
+    public SASLFrame getNextFrame()
+    {
+        // not implemented yet: there is never a frame to hand out
+        return null;
+    }
+
+    public void setOutputStateChangeListener(final StateChangeListener listener)
+    {
+        // not implemented yet: the listener is not retained
+    }
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLTransport.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLTransport.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLTransport.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLTransport.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Skeleton {@link BytesTransport} for the SASL layer.
+ * <p>
+ * Only the open/closed bookkeeping exists; byte processing and state-change
+ * listeners are not implemented yet.  Nothing in this class ever sets the
+ * open flags to {@code true}, so both directions currently report closed.
+ */
+public class SASLTransport implements BytesTransport
+{
+    private final Object _inputLock = new Object();
+    private final Object _outputLock = new Object();
+
+    private volatile boolean _inputOpen;
+    private volatile boolean _outputOpen;
+
+    private AMQPDescribedTypeRegistry _describedTypeRegistry =
+            AMQPDescribedTypeRegistry.newInstance().registerSecurityLayer();
+
+    // ---- input side ----
+
+    public boolean isOpenForInput()
+    {
+        return _inputOpen;
+    }
+
+    /** Records that input has closed and wakes every thread waiting on the input lock. */
+    public void inputClosed()
+    {
+        synchronized (_inputLock)
+        {
+            _inputOpen = false;
+            _inputLock.notifyAll();
+        }
+    }
+
+    public void processBytes(final ByteBuffer buf)
+    {
+        // not implemented yet: incoming bytes are ignored
+    }
+
+    public void setInputStateChangeListener(final StateChangeListener listener)
+    {
+        // not implemented yet: the listener is not retained
+    }
+
+    // ---- output side ----
+
+    public boolean isOpenForOutput()
+    {
+        return _outputOpen;
+    }
+
+    /** Records that output has closed and wakes every thread waiting on the output lock. */
+    public void outputClosed()
+    {
+        synchronized (_outputLock)
+        {
+            _outputOpen = false;
+            _outputLock.notifyAll();
+        }
+    }
+
+    public void getNextBytes(final BytesProcessor processor)
+    {
+        // not implemented yet: the processor is never called
+    }
+
+    public void setOutputStateChangeListener(final StateChangeListener listener)
+    {
+        // not implemented yet: the listener is not retained
+    }
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.transport.Attach;
+import org.apache.qpid.amqp_1_0.type.transport.Flow;
+import org.apache.qpid.amqp_1_0.type.transport.Role;
+import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Sending side of a link.
+ * <p>
+ * Tracks link credit and delivery count for outgoing transfers, remembers
+ * which delivery tags are still unsettled, and reacts to flow and delivery
+ * state updates coming from the peer.
+ */
+public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener>
+{
+
+    private UnsignedInteger _lastDeliveryId;
+    /** Tag of the delivery whose transfer had the "more" flag set, or null when none is in progress. */
+    private Binary _lastDeliveryTag;
+    /** Delivery tag to delivery id for transfers sent without the settled flag. */
+    private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<Binary, UnsignedInteger>();
+    /** Value of the "txn-id" flow property most recently received, if any. */
+    private Binary _transactionId;
+
+    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name)
+    {
+        this(sessionEndpoint, name, null);
+    }
+
+    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, String name, Map<Binary, Outcome> unsettled)
+    {
+        super(sessionEndpoint, name, unsettled);
+        init();
+    }
+
+    /**
+     * Creates the endpoint from a received attach, adopting the settlement
+     * modes it carries.
+     */
+    public SendingLinkEndpoint(final SessionEndpoint sessionEndpoint, final Attach attach)
+    {
+        super(sessionEndpoint, attach);
+        setSendingSettlementMode(attach.getSndSettleMode());
+        setReceivingSettlementMode(attach.getRcvSettleMode());
+        init();
+    }
+
+    /** Resets delivery count and available to zero and installs the default listener. */
+    private void init()
+    {
+        setDeliveryCount(UnsignedInteger.valueOf(0));
+        setAvailable(UnsignedInteger.valueOf(0));
+        setLinkEventListener(SendingLinkListener.DEFAULT);
+    }
+
+    @Override public Role getRole()
+    {
+        return Role.SENDER;
+    }
+
+    /**
+     * Sends a transfer if the link has credit for it.
+     * <p>
+     * On success one unit of link credit is consumed, the delivery count is
+     * advanced by one, and the delivery tag is recorded as unsettled unless
+     * the transfer is flagged as settled.
+     *
+     * @param xfr the transfer to send; its message format and handle are
+     *            overwritten here
+     * @return {@code true} if the transfer was passed to the session,
+     *         {@code false} if there was not enough link credit
+     */
+    public boolean transfer(final Transfer xfr)
+    {
+        final SessionEndpoint s = getSession();
+        // every transfer currently costs exactly one credit, whether or not
+        // it continues a delivery that is already in progress
+        final int transferCount = 1;
+        xfr.setMessageFormat(UnsignedInteger.ZERO);
+        synchronized(getLock())
+        {
+            final int currentCredit = getLinkCredit().intValue() - transferCount;
+
+            if(currentCredit < 0)
+            {
+                return false;
+            }
+            setLinkCredit(UnsignedInteger.valueOf(currentCredit));
+
+            setDeliveryCount(UnsignedInteger.valueOf(getDeliveryCount().intValue() + transferCount));
+
+            xfr.setHandle(getLocalHandle());
+
+            // third argument: true when this transfer starts a new delivery
+            s.sendTransfer(xfr, this, !xfr.getDeliveryTag().equals(_lastDeliveryTag));
+
+            if(!Boolean.TRUE.equals(xfr.getSettled()))
+            {
+                _unsettledMap.put(xfr.getDeliveryTag(), xfr.getDeliveryId());
+            }
+
+            // Updated while still holding the lock: the field is read above
+            // under the same lock, so writing it outside would let concurrent
+            // senders observe a stale tag.
+            _lastDeliveryTag = Boolean.TRUE.equals(xfr.getMore()) ? xfr.getDeliveryTag() : null;
+        }
+
+        return true;
+    }
+
+    /**
+     * Uses up all remaining credit: the delivery count is advanced by the
+     * current link credit, the credit is set to zero and a flow is sent.
+     */
+    public void drained()
+    {
+        synchronized (getLock())
+        {
+            setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
+            setLinkCredit(UnsignedInteger.ZERO);
+            sendFlow();
+        }
+    }
+
+    /**
+     * Applies a flow from the peer: records the drain flag and any "txn-id"
+     * property, recomputes link credit, then notifies the listener.
+     * <p>
+     * When the flow carries a delivery count, the new credit is
+     * {@code flow.deliveryCount + flow.linkCredit - local deliveryCount},
+     * floored at zero; otherwise the flow's link credit is taken as is.
+     *
+     * @param flow the received flow
+     */
+    @Override
+    public void receiveFlow(final Flow flow)
+    {
+        super.receiveFlow(flow);
+        final UnsignedInteger t = flow.getDeliveryCount();
+        final UnsignedInteger c = flow.getLinkCredit();
+        setDrain(flow.getDrain());
+
+        final Map<?, ?> options = flow.getProperties();
+        if(options != null)
+        {
+            _transactionId = (Binary) options.get(Symbol.valueOf("txn-id"));
+        }
+
+        if(t == null)
+        {
+            setLinkCredit(c);
+        }
+        else
+        {
+            final UnsignedInteger limit = t.add(c);
+            if(limit.compareTo(getDeliveryCount())<=0)
+            {
+                setLinkCredit(UnsignedInteger.valueOf(0));
+            }
+            else
+            {
+                setLinkCredit(limit.subtract(getDeliveryCount()));
+            }
+        }
+        getLinkEventListener().flowStateChanged();
+
+    }
+
+    /**
+     * @return {@code true} when the link credit is known and positive and the
+     *         session also reports credit to send
+     */
+    public boolean hasCreditToSend()
+    {
+        UnsignedInteger linkCredit = getLinkCredit();
+        return linkCredit != null && (linkCredit.compareTo(UnsignedInteger.valueOf(0)) > 0)
+               && getSession().hasCreditToSend();
+    }
+
+    /**
+     * Handles a delivery state update from the peer and forgets the delivery
+     * once it is settled.
+     *
+     * @param unsettled the delivery the update refers to
+     * @param state     the new delivery state
+     * @param settled   whether the peer has settled the delivery; a null
+     *                  value is treated as "not settled" instead of failing
+     *                  on auto-unboxing
+     */
+    public void receiveDeliveryState(final Delivery unsettled,
+                                     final DeliveryState state,
+                                     final Boolean settled)
+    {
+        super.receiveDeliveryState(unsettled, state, settled);
+        if(Boolean.TRUE.equals(settled))
+        {
+            _unsettledMap.remove(unsettled.getDeliveryTag());
+        }
+    }
+
+    public UnsignedInteger getLastDeliveryId()
+    {
+        return _lastDeliveryId;
+    }
+
+    public void setLastDeliveryId(final UnsignedInteger deliveryId)
+    {
+        _lastDeliveryId = deliveryId;
+    }
+
+    /**
+     * Settles a delivery locally and forwards the disposition to the session.
+     * Nothing happens unless {@code settled} is true and the tag is still
+     * recorded as unsettled.
+     *
+     * @param deliveryTag tag of the delivery
+     * @param state       delivery state to report
+     * @param settled     whether the delivery is being settled
+     */
+    public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
+    {
+        synchronized(getLock())
+        {
+            UnsignedInteger deliveryId;
+            if(settled && (deliveryId = _unsettledMap.remove(deliveryTag))!=null)
+            {
+                settle(deliveryTag);
+                getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled);
+            }
+
+        }
+    }
+
+    public Binary getTransactionId()
+    {
+        return _transactionId;
+    }
+
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkListener.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkListener.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkListener.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.transport;
+
+import org.apache.qpid.amqp_1_0.type.transport.*;
+
+/**
+ * Listener for events on the sending side of a link: flow state changes in
+ * addition to whatever {@link LinkEventListener} declares.
+ */
+public interface SendingLinkListener extends LinkEventListener
+{
+    /** Callback signalling that the link's flow state has changed. */
+    void flowStateChanged();
+
+    /** Ready-made instance of {@link DefaultLinkEventListener}. */
+    SendingLinkListener DEFAULT = new DefaultLinkEventListener();
+
+    /**
+     * Minimal implementation: flow changes are ignored, and a detach coming
+     * from the remote side is answered by detaching the supplied endpoint.
+     */
+    class DefaultLinkEventListener implements SendingLinkListener
+    {
+        public void flowStateChanged()
+        {
+            // intentionally does nothing
+        }
+
+        public void remoteDetached(LinkEndpoint endpoint, Detach detach)
+        {
+            endpoint.detach();
+        }
+    }
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingSessionHalfEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingSessionHalfEndpoint.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingSessionHalfEndpoint.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingSessionHalfEndpoint.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+/**
+ * Sending-side specialisation of {@link SessionHalfEndpoint}.
+ * Currently a placeholder: it declares no members of its own.
+ */
+public class SendingSessionHalfEndpoint extends SessionHalfEndpoint
+{
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+/**
+ * Mutable wrapper around an {@code int} sequence number.
+ * <p>
+ * {@link #incr()} and {@link #decr()} modify the instance in place and return
+ * it; the static {@link #add(SequenceNumber, int)} and
+ * {@link #subtract(SequenceNumber, int)} work on a copy and leave their
+ * argument untouched.  All arithmetic wraps around like ordinary {@code int}
+ * arithmetic.
+ */
+public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
+{
+    private int _seqNo;
+
+    public SequenceNumber(int seqNo)
+    {
+        _seqNo = seqNo;
+    }
+
+    /** Increments this number in place and returns this instance. */
+    public SequenceNumber incr()
+    {
+        return add(1);
+    }
+
+    /** Decrements this number in place and returns this instance. */
+    public SequenceNumber decr()
+    {
+        return add(-1);
+    }
+
+    /** Returns a copy of {@code a} advanced by {@code i}; {@code a} itself is not modified. */
+    public static SequenceNumber add(SequenceNumber a, int i)
+    {
+        final SequenceNumber copy = a.clone();
+        return copy.add(i);
+    }
+
+    /** Returns a copy of {@code a} moved back by {@code i}; {@code a} itself is not modified. */
+    public static SequenceNumber subtract(SequenceNumber a, int i)
+    {
+        return add(a, -i);
+    }
+
+    /** In-place addition used by the public operations. */
+    private SequenceNumber add(int i)
+    {
+        _seqNo += i;
+        return this;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        return o != null
+               && getClass() == o.getClass()
+               && _seqNo == ((SequenceNumber) o)._seqNo;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return _seqNo;
+    }
+
+    /**
+     * Compares by the wrapping difference of the two values, so a number just
+     * past the {@code int} overflow point still sorts after one just before
+     * it.  This looks deliberate for wrap-around sequence numbers (confirm
+     * with callers); replacing it with {@code Integer.compare} would change
+     * that ordering.
+     */
+    public int compareTo(SequenceNumber o)
+    {
+        return _seqNo - o._seqNo;
+    }
+
+    /** Returns a new, independent instance holding the same value. */
+    @Override
+    public SequenceNumber clone()
+    {
+        return new SequenceNumber(_seqNo);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "SN{" + _seqNo + "}";
+    }
+
+    public int intValue()
+    {
+        return _seqNo;
+    }
+}
Added: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionAttachment.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionAttachment.java?rev=1157566&view=auto
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionAttachment.java (added)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionAttachment.java Sun Aug 14 17:14:51 2011
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.amqp_1_0.transport;
+
+/**
+ * Immutable value object tying together a session endpoint, a connection endpoint and a
+ * channel number. Equality and hash code are derived from all three components.
+ */
+public class SessionAttachment
+{
+    private final SessionEndpoint _sessionEndpoint;
+    private final ConnectionEndpoint _connectionEndpoint;
+    private final short _channel;
+
+    /**
+     * @param sessionEndpoint the session endpoint (may be {@code null})
+     * @param connectionEndpoint the connection endpoint (may be {@code null})
+     * @param channel the channel number
+     */
+    public SessionAttachment(SessionEndpoint sessionEndpoint, ConnectionEndpoint connectionEndpoint, short channel)
+    {
+        _sessionEndpoint = sessionEndpoint;
+        _connectionEndpoint = connectionEndpoint;
+        _channel = channel;
+    }
+
+    /** @return the session endpoint supplied at construction */
+    public SessionEndpoint getSessionEndpoint()
+    {
+        return _sessionEndpoint;
+    }
+
+    /** @return the connection endpoint supplied at construction */
+    public ConnectionEndpoint getConnectionEndpoint()
+    {
+        return _connectionEndpoint;
+    }
+
+    /** @return the channel number supplied at construction */
+    public short getChannel()
+    {
+        return _channel;
+    }
+
+    /**
+     * Two attachments are equal when they are of exactly the same class, have the same channel
+     * and their connection and session endpoints are equal (or both {@code null}).
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        final SessionAttachment other = (SessionAttachment) o;
+
+        // Cheapest comparison first; && short-circuits so later checks only run when needed.
+        return _channel == other._channel
+               && sameOrBothNull(_connectionEndpoint, other._connectionEndpoint)
+               && sameOrBothNull(_sessionEndpoint, other._sessionEndpoint);
+    }
+
+    /** Null-safe equality: {@code left.equals(right)}, or both {@code null}. */
+    private static boolean sameOrBothNull(Object left, Object right)
+    {
+        return left != null ? left.equals(right) : right == null;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        final int sessionHash = _sessionEndpoint == null ? 0 : _sessionEndpoint.hashCode();
+        final int connectionHash = _connectionEndpoint == null ? 0 : _connectionEndpoint.hashCode();
+        return 31 * (31 * sessionHash + connectionHash) + _channel;
+    }
+}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org