You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/01/09 15:07:25 UTC
[12/30] qpid-proton git commit: PROTON-1385: remove proton-j from the
existing repo, it now has its own repo at:
https://git-wip-us.apache.org/repos/asf/qpid-proton-j.git
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
deleted file mode 100644
index c7b796d..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
+++ /dev/null
@@ -1,365 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.ProtonJSession;
-import org.apache.qpid.proton.engine.Session;
-
-public class SessionImpl extends EndpointImpl implements ProtonJSession
-{
- private final ConnectionImpl _connection;
-
- private Map<String, SenderImpl> _senders = new LinkedHashMap<String, SenderImpl>();
- private Map<String, ReceiverImpl> _receivers = new LinkedHashMap<String, ReceiverImpl>();
- private List<LinkImpl> _oldLinksToFree = new ArrayList<LinkImpl>();
- private TransportSession _transportSession;
- private int _incomingCapacity = 1024*1024;
- private int _incomingBytes = 0;
- private int _outgoingBytes = 0;
- private int _incomingDeliveries = 0;
- private int _outgoingDeliveries = 0;
- private long _outgoingWindow = Integer.MAX_VALUE;
- private Map<Symbol, Object> _properties;
- private Map<Symbol, Object> _remoteProperties;
- private Symbol[] _offeredCapabilities;
- private Symbol[] _remoteOfferedCapabilities;
- private Symbol[] _desiredCapabilities;
- private Symbol[] _remoteDesiredCapabilities;
-
- private LinkNode<SessionImpl> _node;
-
-
- SessionImpl(ConnectionImpl connection)
- {
- _connection = connection;
- _connection.incref();
- _node = _connection.addSessionEndpoint(this);
- _connection.put(Event.Type.SESSION_INIT, this);
- }
-
- @Override
- public SenderImpl sender(String name)
- {
- SenderImpl sender = _senders.get(name);
- if(sender == null)
- {
- sender = new SenderImpl(this, name);
- _senders.put(name, sender);
- }
- else
- {
- if(sender.getLocalState() == EndpointState.CLOSED
- && sender.getRemoteState() == EndpointState.CLOSED)
- {
- _oldLinksToFree.add(sender);
-
- sender = new SenderImpl(this, name);
- _senders.put(name, sender);
- }
- }
- return sender;
- }
-
- @Override
- public ReceiverImpl receiver(String name)
- {
- ReceiverImpl receiver = _receivers.get(name);
- if(receiver == null)
- {
- receiver = new ReceiverImpl(this, name);
- _receivers.put(name, receiver);
- }
- else
- {
- if(receiver.getLocalState() == EndpointState.CLOSED
- && receiver.getRemoteState() == EndpointState.CLOSED)
- {
- _oldLinksToFree.add(receiver);
-
- receiver = new ReceiverImpl(this, name);
- _receivers.put(name, receiver);
- }
- }
- return receiver;
- }
-
- @Override
- public Session next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
- {
- LinkNode.Query<SessionImpl> query = new EndpointImplQuery<SessionImpl>(local, remote);
-
- LinkNode<SessionImpl> sessionNode = _node.next(query);
-
- return sessionNode == null ? null : sessionNode.getValue();
- }
-
- @Override
- protected ConnectionImpl getConnectionImpl()
- {
- return _connection;
- }
-
- @Override
- public ConnectionImpl getConnection()
- {
- return getConnectionImpl();
- }
-
- @Override
- void postFinal() {
- _connection.put(Event.Type.SESSION_FINAL, this);
- _connection.decref();
- }
-
- @Override
- void doFree() {
- _connection.freeSession(this);
- _connection.removeSessionEndpoint(_node);
- _node = null;
-
- List<SenderImpl> senders = new ArrayList<SenderImpl>(_senders.values());
- for(SenderImpl sender : senders) {
- sender.free();
- }
- _senders.clear();
-
- List<ReceiverImpl> receivers = new ArrayList<ReceiverImpl>(_receivers.values());
- for(ReceiverImpl receiver : receivers) {
- receiver.free();
- }
- _receivers.clear();
-
- List<LinkImpl> links = new ArrayList<LinkImpl>(_oldLinksToFree);
- for(LinkImpl link : links) {
- link.free();
- }
- }
-
- void modifyEndpoints() {
- for (SenderImpl snd : _senders.values()) {
- snd.modifyEndpoints();
- }
-
- for (ReceiverImpl rcv : _receivers.values()) {
- rcv.modifyEndpoints();
- }
- modified();
- }
-
- TransportSession getTransportSession()
- {
- return _transportSession;
- }
-
- void setTransportSession(TransportSession transportSession)
- {
- _transportSession = transportSession;
- }
-
- void setNode(LinkNode<SessionImpl> node)
- {
- _node = node;
- }
-
- void freeSender(SenderImpl sender)
- {
- String name = sender.getName();
- SenderImpl existing = _senders.get(name);
- if (sender.equals(existing))
- {
- _senders.remove(name);
- }
- else
- {
- _oldLinksToFree.remove(sender);
- }
- }
-
- void freeReceiver(ReceiverImpl receiver)
- {
- String name = receiver.getName();
- ReceiverImpl existing = _receivers.get(name);
- if (receiver.equals(existing))
- {
- _receivers.remove(name);
- }
- else
- {
- _oldLinksToFree.remove(receiver);
- }
- }
-
- @Override
- public int getIncomingCapacity()
- {
- return _incomingCapacity;
- }
-
- @Override
- public void setIncomingCapacity(int capacity)
- {
- _incomingCapacity = capacity;
- }
-
- @Override
- public int getIncomingBytes()
- {
- return _incomingBytes;
- }
-
- void incrementIncomingBytes(int delta)
- {
- _incomingBytes += delta;
- }
-
- @Override
- public int getOutgoingBytes()
- {
- return _outgoingBytes;
- }
-
- void incrementOutgoingBytes(int delta)
- {
- _outgoingBytes += delta;
- }
-
- void incrementIncomingDeliveries(int delta)
- {
- _incomingDeliveries += delta;
- }
-
- int getOutgoingDeliveries()
- {
- return _outgoingDeliveries;
- }
-
- void incrementOutgoingDeliveries(int delta)
- {
- _outgoingDeliveries += delta;
- }
-
- @Override
- void localOpen()
- {
- getConnectionImpl().put(Event.Type.SESSION_LOCAL_OPEN, this);
- }
-
- @Override
- void localClose()
- {
- getConnectionImpl().put(Event.Type.SESSION_LOCAL_CLOSE, this);
- }
-
- @Override
- public void setOutgoingWindow(long outgoingWindow) {
- if(outgoingWindow < 0 || outgoingWindow > 0xFFFFFFFFL)
- {
- throw new IllegalArgumentException("Value '" + outgoingWindow + "' must be in the"
- + " range [0 - 2^32-1]");
- }
-
- _outgoingWindow = outgoingWindow;
- }
-
- @Override
- public long getOutgoingWindow()
- {
- return _outgoingWindow;
- }
-
- @Override
- public Map<Symbol, Object> getProperties()
- {
- return _properties;
- }
-
- @Override
- public void setProperties(Map<Symbol, Object> properties)
- {
- _properties = properties;
- }
-
- @Override
- public Map<Symbol, Object> getRemoteProperties()
- {
- return _remoteProperties;
- }
-
- void setRemoteProperties(Map<Symbol, Object> remoteProperties)
- {
- _remoteProperties = remoteProperties;
- }
-
- @Override
- public Symbol[] getDesiredCapabilities()
- {
- return _desiredCapabilities;
- }
-
- @Override
- public void setDesiredCapabilities(Symbol[] desiredCapabilities)
- {
- _desiredCapabilities = desiredCapabilities;
- }
-
- @Override
- public Symbol[] getRemoteDesiredCapabilities()
- {
- return _remoteDesiredCapabilities;
- }
-
- void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities)
- {
- _remoteDesiredCapabilities = remoteDesiredCapabilities;
- }
-
- @Override
- public Symbol[] getOfferedCapabilities()
- {
- return _offeredCapabilities;
- }
-
- @Override
- public void setOfferedCapabilities(Symbol[] offeredCapabilities)
- {
- _offeredCapabilities = offeredCapabilities;
- }
-
- @Override
- public Symbol[] getRemoteOfferedCapabilities()
- {
- return _remoteOfferedCapabilities;
- }
-
- void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities)
- {
- _remoteOfferedCapabilities = remoteOfferedCapabilities;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/StringUtils.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/StringUtils.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/StringUtils.java
deleted file mode 100644
index f80cca3..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/StringUtils.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.amqp.Binary;
-
-public class StringUtils
-{
- /**
- * Converts the Binary to a quoted string.
- *
- * @param bin the Binary to convert
- * @param stringLength the maximum length of stringified content (excluding the quotes, and truncated indicator)
- * @param appendIfTruncated appends "...(truncated)" if not all of the payload is present in the string
- * @return the converted string
- */
- public static String toQuotedString(final Binary bin,final int stringLength,final boolean appendIfTruncated)
- {
- if(bin == null)
- {
- return "\"\"";
- }
-
- final byte[] binData = bin.getArray();
- final int binLength = bin.getLength();
- final int offset = bin.getArrayOffset();
-
- StringBuilder str = new StringBuilder();
- str.append("\"");
-
- int size = 0;
- boolean truncated = false;
- for (int i = 0; i < binLength; i++)
- {
- byte c = binData[offset + i];
-
- if (c > 31 && c < 127 && c != '\\')
- {
- if (size + 1 <= stringLength)
- {
- size += 1;
- str.append((char) c);
- }
- else
- {
- truncated = true;
- break;
- }
- }
- else
- {
- if (size + 4 <= stringLength)
- {
- size += 4;
- str.append(String.format("\\x%02x", c));
- }
- else
- {
- truncated = true;
- break;
- }
- }
- }
-
- str.append("\"");
-
- if (truncated && appendIfTruncated)
- {
- str.append("...(truncated)");
- }
-
- return str.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java
deleted file mode 100644
index 3e6cada..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportDelivery.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-
-public class TransportDelivery
-{
- private UnsignedInteger _deliveryId;
- private DeliveryImpl _delivery;
- private TransportLink _transportLink;
- private int _sessionSize = 1;
-
- TransportDelivery(UnsignedInteger currentDeliveryId, DeliveryImpl delivery, TransportLink transportLink)
- {
- _deliveryId = currentDeliveryId;
- _delivery = delivery;
- _transportLink = transportLink;
- }
-
- public UnsignedInteger getDeliveryId()
- {
- return _deliveryId;
- }
-
- public TransportLink getTransportLink()
- {
- return _transportLink;
- }
-
- void incrementSessionSize()
- {
- _sessionSize++;
- }
-
- int getSessionSize()
- {
- return _sessionSize;
- }
-
- void settled()
- {
- _transportLink.settled(this);
- _delivery.updateWork();
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
deleted file mode 100644
index 42126b0..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ /dev/null
@@ -1,1732 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.qpid.proton.engine.impl;
-
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourArrayToBuffer;
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourBufferToArray;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.UnsignedShort;
-import org.apache.qpid.proton.amqp.transport.Attach;
-import org.apache.qpid.proton.amqp.transport.Begin;
-import org.apache.qpid.proton.amqp.transport.Close;
-import org.apache.qpid.proton.amqp.transport.ConnectionError;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.amqp.transport.Detach;
-import org.apache.qpid.proton.amqp.transport.Disposition;
-import org.apache.qpid.proton.amqp.transport.End;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.amqp.transport.Flow;
-import org.apache.qpid.proton.amqp.transport.FrameBody;
-import org.apache.qpid.proton.amqp.transport.Open;
-import org.apache.qpid.proton.amqp.transport.Role;
-import org.apache.qpid.proton.amqp.transport.Transfer;
-import org.apache.qpid.proton.codec.AMQPDefinedTypes;
-import org.apache.qpid.proton.codec.DecoderImpl;
-import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.ProtonJTransport;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Ssl;
-import org.apache.qpid.proton.engine.SslDomain;
-import org.apache.qpid.proton.engine.SslPeerDetails;
-import org.apache.qpid.proton.engine.TransportException;
-import org.apache.qpid.proton.engine.TransportResult;
-import org.apache.qpid.proton.engine.TransportResultFactory;
-import org.apache.qpid.proton.engine.impl.ssl.SslImpl;
-import org.apache.qpid.proton.framing.TransportFrame;
-import org.apache.qpid.proton.reactor.Reactor;
-import org.apache.qpid.proton.reactor.Selectable;
-
-public class TransportImpl extends EndpointImpl
- implements ProtonJTransport, FrameBody.FrameBodyHandler<Integer>,
- FrameHandler, TransportOutputWriter, TransportInternal
-{
- static final int BUFFER_RELEASE_THRESHOLD = Integer.getInteger("proton.transport_buffer_release_threshold", 2 * 1024 * 1024);
- private static final int CHANNEL_MAX_LIMIT = 65535;
-
- private static final boolean getBooleanEnv(String name)
- {
- String value = System.getenv(name);
- return "true".equalsIgnoreCase(value) ||
- "1".equals(value) ||
- "yes".equalsIgnoreCase(value);
- }
-
- private static final boolean FRM_ENABLED = getBooleanEnv("PN_TRACE_FRM");
- private static final int TRACE_FRAME_PAYLOAD_LENGTH = Integer.getInteger("proton.trace_frame_payload_length", 1024);
-
- // trace levels
- private int _levels = (FRM_ENABLED ? TRACE_FRM : 0);
-
- private FrameParser _frameParser;
-
- private ConnectionImpl _connectionEndpoint;
-
- private boolean _isOpenSent;
- private boolean _isCloseSent;
-
- private boolean _headerWritten;
- private Map<Integer, TransportSession> _remoteSessions = new HashMap<Integer, TransportSession>();
- private Map<Integer, TransportSession> _localSessions = new HashMap<Integer, TransportSession>();
-
- private TransportInput _inputProcessor;
- private TransportOutput _outputProcessor;
-
- private DecoderImpl _decoder = new DecoderImpl();
- private EncoderImpl _encoder = new EncoderImpl(_decoder);
-
- private int _maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
- private int _remoteMaxFrameSize = 512;
- private int _channelMax = CHANNEL_MAX_LIMIT;
- private int _remoteChannelMax = CHANNEL_MAX_LIMIT;
-
- private final FrameWriter _frameWriter;
-
- private boolean _closeReceived;
- private Open _open;
- private SaslImpl _sasl;
- private SslImpl _ssl;
- private final Ref<ProtocolTracer> _protocolTracer = new Ref(null);
-
- private TransportResult _lastTransportResult = TransportResultFactory.ok();
-
- private boolean _init;
- private boolean _processingStarted;
- private boolean _emitFlowEventOnSend = true;
-
- private FrameHandler _frameHandler = this;
- private boolean _head_closed = false;
- private ErrorCondition _condition = null;
-
- private boolean postedHeadClosed = false;
- private boolean postedTailClosed = false;
- private boolean postedTransportError = false;
-
- private int _localIdleTimeout = 0;
- private int _remoteIdleTimeout = 0;
- private long _bytesInput = 0;
- private long _bytesOutput = 0;
- private long _localIdleDeadline = 0;
- private long _lastBytesInput = 0;
- private long _lastBytesOutput = 0;
- private long _remoteIdleDeadline = 0;
-
- private Selectable _selectable;
- private Reactor _reactor;
-
- private List<TransportLayer> _additionalTransportLayers;
-
- /**
- * @deprecated This constructor's visibility will be reduced to the default scope in a future release.
- * Client code outside this module should use {@link org.apache.qpid.proton.engine.Transport.Factory#create()} instead
- */
- @Deprecated public TransportImpl()
- {
- this(DEFAULT_MAX_FRAME_SIZE);
- }
-
-
- /**
- * Creates a transport with the given maximum frame size.
- * Note that the maximumFrameSize also determines the size of the output buffer.
- */
- TransportImpl(int maxFrameSize)
- {
- AMQPDefinedTypes.registerAllTypes(_decoder, _encoder);
-
- _maxFrameSize = maxFrameSize;
- _frameWriter = new FrameWriter(_encoder, _remoteMaxFrameSize,
- FrameWriter.AMQP_FRAME_TYPE,
- _protocolTracer,
- this);
- }
-
- private void init()
- {
- if(!_init)
- {
- _init = true;
- _frameParser = new FrameParser(_frameHandler , _decoder, _maxFrameSize);
- _inputProcessor = _frameParser;
- _outputProcessor = new TransportOutputAdaptor(this, _maxFrameSize);
- }
- }
-
- @Override
- public void trace(int levels) {
- _levels = levels;
- }
-
- @Override
- public int getMaxFrameSize()
- {
- return _maxFrameSize;
- }
-
- @Override
- public int getRemoteMaxFrameSize()
- {
- return _remoteMaxFrameSize;
- }
-
- @Override
- public void setMaxFrameSize(int maxFrameSize)
- {
- if(_init)
- {
- throw new IllegalStateException("Cannot set max frame size after transport has been initialised");
- }
- _maxFrameSize = maxFrameSize;
- }
-
- @Override
- public int getChannelMax()
- {
- return _channelMax;
- }
-
- @Override
- public void setChannelMax(int channelMax)
- {
- if(_isOpenSent)
- {
- throw new IllegalArgumentException("Cannot change channel max after open frame has been sent");
- }
-
- if(channelMax < 0 || channelMax >= (1<<16))
- {
- throw new NumberFormatException("Value \""+channelMax+"\" lies outside the range [0-" + (1<<16) +").");
- }
-
- _channelMax = channelMax;
- }
-
- @Override
- public int getRemoteChannelMax()
- {
- return _remoteChannelMax;
- }
-
- @Override
- public ErrorCondition getCondition()
- {
- return _condition;
- }
-
- @Override
- public void bind(Connection conn)
- {
- // TODO - check if already bound
-
- _connectionEndpoint = (ConnectionImpl) conn;
- put(Event.Type.CONNECTION_BOUND, conn);
- _connectionEndpoint.setTransport(this);
- _connectionEndpoint.incref();
-
- if(getRemoteState() != EndpointState.UNINITIALIZED)
- {
- _connectionEndpoint.handleOpen(_open);
- if(getRemoteState() == EndpointState.CLOSED)
- {
- _connectionEndpoint.setRemoteState(EndpointState.CLOSED);
- }
-
- _frameParser.flush();
- }
- }
-
- @Override
- public void unbind()
- {
- for (TransportSession ts: _localSessions.values()) {
- ts.unbind();
- }
- for (TransportSession ts: _remoteSessions.values()) {
- ts.unbind();
- }
-
- put(Event.Type.CONNECTION_UNBOUND, _connectionEndpoint);
-
- _connectionEndpoint.modifyEndpoints();
- _connectionEndpoint.setTransport(null);
- _connectionEndpoint.decref();
- }
-
- @Override
- public int input(byte[] bytes, int offset, int length)
- {
- oldApiCheckStateBeforeInput(length).checkIsOk();
-
- ByteBuffer inputBuffer = getInputBuffer();
- int numberOfBytesConsumed = pourArrayToBuffer(bytes, offset, length, inputBuffer);
- processInput().checkIsOk();
- return numberOfBytesConsumed;
- }
-
- /**
- * This method is public as it is used by Python layer.
- * @see org.apache.qpid.proton.engine.Transport#input(byte[], int, int)
- */
- public TransportResult oldApiCheckStateBeforeInput(int inputLength)
- {
- _lastTransportResult.checkIsOk();
- if(inputLength == 0)
- {
- if(_connectionEndpoint == null || _connectionEndpoint.getRemoteState() != EndpointState.CLOSED)
- {
- return TransportResultFactory.error(new TransportException("Unexpected EOS when remote connection not closed: connection aborted"));
- }
- }
- return TransportResultFactory.ok();
- }
-
- //==================================================================================================================
- // Process model state to generate output
-
- @Override
- public int output(byte[] bytes, final int offset, final int size)
- {
- ByteBuffer outputBuffer = getOutputBuffer();
- int numberOfBytesOutput = pourBufferToArray(outputBuffer, bytes, offset, size);
- outputConsumed();
- return numberOfBytesOutput;
- }
-
- @Override
- public boolean writeInto(ByteBuffer outputBuffer)
- {
- processHeader();
- processOpen();
- processBegin();
- processAttach();
- processReceiverFlow();
- // we process transport work twice intentionally, the first
- // pass may end up settling deliveries that the second pass
- // can clean up
- processTransportWork();
- processTransportWork();
- processSenderFlow();
- processDetach();
- processEnd();
- processClose();
-
- _frameWriter.readBytes(outputBuffer);
-
- return _isCloseSent || _head_closed;
- }
-
- @Override
- public Sasl sasl()
- {
- if(_sasl == null)
- {
- if(_processingStarted)
- {
- throw new IllegalStateException("Sasl can't be initiated after transport has started processing");
- }
-
- init();
- _sasl = new SaslImpl(this, _remoteMaxFrameSize);
- TransportWrapper transportWrapper = _sasl.wrap(_inputProcessor, _outputProcessor);
- _inputProcessor = transportWrapper;
- _outputProcessor = transportWrapper;
- }
- return _sasl;
-
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>Note that sslDomain must implement {@link org.apache.qpid.proton.engine.impl.ssl.ProtonSslEngineProvider}.
- * This is not possible enforce at the API level because {@link org.apache.qpid.proton.engine.impl.ssl.ProtonSslEngineProvider} is not part of the
- * public Proton API.</p>
- */
- @Override
- public Ssl ssl(SslDomain sslDomain, SslPeerDetails sslPeerDetails)
- {
- if (_ssl == null)
- {
- init();
- _ssl = new SslImpl(sslDomain, sslPeerDetails);
- TransportWrapper transportWrapper = _ssl.wrap(_inputProcessor, _outputProcessor);
- _inputProcessor = transportWrapper;
- _outputProcessor = transportWrapper;
- }
- return _ssl;
- }
-
- @Override
- public Ssl ssl(SslDomain sslDomain)
- {
- return ssl(sslDomain, null);
- }
-
- private void processDetach()
- {
- if(_connectionEndpoint != null && _isOpenSent)
- {
- EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
- while(endpoint != null)
- {
-
- if(endpoint instanceof LinkImpl)
- {
- LinkImpl link = (LinkImpl) endpoint;
- TransportLink<?> transportLink = getTransportState(link);
- SessionImpl session = link.getSession();
- TransportSession transportSession = getTransportState(session);
-
- if(((link.getLocalState() == EndpointState.CLOSED) || link.detached())
- && transportLink.isLocalHandleSet()
- && transportSession.isLocalChannelSet()
- && !_isCloseSent)
- {
- if((link instanceof SenderImpl)
- && link.getQueued() > 0
- && !transportLink.detachReceived()
- && !transportSession.endReceived()
- && !_closeReceived) {
- endpoint = endpoint.transportNext();
- continue;
- }
-
- UnsignedInteger localHandle = transportLink.getLocalHandle();
- transportLink.clearLocalHandle();
- transportSession.freeLocalHandle(localHandle);
-
-
- Detach detach = new Detach();
- detach.setHandle(localHandle);
- detach.setClosed(!link.detached());
-
- ErrorCondition localError = link.getCondition();
- if( localError.getCondition() !=null )
- {
- detach.setError(localError);
- }
-
-
- writeFrame(transportSession.getLocalChannel(), detach, null, null);
- }
-
- endpoint.clearModified();
-
- }
- endpoint = endpoint.transportNext();
- }
- }
- }
-
- private void writeFlow(TransportSession ssn, TransportLink link)
- {
- Flow flow = new Flow();
- flow.setNextIncomingId(ssn.getNextIncomingId());
- flow.setNextOutgoingId(ssn.getNextOutgoingId());
- ssn.updateIncomingWindow();
- flow.setIncomingWindow(ssn.getIncomingWindowSize());
- flow.setOutgoingWindow(ssn.getOutgoingWindowSize());
- if (link != null) {
- flow.setHandle(link.getLocalHandle());
- flow.setDeliveryCount(link.getDeliveryCount());
- flow.setLinkCredit(link.getLinkCredit());
- flow.setDrain(link.getLink().getDrain());
- }
- writeFrame(ssn.getLocalChannel(), flow, null, null);
- }
-
- private void processSenderFlow()
- {
- if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent)
- {
- EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
- while(endpoint != null)
- {
-
- if(endpoint instanceof SenderImpl)
- {
- SenderImpl sender = (SenderImpl) endpoint;
- if(sender.getDrain() && sender.getDrained() > 0)
- {
- TransportSender transportLink = sender.getTransportLink();
- TransportSession transportSession = sender.getSession().getTransportSession();
- UnsignedInteger credits = transportLink.getLinkCredit();
- transportLink.setLinkCredit(UnsignedInteger.ZERO);
- transportLink.setDeliveryCount(transportLink.getDeliveryCount().add(credits));
- sender.setDrained(0);
-
- writeFlow(transportSession, transportLink);
- }
-
- }
-
- endpoint = endpoint.transportNext();
- }
- }
- }
-
- private void processTransportWork()
- {
- if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent)
- {
- DeliveryImpl delivery = _connectionEndpoint.getTransportWorkHead();
- while(delivery != null)
- {
- LinkImpl link = delivery.getLink();
- if (link instanceof SenderImpl) {
- if (processTransportWorkSender(delivery, (SenderImpl) link)) {
- delivery = delivery.clearTransportWork();
- } else {
- delivery = delivery.getTransportWorkNext();
- }
- } else {
- if (processTransportWorkReceiver(delivery, (ReceiverImpl) link)) {
- delivery = delivery.clearTransportWork();
- } else {
- delivery = delivery.getTransportWorkNext();
- }
- }
- }
- }
- }
-
- private boolean processTransportWorkSender(DeliveryImpl delivery,
- SenderImpl snd)
- {
- TransportSender tpLink = snd.getTransportLink();
- SessionImpl session = snd.getSession();
- TransportSession tpSession = session.getTransportSession();
-
- boolean wasDone = delivery.isDone();
-
- if(!delivery.isDone() &&
- (delivery.getDataLength() > 0 || delivery != snd.current()) &&
- tpSession.hasOutgoingCredit() && tpLink.hasCredit() &&
- tpSession.isLocalChannelSet() &&
- tpLink.getLocalHandle() != null && !_frameWriter.isFull())
- {
- DeliveryImpl inProgress = tpLink.getInProgressDelivery();
- if(inProgress != null){
- // There is an existing Delivery awaiting completion. Check it
- // is the same Delivery object given and return if not, as we
- // can't interleave Transfer frames for deliveries on a link.
- if(inProgress != delivery) {
- return false;
- }
- }
-
- UnsignedInteger deliveryId = tpSession.getOutgoingDeliveryId();
- TransportDelivery tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink);
- delivery.setTransportDelivery(tpDelivery);
-
- final Transfer transfer = new Transfer();
- transfer.setDeliveryId(deliveryId);
- transfer.setDeliveryTag(new Binary(delivery.getTag()));
- transfer.setHandle(tpLink.getLocalHandle());
-
- if(delivery.getLocalState() != null)
- {
- transfer.setState(delivery.getLocalState());
- }
-
- if(delivery.isSettled())
- {
- transfer.setSettled(Boolean.TRUE);
- }
- else
- {
- tpSession.addUnsettledOutgoing(deliveryId, delivery);
- }
-
- if(snd.current() == delivery)
- {
- transfer.setMore(true);
- }
-
- int messageFormat = delivery.getMessageFormat();
- if(messageFormat == DeliveryImpl.DEFAULT_MESSAGE_FORMAT) {
- transfer.setMessageFormat(UnsignedInteger.ZERO);
- } else {
- transfer.setMessageFormat(UnsignedInteger.valueOf(messageFormat));
- }
-
- ByteBuffer payload = delivery.getData() == null ? null :
- ByteBuffer.wrap(delivery.getData(), delivery.getDataOffset(),
- delivery.getDataLength());
-
- writeFrame(tpSession.getLocalChannel(), transfer, payload,
- new PartialTransfer(transfer));
- tpSession.incrementOutgoingId();
- tpSession.decrementRemoteIncomingWindow();
-
- if(payload == null || !payload.hasRemaining())
- {
- session.incrementOutgoingBytes(-delivery.pending());
- delivery.setData(null);
- delivery.setDataLength(0);
-
- if (!transfer.getMore()) {
- // Clear the in-progress delivery marker
- tpLink.setInProgressDelivery(null);
-
- delivery.setDone();
- tpLink.setDeliveryCount(tpLink.getDeliveryCount().add(UnsignedInteger.ONE));
- tpLink.setLinkCredit(tpLink.getLinkCredit().subtract(UnsignedInteger.ONE));
- tpSession.incrementOutgoingDeliveryId();
- session.incrementOutgoingDeliveries(-1);
- snd.decrementQueued();
- }
- }
- else
- {
- int delta = delivery.getDataLength() - payload.remaining();
- delivery.setDataOffset(delivery.getDataOffset() + delta);
- delivery.setDataLength(payload.remaining());
- session.incrementOutgoingBytes(-delta);
-
- // Remember the delivery we are still processing
- // the body transfer frames for
- tpLink.setInProgressDelivery(delivery);
- }
-
- if (_emitFlowEventOnSend && snd.getLocalState() != EndpointState.CLOSED) {
- getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
- }
- }
-
- if(wasDone && delivery.getLocalState() != null)
- {
- TransportDelivery tpDelivery = delivery.getTransportDelivery();
- Disposition disposition = new Disposition();
- disposition.setFirst(tpDelivery.getDeliveryId());
- disposition.setLast(tpDelivery.getDeliveryId());
- disposition.setRole(Role.SENDER);
- disposition.setSettled(delivery.isSettled());
- if(delivery.isSettled())
- {
- tpDelivery.settled();
- }
- disposition.setState(delivery.getLocalState());
-
- writeFrame(tpSession.getLocalChannel(), disposition, null,
- null);
- }
-
- return !delivery.isBuffered();
- }
-
- private boolean processTransportWorkReceiver(DeliveryImpl delivery,
- ReceiverImpl rcv)
- {
- TransportDelivery tpDelivery = delivery.getTransportDelivery();
- SessionImpl session = rcv.getSession();
- TransportSession tpSession = session.getTransportSession();
-
- if (tpSession.isLocalChannelSet())
- {
- boolean settled = delivery.isSettled();
- DeliveryState localState = delivery.getLocalState();
-
- Disposition disposition = new Disposition();
- disposition.setFirst(tpDelivery.getDeliveryId());
- disposition.setLast(tpDelivery.getDeliveryId());
- disposition.setRole(Role.RECEIVER);
- disposition.setSettled(settled);
- disposition.setState(localState);
-
- if(localState == null && settled) {
- disposition.setState(delivery.getDefaultDeliveryState());
- }
-
- writeFrame(tpSession.getLocalChannel(), disposition, null, null);
- if (settled)
- {
- tpDelivery.settled();
- }
- return true;
- }
-
- return false;
- }
-
- private void processReceiverFlow()
- {
- if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent)
- {
- EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
- while(endpoint != null)
- {
- if(endpoint instanceof ReceiverImpl)
- {
- ReceiverImpl receiver = (ReceiverImpl) endpoint;
- TransportLink<?> transportLink = getTransportState(receiver);
- TransportSession transportSession = getTransportState(receiver.getSession());
-
- if(receiver.getLocalState() == EndpointState.ACTIVE && transportSession.isLocalChannelSet() && !receiver.detached())
- {
- int credits = receiver.clearUnsentCredits();
- if(credits != 0 || receiver.getDrain() ||
- transportSession.getIncomingWindowSize().equals(UnsignedInteger.ZERO))
- {
- transportLink.addCredit(credits);
- writeFlow(transportSession, transportLink);
- }
- }
- }
- endpoint = endpoint.transportNext();
- }
- endpoint = _connectionEndpoint.getTransportHead();
- while(endpoint != null)
- {
- if(endpoint instanceof SessionImpl)
- {
-
- SessionImpl session = (SessionImpl) endpoint;
- TransportSession transportSession = getTransportState(session);
-
- if(session.getLocalState() == EndpointState.ACTIVE && transportSession.isLocalChannelSet())
- {
- if(transportSession.getIncomingWindowSize().equals(UnsignedInteger.ZERO))
- {
- writeFlow(transportSession, null);
- }
- }
- }
- endpoint = endpoint.transportNext();
- }
- }
- }
-
- private void processAttach()
- {
- if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent)
- {
- EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
-
- while(endpoint != null)
- {
- if(endpoint instanceof LinkImpl)
- {
-
- LinkImpl link = (LinkImpl) endpoint;
- TransportLink<?> transportLink = getTransportState(link);
- SessionImpl session = link.getSession();
- TransportSession transportSession = getTransportState(session);
- if(link.getLocalState() != EndpointState.UNINITIALIZED && !transportLink.attachSent() && transportSession.isLocalChannelSet())
- {
-
- if( (link.getRemoteState() == EndpointState.ACTIVE
- && !transportLink.isLocalHandleSet()) || link.getRemoteState() == EndpointState.UNINITIALIZED)
- {
-
- UnsignedInteger localHandle = transportSession.allocateLocalHandle(transportLink);
-
- if(link.getRemoteState() == EndpointState.UNINITIALIZED)
- {
- transportSession.addHalfOpenLink(transportLink);
- }
-
- Attach attach = new Attach();
- attach.setHandle(localHandle);
- attach.setName(transportLink.getName());
-
- if(link.getSenderSettleMode() != null)
- {
- attach.setSndSettleMode(link.getSenderSettleMode());
- }
-
- if(link.getReceiverSettleMode() != null)
- {
- attach.setRcvSettleMode(link.getReceiverSettleMode());
- }
-
- if(link.getSource() != null)
- {
- attach.setSource(link.getSource());
- }
-
- if(link.getTarget() != null)
- {
- attach.setTarget(link.getTarget());
- }
-
- if(link.getProperties() != null)
- {
- attach.setProperties(link.getProperties());
- }
-
- if(link.getOfferedCapabilities() != null)
- {
- attach.setOfferedCapabilities(link.getOfferedCapabilities());
- }
-
- if(link.getDesiredCapabilities() != null)
- {
- attach.setDesiredCapabilities(link.getDesiredCapabilities());
- }
-
- attach.setRole(endpoint instanceof ReceiverImpl ? Role.RECEIVER : Role.SENDER);
-
- if(link instanceof SenderImpl)
- {
- attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
- }
-
- writeFrame(transportSession.getLocalChannel(), attach, null, null);
- transportLink.sentAttach();
- }
- }
- }
- endpoint = endpoint.transportNext();
- }
- }
- }
-
- private void processHeader()
- {
- if(!_headerWritten)
- {
- _frameWriter.writeHeader(AmqpHeader.HEADER);
- _headerWritten = true;
- }
- }
-
- private void processOpen()
- {
- if (!_isOpenSent && (_condition != null ||
- (_connectionEndpoint != null &&
- _connectionEndpoint.getLocalState() != EndpointState.UNINITIALIZED)))
- {
- Open open = new Open();
- if (_connectionEndpoint != null) {
- String cid = _connectionEndpoint.getLocalContainerId();
- open.setContainerId(cid == null ? "" : cid);
- open.setHostname(_connectionEndpoint.getHostname());
- open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities());
- open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities());
- open.setProperties(_connectionEndpoint.getProperties());
- } else {
- open.setContainerId("");
- }
-
- if (_maxFrameSize > 0) {
- open.setMaxFrameSize(UnsignedInteger.valueOf(_maxFrameSize));
- }
- if (_channelMax > 0) {
- open.setChannelMax(UnsignedShort.valueOf((short) _channelMax));
- }
-
- // as per the recommendation in the spec, advertise half our
- // actual timeout to the remote
- if (_localIdleTimeout > 0) {
- open.setIdleTimeOut(new UnsignedInteger(_localIdleTimeout / 2));
- }
- _isOpenSent = true;
-
- writeFrame(0, open, null, null);
- }
- }
-
- private void processBegin()
- {
- if(_connectionEndpoint != null && _isOpenSent && !_isCloseSent)
- {
- EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
- while(endpoint != null)
- {
- if(endpoint instanceof SessionImpl)
- {
- SessionImpl session = (SessionImpl) endpoint;
- TransportSession transportSession = getTransportState(session);
- if(session.getLocalState() != EndpointState.UNINITIALIZED && !transportSession.beginSent())
- {
- int channelId = allocateLocalChannel(transportSession);
- Begin begin = new Begin();
-
- if(session.getRemoteState() != EndpointState.UNINITIALIZED)
- {
- begin.setRemoteChannel(UnsignedShort.valueOf((short) transportSession.getRemoteChannel()));
- }
-
- transportSession.updateIncomingWindow();
-
- begin.setHandleMax(transportSession.getHandleMax());
- begin.setIncomingWindow(transportSession.getIncomingWindowSize());
- begin.setOutgoingWindow(transportSession.getOutgoingWindowSize());
- begin.setNextOutgoingId(transportSession.getNextOutgoingId());
-
- if(session.getProperties() != null)
- {
- begin.setProperties(session.getProperties());
- }
-
- if(session.getOfferedCapabilities() != null)
- {
- begin.setOfferedCapabilities(session.getOfferedCapabilities());
- }
-
- if(session.getDesiredCapabilities() != null)
- {
- begin.setDesiredCapabilities(session.getDesiredCapabilities());
- }
-
- writeFrame(channelId, begin, null, null);
- transportSession.sentBegin();
- }
- }
- endpoint = endpoint.transportNext();
- }
- }
- }
-
- private TransportSession getTransportState(SessionImpl session)
- {
- TransportSession transportSession = session.getTransportSession();
- if(transportSession == null)
- {
- transportSession = new TransportSession(this, session);
- session.setTransportSession(transportSession);
- }
- return transportSession;
- }
-
- private TransportLink<?> getTransportState(LinkImpl link)
- {
- TransportLink<?> transportLink = link.getTransportLink();
- if(transportLink == null)
- {
- transportLink = TransportLink.createTransportLink(link);
- }
- return transportLink;
- }
-
- private int allocateLocalChannel(TransportSession transportSession)
- {
- for (int i = 0; i < _connectionEndpoint.getMaxChannels(); i++)
- {
- if (!_localSessions.containsKey(i))
- {
- _localSessions.put(i, transportSession);
- transportSession.setLocalChannel(i);
- return i;
- }
- }
-
- return -1;
- }
-
- private int freeLocalChannel(TransportSession transportSession)
- {
- final int channel = transportSession.getLocalChannel();
- _localSessions.remove(channel);
- transportSession.freeLocalChannel();
- return channel;
- }
-
- private void processEnd()
- {
- if(_connectionEndpoint != null && _isOpenSent)
- {
- EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
- while(endpoint != null)
- {
- SessionImpl session;
- TransportSession transportSession;
-
- if((endpoint instanceof SessionImpl)) {
- if ((session = (SessionImpl)endpoint).getLocalState() == EndpointState.CLOSED
- && (transportSession = session.getTransportSession()).isLocalChannelSet()
- && !_isCloseSent)
- {
- if (hasSendableMessages(session)) {
- endpoint = endpoint.transportNext();
- continue;
- }
-
- int channel = freeLocalChannel(transportSession);
- End end = new End();
- ErrorCondition localError = endpoint.getCondition();
- if( localError.getCondition() !=null )
- {
- end.setError(localError);
- }
-
- writeFrame(channel, end, null, null);
- }
-
- endpoint.clearModified();
- }
-
- endpoint = endpoint.transportNext();
- }
- }
- }
-
- private boolean hasSendableMessages(SessionImpl session)
- {
- if (_connectionEndpoint == null) {
- return false;
- }
-
- if(!_closeReceived && (session == null || !session.getTransportSession().endReceived()))
- {
- EndpointImpl endpoint = _connectionEndpoint.getTransportHead();
- while(endpoint != null)
- {
- if(endpoint instanceof SenderImpl)
- {
- SenderImpl sender = (SenderImpl) endpoint;
- if((session == null || sender.getSession() == session)
- && sender.getQueued() != 0
- && !getTransportState(sender).detachReceived())
- {
- return true;
- }
- }
- endpoint = endpoint.transportNext();
- }
- }
- return false;
- }
-
- private void processClose()
- {
- if ((_condition != null ||
- (_connectionEndpoint != null &&
- _connectionEndpoint.getLocalState() == EndpointState.CLOSED)) &&
- !_isCloseSent) {
- if(!hasSendableMessages(null))
- {
- Close close = new Close();
-
- ErrorCondition localError;
-
- if (_connectionEndpoint == null) {
- localError = _condition;
- } else {
- localError = _connectionEndpoint.getCondition();
- }
-
- if(localError.getCondition() != null)
- {
- close.setError(localError);
- }
-
- _isCloseSent = true;
-
- writeFrame(0, close, null, null);
-
- if (_connectionEndpoint != null) {
- _connectionEndpoint.clearModified();
- }
- }
- }
- }
-
- protected void writeFrame(int channel, FrameBody frameBody,
- ByteBuffer payload, Runnable onPayloadTooLarge)
- {
- _frameWriter.writeFrame(channel, frameBody, payload, onPayloadTooLarge);
- }
-
- //==================================================================================================================
-
- @Override
- protected ConnectionImpl getConnectionImpl()
- {
- return _connectionEndpoint;
- }
-
- @Override
- void postFinal() {}
-
- @Override
- void doFree() { }
-
- //==================================================================================================================
- // handle incoming amqp data
-
-
- @Override
- public void handleOpen(Open open, Binary payload, Integer channel)
- {
- setRemoteState(EndpointState.ACTIVE);
- if(_connectionEndpoint != null)
- {
- _connectionEndpoint.handleOpen(open);
- }
- else
- {
- _open = open;
- }
-
- if(open.getMaxFrameSize().longValue() > 0)
- {
- _remoteMaxFrameSize = (int) open.getMaxFrameSize().longValue();
- _frameWriter.setMaxFrameSize(_remoteMaxFrameSize);
- }
-
- if (open.getChannelMax().longValue() > 0)
- {
- _remoteChannelMax = (int) open.getChannelMax().longValue();
- }
-
- if (open.getIdleTimeOut() != null && open.getIdleTimeOut().longValue() > 0)
- {
- _remoteIdleTimeout = open.getIdleTimeOut().intValue();
- }
- }
-
- @Override
- public void handleBegin(Begin begin, Binary payload, Integer channel)
- {
- // TODO - check channel < max_channel
- TransportSession transportSession = _remoteSessions.get(channel);
- if(transportSession != null)
- {
- // TODO - fail due to begin on begun session
- }
- else
- {
- SessionImpl session;
- if(begin.getRemoteChannel() == null)
- {
- session = _connectionEndpoint.session();
- transportSession = getTransportState(session);
- }
- else
- {
- transportSession = _localSessions.get(begin.getRemoteChannel().intValue());
- if (transportSession == null) {
- // TODO handle failure rather than just throwing a nicer NPE
- throw new NullPointerException("uncorrelated channel: " + begin.getRemoteChannel());
- }
- session = transportSession.getSession();
-
- }
- transportSession.setRemoteChannel(channel);
- session.setRemoteState(EndpointState.ACTIVE);
- transportSession.setNextIncomingId(begin.getNextOutgoingId());
- session.setRemoteProperties(begin.getProperties());
- session.setRemoteDesiredCapabilities(begin.getDesiredCapabilities());
- session.setRemoteOfferedCapabilities(begin.getOfferedCapabilities());
-
- _remoteSessions.put(channel, transportSession);
-
- _connectionEndpoint.put(Event.Type.SESSION_REMOTE_OPEN, session);
- }
-
- }
-
- @Override
- public void handleAttach(Attach attach, Binary payload, Integer channel)
- {
- TransportSession transportSession = _remoteSessions.get(channel);
- if(transportSession == null)
- {
- // TODO - fail due to attach on non-begun session
- }
- else
- {
- SessionImpl session = transportSession.getSession();
- final UnsignedInteger handle = attach.getHandle();
- if (handle.compareTo(transportSession.getHandleMax()) > 0) {
- // The handle-max value is the highest handle value that can be used on the session. A peer MUST
- // NOT attempt to attach a link using a handle value outside the range that its partner can handle.
- // A peer that receives a handle outside the supported range MUST close the connection with the
- // framing-error error-code.
- ErrorCondition condition =
- new ErrorCondition(ConnectionError.FRAMING_ERROR,
- "handle-max exceeded");
- _connectionEndpoint.setCondition(condition);
- _connectionEndpoint.setLocalState(EndpointState.CLOSED);
- if (!_isCloseSent) {
- Close close = new Close();
- close.setError(condition);
- _isCloseSent = true;
- writeFrame(0, close, null, null);
- }
- close_tail();
- return;
- }
- TransportLink<?> transportLink = transportSession.getLinkFromRemoteHandle(handle);
- LinkImpl link = null;
-
- if(transportLink != null)
- {
- // TODO - fail - attempt attach on a handle which is in use
- }
- else
- {
- transportLink = transportSession.resolveHalfOpenLink(attach.getName());
- if(transportLink == null)
- {
-
- link = (attach.getRole() == Role.RECEIVER)
- ? session.sender(attach.getName())
- : session.receiver(attach.getName());
- transportLink = getTransportState(link);
- }
- else
- {
- link = transportLink.getLink();
- }
- if(attach.getRole() == Role.SENDER)
- {
- transportLink.setDeliveryCount(attach.getInitialDeliveryCount());
- }
-
- link.setRemoteState(EndpointState.ACTIVE);
- link.setRemoteSource(attach.getSource());
- link.setRemoteTarget(attach.getTarget());
-
- link.setRemoteReceiverSettleMode(attach.getRcvSettleMode());
- link.setRemoteSenderSettleMode(attach.getSndSettleMode());
-
- link.setRemoteProperties(attach.getProperties());
-
- link.setRemoteDesiredCapabilities(attach.getDesiredCapabilities());
- link.setRemoteOfferedCapabilities(attach.getOfferedCapabilities());
-
- transportLink.setName(attach.getName());
- transportLink.setRemoteHandle(handle);
- transportSession.addLinkRemoteHandle(transportLink, handle);
-
- }
-
- _connectionEndpoint.put(Event.Type.LINK_REMOTE_OPEN, link);
- }
- }
-
- @Override
- public void handleFlow(Flow flow, Binary payload, Integer channel)
- {
- TransportSession transportSession = _remoteSessions.get(channel);
- if(transportSession == null)
- {
- // TODO - fail due to attach on non-begun session
- }
- else
- {
- transportSession.handleFlow(flow);
- }
-
- }
-
- @Override
- public void handleTransfer(Transfer transfer, Binary payload, Integer channel)
- {
- // TODO - check channel < max_channel
- TransportSession transportSession = _remoteSessions.get(channel);
- if(transportSession != null)
- {
- transportSession.handleTransfer(transfer, payload);
- }
- else
- {
- // TODO - fail due to begin on begun session
- }
- }
-
- @Override
- public void handleDisposition(Disposition disposition, Binary payload, Integer channel)
- {
- TransportSession transportSession = _remoteSessions.get(channel);
- if(transportSession == null)
- {
- // TODO - fail due to attach on non-begun session
- }
- else
- {
- transportSession.handleDisposition(disposition);
- }
- }
-
- @Override
- public void handleDetach(Detach detach, Binary payload, Integer channel)
- {
- TransportSession transportSession = _remoteSessions.get(channel);
- if(transportSession == null)
- {
- // TODO - fail due to attach on non-begun session
- }
- else
- {
- TransportLink<?> transportLink = transportSession.getLinkFromRemoteHandle(detach.getHandle());
-
- if(transportLink != null)
- {
- LinkImpl link = transportLink.getLink();
- transportLink.receivedDetach();
- transportSession.freeRemoteHandle(transportLink.getRemoteHandle());
- if (detach.getClosed()) {
- _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link);
- } else {
- _connectionEndpoint.put(Event.Type.LINK_REMOTE_DETACH, link);
- }
- transportLink.clearRemoteHandle();
- link.setRemoteState(EndpointState.CLOSED);
- if(detach.getError() != null)
- {
- link.getRemoteCondition().copyFrom(detach.getError());
- }
- }
- else
- {
- // TODO - fail - attempt attach on a handle which is in use
- }
- }
- }
-
- @Override
- public void handleEnd(End end, Binary payload, Integer channel)
- {
- TransportSession transportSession = _remoteSessions.get(channel);
- if(transportSession == null)
- {
- // TODO - fail due to attach on non-begun session
- }
- else
- {
- _remoteSessions.remove(channel);
- transportSession.receivedEnd();
- transportSession.unsetRemoteChannel();
- SessionImpl session = transportSession.getSession();
- session.setRemoteState(EndpointState.CLOSED);
- ErrorCondition errorCondition = end.getError();
- if(errorCondition != null)
- {
- session.getRemoteCondition().copyFrom(errorCondition);
- }
-
- _connectionEndpoint.put(Event.Type.SESSION_REMOTE_CLOSE, session);
- }
- }
-
- @Override
- public void handleClose(Close close, Binary payload, Integer channel)
- {
- _closeReceived = true;
- _remoteIdleTimeout = 0;
- setRemoteState(EndpointState.CLOSED);
- if(_connectionEndpoint != null)
- {
- _connectionEndpoint.setRemoteState(EndpointState.CLOSED);
- if(close.getError() != null)
- {
- _connectionEndpoint.getRemoteCondition().copyFrom(close.getError());
- }
-
- _connectionEndpoint.put(Event.Type.CONNECTION_REMOTE_CLOSE, _connectionEndpoint);
- }
-
- }
-
- @Override
- public boolean handleFrame(TransportFrame frame)
- {
- if (!isHandlingFrames())
- {
- throw new IllegalStateException("Transport cannot accept frame: " + frame);
- }
-
- log(INCOMING, frame);
-
- ProtocolTracer tracer = _protocolTracer.get();
- if( tracer != null )
- {
- tracer.receivedFrame(frame);
- }
-
- frame.getBody().invoke(this,frame.getPayload(), frame.getChannel());
- return _closeReceived;
- }
-
- void put(Event.Type type, Object context) {
- if (_connectionEndpoint != null) {
- _connectionEndpoint.put(type, context);
- }
- }
-
- private void maybePostClosed()
- {
- if (postedHeadClosed && postedTailClosed) {
- put(Event.Type.TRANSPORT_CLOSED, this);
- }
- }
-
- @Override
- public void closed(TransportException error)
- {
- if (!_closeReceived || error != null) {
- if (error == null) {
- _condition = new ErrorCondition(ConnectionError.FRAMING_ERROR,
- "connection aborted");
- } else {
- _condition = new ErrorCondition(ConnectionError.FRAMING_ERROR,
- error.toString());
- }
- _head_closed = true;
- }
- if (_condition != null && !postedTransportError) {
- put(Event.Type.TRANSPORT_ERROR, this);
- postedTransportError = true;
- }
- if (!postedTailClosed) {
- put(Event.Type.TRANSPORT_TAIL_CLOSED, this);
- postedTailClosed = true;
- maybePostClosed();
- }
- }
-
- @Override
- public boolean isHandlingFrames()
- {
- return _connectionEndpoint != null || getRemoteState() == EndpointState.UNINITIALIZED;
- }
-
- @Override
- public ProtocolTracer getProtocolTracer()
- {
- return _protocolTracer.get();
- }
-
- @Override
- public void setProtocolTracer(ProtocolTracer protocolTracer)
- {
- this._protocolTracer.set(protocolTracer);
- }
-
- @Override
- public ByteBuffer getInputBuffer()
- {
- return tail();
- }
-
- @Override
- public TransportResult processInput()
- {
- try {
- process();
- return TransportResultFactory.ok();
- } catch (TransportException e) {
- return TransportResultFactory.error(e);
- }
- }
-
- @Override
- public ByteBuffer getOutputBuffer()
- {
- pending();
- return head();
- }
-
- @Override
- public void outputConsumed()
- {
- pop(_outputProcessor.head().position());
- }
-
- @Override
- public int capacity()
- {
- init();
- return _inputProcessor.capacity();
- }
-
- @Override
- public ByteBuffer tail()
- {
- init();
- return _inputProcessor.tail();
- }
-
- @Override
- public void process() throws TransportException
- {
- _processingStarted = true;
-
- try {
- init();
- int beforePosition = _inputProcessor.position();
- _inputProcessor.process();
- _bytesInput += beforePosition - _inputProcessor.position();
- } catch (TransportException e) {
- _head_closed = true;
- throw e;
- }
- }
-
- @Override
- public void close_tail()
- {
- init();
- _inputProcessor.close_tail();
- }
-
- @Override
- public int pending()
- {
- init();
- return _outputProcessor.pending();
- }
-
- @Override
- public ByteBuffer head()
- {
- init();
- return _outputProcessor.head();
- }
-
- @Override
- public void pop(int bytes)
- {
- init();
- _outputProcessor.pop(bytes);
- _bytesOutput += bytes;
-
- int p = pending();
- if (p < 0 && !postedHeadClosed) {
- put(Event.Type.TRANSPORT_HEAD_CLOSED, this);
- postedHeadClosed = true;
- maybePostClosed();
- }
- }
-
- @Override
- public void setIdleTimeout(int timeout) {
- _localIdleTimeout = timeout;
- }
-
- @Override
- public int getIdleTimeout() {
- return _localIdleTimeout;
- }
-
- @Override
- public int getRemoteIdleTimeout() {
- return _remoteIdleTimeout;
- }
-
- @Override
- public long tick(long now)
- {
- long timeout = 0;
-
- if (_localIdleTimeout > 0) {
- if (_localIdleDeadline == 0 || _lastBytesInput != _bytesInput) {
- _localIdleDeadline = now + _localIdleTimeout;
- _lastBytesInput = _bytesInput;
- } else if (_localIdleDeadline <= now) {
- _localIdleDeadline = now + _localIdleTimeout;
-
- if (_connectionEndpoint != null &&
- _connectionEndpoint.getLocalState() != EndpointState.CLOSED) {
- ErrorCondition condition =
- new ErrorCondition(Symbol.getSymbol("amqp:resource-limit-exceeded"),
- "local-idle-timeout expired");
- _connectionEndpoint.setCondition(condition);
- _connectionEndpoint.setLocalState(EndpointState.CLOSED);
-
- if (!_isOpenSent) {
- if ((_sasl != null) && (!_sasl.isDone())) {
- _sasl.fail();
- }
- Open open = new Open();
- _isOpenSent = true;
- writeFrame(0, open, null, null);
- }
- if (!_isCloseSent) {
- Close close = new Close();
- close.setError(condition);
- _isCloseSent = true;
- writeFrame(0, close, null, null);
- }
- close_tail();
- }
- }
- timeout = _localIdleDeadline;
- }
-
- if (_remoteIdleTimeout != 0 && !_isCloseSent) {
- if (_remoteIdleDeadline == 0 || _lastBytesOutput != _bytesOutput) {
- _remoteIdleDeadline = now + (_remoteIdleTimeout / 2);
- _lastBytesOutput = _bytesOutput;
- } else if (_remoteIdleDeadline <= now) {
- _remoteIdleDeadline = now + (_remoteIdleTimeout / 2);
- if (pending() == 0) {
- writeFrame(0, null, null, null);
- _lastBytesOutput += pending();
- }
- }
- timeout = Math.min(timeout == 0 ? _remoteIdleDeadline : timeout, _remoteIdleDeadline);
- }
-
- return timeout;
- }
-
- @Override
- public long getFramesOutput()
- {
- return _frameWriter.getFramesOutput();
- }
-
- @Override
- public long getFramesInput()
- {
- return _frameParser.getFramesInput();
- }
-
- @Override
- public void close_head()
- {
- _outputProcessor.close_head();
- }
-
- @Override
- public boolean isClosed() {
- int p = pending();
- int c = capacity();
- return p == END_OF_STREAM && c == END_OF_STREAM;
- }
-
- @Override
- public String toString()
- {
- return "TransportImpl [_connectionEndpoint=" + _connectionEndpoint + ", " + super.toString() + "]";
- }
-
- private static class PartialTransfer implements Runnable
- {
- private final Transfer _transfer;
-
- public PartialTransfer(Transfer transfer)
- {
- _transfer = transfer;
- }
-
- @Override
- public void run()
- {
- _transfer.setMore(true);
- }
- }
-
- /**
- * Override the default frame handler. Must be called before the transport starts being used
- * (e.g. {@link #getInputBuffer()}, {@link #getOutputBuffer()}, {@link #ssl(SslDomain)} etc).
- */
- public void setFrameHandler(FrameHandler frameHandler)
- {
- _frameHandler = frameHandler;
- }
-
- static String INCOMING = "<-";
- static String OUTGOING = "->";
-
- void log(String event, TransportFrame frame)
- {
- if (isTraceFramesEnabled()) {
- StringBuilder msg = new StringBuilder();
- msg.append("[").append(System.identityHashCode(this)).append(":")
- .append(frame.getChannel()).append("]");
- msg.append(" ").append(event).append(" ").append(frame.getBody());
-
- Binary bin = frame.getPayload();
- if (bin != null) {
- msg.append(" (").append(bin.getLength()).append(") ");
- msg.append(StringUtils.toQuotedString(bin, TRACE_FRAME_PAYLOAD_LENGTH, true));
- }
- System.out.println(msg.toString());
- }
- }
-
- boolean isTraceFramesEnabled()
- {
- return (_levels & TRACE_FRM) != 0;
- }
-
- @Override
- void localOpen() {}
-
- @Override
- void localClose() {}
-
- public void setSelectable(Selectable selectable) {
- _selectable = selectable;
- }
-
- public Selectable getSelectable() {
- return _selectable;
- }
-
- public void setReactor(Reactor reactor) {
- _reactor = reactor;
- }
-
- public Reactor getReactor() {
- return _reactor;
- }
-
- @Override
- public void setEmitFlowEventOnSend(boolean emitFlowEventOnSend)
- {
- _emitFlowEventOnSend = emitFlowEventOnSend;
- }
-
- @Override
- public boolean isEmitFlowEventOnSend()
- {
- return _emitFlowEventOnSend;
- }
-
- // From TransportInternal
- @Override
- public void addTransportLayer(TransportLayer layer)
- {
- if (_processingStarted)
- {
- throw new IllegalStateException("Additional layer can't be added after transport has started processing");
- }
-
- if (_additionalTransportLayers == null)
- {
- _additionalTransportLayers = new ArrayList<TransportLayer>();
- }
-
- if (!_additionalTransportLayers.contains(layer))
- {
- init();
- TransportWrapper transportWrapper = layer.wrap(_inputProcessor, _outputProcessor);
- _inputProcessor = transportWrapper;
- _outputProcessor = transportWrapper;
- _additionalTransportLayers.add(layer);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportInput.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportInput.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportInput.java
deleted file mode 100644
index f2699d0..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportInput.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.proton.engine.TransportException;
-
-
-public interface TransportInput
-{
-
- int capacity();
-
- int position();
-
- ByteBuffer tail() throws TransportException;
-
- void process() throws TransportException;
-
- void close_tail();
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportInternal.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportInternal.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportInternal.java
deleted file mode 100644
index 73b4d44..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportInternal.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.engine.Transport;
-
-/**
- * Extended Transport interface providing access to certain methods intended mainly for internal
- * use, or use in extending implementation details not strictly considered part of the public
- * Transport API.
- */
-public interface TransportInternal extends Transport
-{
- /**
- * Add a {@link TransportLayer} to the transport, wrapping the input and output process handlers
- * in the state they currently exist. No effect if the given layer was previously added.
- *
- * @param layer the layer to add (if it was not previously added)
- * @throws IllegalStateException if processing has already started.
- */
- void addTransportLayer(TransportLayer layer) throws IllegalStateException;
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLayer.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLayer.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLayer.java
deleted file mode 100644
index a6cdeb1..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLayer.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.engine.impl.TransportInput;
-import org.apache.qpid.proton.engine.impl.TransportOutput;
-import org.apache.qpid.proton.engine.impl.TransportWrapper;
-
-public interface TransportLayer
-{
- public TransportWrapper wrap(TransportInput input, TransportOutput output);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
deleted file mode 100644
index 836cf71..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.transport.Flow;
-import org.apache.qpid.proton.engine.Event;
-
-class TransportLink<T extends LinkImpl>
-{
- private UnsignedInteger _localHandle;
- private String _name;
- private UnsignedInteger _remoteHandle;
- private UnsignedInteger _deliveryCount;
- private UnsignedInteger _linkCredit = UnsignedInteger.ZERO;
- private T _link;
- private UnsignedInteger _remoteDeliveryCount;
- private UnsignedInteger _remoteLinkCredit;
- private boolean _detachReceived;
- private boolean _attachSent;
-
- protected TransportLink(T link)
- {
- _link = link;
- _name = link.getName();
- }
-
- static <L extends LinkImpl> TransportLink<L> createTransportLink(L link)
- {
- if (link instanceof ReceiverImpl)
- {
- ReceiverImpl r = (ReceiverImpl) link;
- TransportReceiver tr = new TransportReceiver(r);
- r.setTransportLink(tr);
-
- return (TransportLink<L>) tr;
- }
- else
- {
- SenderImpl s = (SenderImpl) link;
- TransportSender ts = new TransportSender(s);
- s.setTransportLink(ts);
-
- return (TransportLink<L>) ts;
- }
- }
-
- void unbind()
- {
- clearLocalHandle();
- clearRemoteHandle();
- }
-
- public UnsignedInteger getLocalHandle()
- {
- return _localHandle;
- }
-
- public void setLocalHandle(UnsignedInteger localHandle)
- {
- if (_localHandle == null) {
- _link.incref();
- }
- _localHandle = localHandle;
- }
-
- public boolean isLocalHandleSet()
- {
- return _localHandle != null;
- }
-
- public String getName()
- {
- return _name;
- }
-
- public void setName(String name)
- {
- _name = name;
- }
-
- public void clearLocalHandle()
- {
- if (_localHandle != null) {
- _link.decref();
- }
- _localHandle = null;
- }
-
- public UnsignedInteger getRemoteHandle()
- {
- return _remoteHandle;
- }
-
- public void setRemoteHandle(UnsignedInteger remoteHandle)
- {
- if (_remoteHandle == null) {
- _link.incref();
- }
- _remoteHandle = remoteHandle;
- }
-
- public void clearRemoteHandle()
- {
- if (_remoteHandle != null) {
- _link.decref();
- }
- _remoteHandle = null;
- }
-
- public UnsignedInteger getDeliveryCount()
- {
- return _deliveryCount;
- }
-
- public UnsignedInteger getLinkCredit()
- {
- return _linkCredit;
- }
-
- public void addCredit(int credits)
- {
- _linkCredit = UnsignedInteger.valueOf(_linkCredit.intValue() + credits);
- }
-
- public boolean hasCredit()
- {
- return getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0;
- }
-
- public T getLink()
- {
- return _link;
- }
-
- void handleFlow(Flow flow)
- {
- _remoteDeliveryCount = flow.getDeliveryCount();
- _remoteLinkCredit = flow.getLinkCredit();
-
-
- _link.getConnectionImpl().put(Event.Type.LINK_FLOW, _link);
- }
-
- void setLinkCredit(UnsignedInteger linkCredit)
- {
- _linkCredit = linkCredit;
- }
-
- public void setDeliveryCount(UnsignedInteger deliveryCount)
- {
- _deliveryCount = deliveryCount;
- }
-
- public void settled(TransportDelivery transportDelivery)
- {
- getLink().getSession().getTransportSession().settled(transportDelivery);
- }
-
-
- UnsignedInteger getRemoteDeliveryCount()
- {
- return _remoteDeliveryCount;
- }
-
- UnsignedInteger getRemoteLinkCredit()
- {
- return _remoteLinkCredit;
- }
-
- public void setRemoteLinkCredit(UnsignedInteger remoteLinkCredit)
- {
- _remoteLinkCredit = remoteLinkCredit;
- }
-
- void decrementLinkCredit()
- {
- _linkCredit = _linkCredit.subtract(UnsignedInteger.ONE);
- }
-
- void incrementDeliveryCount()
- {
- _deliveryCount = _deliveryCount.add(UnsignedInteger.ONE);
- }
-
- public void receivedDetach()
- {
- _detachReceived = true;
- }
-
- public boolean detachReceived()
- {
- return _detachReceived;
- }
-
- public boolean attachSent()
- {
- return _attachSent;
- }
-
- public void sentAttach()
- {
- _attachSent = true;
- }
-
- public void setRemoteDeliveryCount(UnsignedInteger remoteDeliveryCount)
- {
- _remoteDeliveryCount = remoteDeliveryCount;
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org