You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/01/09 15:24:51 UTC
[15/34] qpid-proton git commit: PROTON-1385: remove proton-j from the
existing repo, it now has its own repo at:
https://git-wip-us.apache.org/repos/asf/qpid-proton-j.git
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
deleted file mode 100644
index fb1b06a..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.transport.EmptyFrame;
-import org.apache.qpid.proton.amqp.transport.FrameBody;
-import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.framing.TransportFrame;
-
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-
-/**
- * Encodes outgoing frames into an internal byte buffer that grows on demand
- * and hands the accumulated bytes out through {@link #readBytes(ByteBuffer)}.
- *
- * Each frame is written as: room for an 8 byte header, the encoded
- * performative, then as much of the payload as the configured maximum frame
- * size allows. The header itself is filled in last, by endFrame(), once the
- * total frame size is known.
- */
-class FrameWriter
-{
-
-    static final byte AMQP_FRAME_TYPE = 0;
-    static final byte SASL_FRAME_TYPE = (byte) 1;
-
-    /** Number of bytes endFrame() writes at the start of every frame. */
-    private static final int FRAME_HEADER_SIZE = 8;
-
-    private EncoderImpl _encoder;
-    private ByteBuffer _bbuf;
-    private WritableBuffer _buffer;
-    private int _maxFrameSize;
-    private byte _frameType;
-    final private Ref<ProtocolTracer> _protocolTracer;
-    private TransportImpl _transport;
-
-    private int _frameStart = 0;
-    private int _payloadStart;
-    private int _performativeSize;
-    private long _framesOutput = 0;
-
-    /**
-     * @param encoder        encoder used for performatives; it is pointed at this writer's buffer
-     * @param maxFrameSize   maximum frame size in bytes; values &lt;= 0 mean "no limit"
-     * @param frameType      value written into the frame header's type byte
-     * @param protocolTracer optional holder of a tracer; both the holder and its content may be null
-     * @param transport      transport used for frame logging
-     */
-    FrameWriter(EncoderImpl encoder, int maxFrameSize, byte frameType,
-                Ref<ProtocolTracer> protocolTracer, TransportImpl transport)
-    {
-        _encoder = encoder;
-        _bbuf = ByteBuffer.allocate(1024);
-        _buffer = new WritableBuffer.ByteBufferWrapper(_bbuf);
-        _encoder.setByteBuffer(_buffer);
-        _maxFrameSize = maxFrameSize;
-        _frameType = frameType;
-        _protocolTracer = protocolTracer;
-        _transport = transport;
-    }
-
-    void setMaxFrameSize(int maxFrameSize)
-    {
-        _maxFrameSize = maxFrameSize;
-    }
-
-    /**
-     * Doubles the backing buffer, copying everything written so far, and
-     * re-points the encoder at the new buffer.
-     */
-    private void grow()
-    {
-        ByteBuffer old = _bbuf;
-        _bbuf = ByteBuffer.allocate(_bbuf.capacity() * 2);
-        _buffer = new WritableBuffer.ByteBufferWrapper(_bbuf);
-        old.flip();
-        _bbuf.put(old);
-        _encoder.setByteBuffer(_buffer);
-    }
-
-    /** Appends the given protocol header bytes verbatim. */
-    void writeHeader(byte[] header)
-    {
-        _buffer.put(header, 0, header.length);
-    }
-
-    /** Remembers where the frame about to be written begins. */
-    private void startFrame()
-    {
-        _frameStart = _buffer.position();
-    }
-
-    /**
-     * Encodes the performative just after the space reserved for the frame
-     * header, growing the buffer and retrying from scratch whenever the
-     * encoder overflows it. Records where the payload starts and how many
-     * bytes header plus performative occupy.
-     */
-    private void writePerformative(Object frameBody)
-    {
-        while (_buffer.remaining() < FRAME_HEADER_SIZE)
-        {
-            grow();
-        }
-
-        while (true)
-        {
-            try
-            {
-                // Always restart right after the header slot so a retry
-                // overwrites any partially encoded performative.
-                _buffer.position(_frameStart + FRAME_HEADER_SIZE);
-                if (frameBody != null)
-                {
-                    _encoder.writeObject(frameBody);
-                }
-                break;
-            }
-            catch (BufferOverflowException e)
-            {
-                grow();
-            }
-        }
-
-        _payloadStart = _buffer.position();
-        _performativeSize = _payloadStart - _frameStart;
-    }
-
-    /**
-     * Back-fills the 8 byte header at the frame start: total frame size,
-     * the constant 2 (presumably the AMQP data offset in 4-byte words -
-     * confirm against the framing spec), the frame type and the channel.
-     * The write position is restored to the end of the frame afterwards.
-     */
-    private void endFrame(int channel)
-    {
-        int frameSize = _buffer.position() - _frameStart;
-        int limit = _buffer.position();
-        _buffer.position(_frameStart);
-        _buffer.putInt(frameSize);
-        _buffer.put((byte) 2);
-        _buffer.put(_frameType);
-        _buffer.putShort((short) channel);
-        _buffer.position(limit);
-    }
-
-    /**
-     * Writes one frame.
-     *
-     * If a maximum frame size is set and the whole payload does not fit next
-     * to the performative, onPayloadTooLarge (when non-null) is run and the
-     * performative is encoded again, so the callback can adjust frameBody
-     * first. Only as many payload bytes as fit are consumed from payload; the
-     * rest stays available to the caller.
-     *
-     * @param channel           channel number written into the header
-     * @param frameBody         performative to encode, or null for none
-     * @param payload           optional payload; its position advances by the bytes written
-     * @param onPayloadTooLarge optional callback run when the payload must be split
-     */
-    void writeFrame(int channel, Object frameBody, ByteBuffer payload,
-                    Runnable onPayloadTooLarge)
-    {
-        startFrame();
-
-        writePerformative(frameBody);
-
-        if (_maxFrameSize > 0 && payload != null && (payload.remaining() + _performativeSize) > _maxFrameSize)
-        {
-            if (onPayloadTooLarge != null)
-            {
-                onPayloadTooLarge.run();
-            }
-            writePerformative(frameBody);
-        }
-
-        int capacity;
-        if (_maxFrameSize > 0)
-        {
-            // Clamp at zero: when the performative alone exceeds the max frame
-            // size the difference is negative, and a negative payload size
-            // would be used as a buffer limit offset on the tracing path.
-            capacity = Math.max(0, _maxFrameSize - _performativeSize);
-        }
-        else
-        {
-            capacity = Integer.MAX_VALUE;
-        }
-        int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), capacity);
-
-        ProtocolTracer tracer = _protocolTracer == null ? null : _protocolTracer.get();
-        if (tracer != null || _transport.isTraceFramesEnabled())
-        {
-            // XXX: this is a bit of a hack but it eliminates duplicate
-            // code, further refactor will fix this
-            if (_frameType == AMQP_FRAME_TYPE)
-            {
-                traceFrame(tracer, channel, frameBody, payload, payloadSize);
-            }
-        }
-
-        if (payloadSize > 0)
-        {
-            while (_buffer.remaining() < payloadSize)
-            {
-                grow();
-            }
-
-            // Temporarily narrow the payload so exactly payloadSize bytes are copied.
-            int oldLimit = payload.limit();
-            payload.limit(payload.position() + payloadSize);
-            _buffer.put(payload);
-            payload.limit(oldLimit);
-        }
-
-        endFrame(channel);
-
-        _framesOutput += 1;
-    }
-
-    /**
-     * Builds a TransportFrame describing the frame being written (using a
-     * duplicate view of the payload, so the caller's buffer is untouched),
-     * logs it through the transport and passes it to the tracer if present.
-     */
-    private void traceFrame(ProtocolTracer tracer, int channel, Object frameBody,
-                            ByteBuffer payload, int payloadSize)
-    {
-        ByteBuffer originalPayload = null;
-        if (payload != null)
-        {
-            originalPayload = payload.duplicate();
-            originalPayload.limit(payload.position() + payloadSize);
-        }
-
-        Binary payloadBin = Binary.create(originalPayload);
-        FrameBody body = null;
-        if (frameBody == null)
-        {
-            body = new EmptyFrame();
-        }
-        else
-        {
-            body = (FrameBody) frameBody;
-        }
-
-        TransportFrame frame = new TransportFrame(channel, body, payloadBin);
-
-        _transport.log(TransportImpl.OUTGOING, frame);
-
-        if (tracer != null)
-        {
-            tracer.sentFrame(frame);
-        }
-    }
-
-    /** Writes a payload-free frame on channel 0. */
-    void writeFrame(Object frameBody)
-    {
-        writeFrame(0, frameBody, null, null);
-    }
-
-    /** @return true once more than 64KiB of output is buffered */
-    boolean isFull()
-    {
-        // XXX: this should probably be tunable
-        return _bbuf.position() > 64*1024;
-    }
-
-    /**
-     * Moves buffered output into dst.
-     *
-     * Copies as many bytes as both sides allow from the front of the internal
-     * buffer, then shifts whatever was not consumed down to the start of the
-     * internal buffer so later writes append after it.
-     *
-     * @return the number of bytes copied into dst
-     */
-    int readBytes(ByteBuffer dst)
-    {
-        ByteBuffer src = _bbuf.duplicate();
-        src.flip();
-
-        int size = Math.min(src.remaining(), dst.remaining());
-        int limit = src.limit();
-        src.limit(size);
-        dst.put(src);
-        src.limit(limit);
-        _bbuf.rewind();
-        _bbuf.put(src);
-
-        return size;
-    }
-
-    /** @return the number of frames written so far */
-    long getFramesOutput()
-    {
-        return _framesOutput;
-    }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
deleted file mode 100644
index 6a5aac5..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.TransportException;
-import org.apache.qpid.proton.engine.impl.TransportWrapper;
-
-/**
- * Transport wrapper that buffers the first {@link #bufferSize()} input bytes,
- * asks the subclass to choose a delegate wrapper based on them, replays those
- * bytes into the chosen delegate and from then on forwards every call to it.
- */
-public abstract class HandshakeSniffingTransportWrapper<T1 extends TransportWrapper, T2 extends TransportWrapper>
-    implements TransportWrapper
-{
-    /** Returned from head() while no delegate has been selected. */
-    private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
-
-    protected final T1 _wrapper1;
-    protected final T2 _wrapper2;
-
-    private boolean _tail_closed = false;
-    private boolean _head_closed = false;
-    protected TransportWrapper _selectedTransportWrapper;
-
-    private final ByteBuffer _determinationBuffer;
-
-    protected HandshakeSniffingTransportWrapper(T1 wrapper1, T2 wrapper2)
-    {
-        _wrapper1 = wrapper1;
-        _wrapper2 = wrapper2;
-        _determinationBuffer = ByteBuffer.allocate(bufferSize());
-    }
-
-    /** @return how many leading input bytes are needed before a determination can be made */
-    protected abstract int bufferSize();
-
-    /**
-     * Inspects the sniffed bytes. process() uses _selectedTransportWrapper
-     * immediately after this returns, so implementations are expected to
-     * have set it.
-     */
-    protected abstract void makeDetermination(byte[] bytes);
-
-    /** @return true once a delegate wrapper has been selected */
-    protected boolean isDeterminationMade()
-    {
-        return _selectedTransportWrapper != null;
-    }
-
-    @Override
-    public int capacity()
-    {
-        if (!isDeterminationMade())
-        {
-            return _tail_closed ? Transport.END_OF_STREAM : _determinationBuffer.remaining();
-        }
-        return _selectedTransportWrapper.capacity();
-    }
-
-    @Override
-    public int position()
-    {
-        if (!isDeterminationMade())
-        {
-            return _tail_closed ? Transport.END_OF_STREAM : _determinationBuffer.position();
-        }
-        return _selectedTransportWrapper.position();
-    }
-
-    @Override
-    public ByteBuffer tail()
-    {
-        return isDeterminationMade() ? _selectedTransportWrapper.tail() : _determinationBuffer;
-    }
-
-    /**
-     * Forwards to the delegate once selected. Before that, when the sniffing
-     * buffer has filled, makes the determination and replays the sniffed
-     * bytes into the delegate; if the tail was closed before enough bytes
-     * arrived the connection is reported as aborted.
-     */
-    @Override
-    public void process() throws TransportException
-    {
-        if (isDeterminationMade())
-        {
-            _selectedTransportWrapper.process();
-            return;
-        }
-
-        if (_determinationBuffer.remaining() == 0)
-        {
-            _determinationBuffer.flip();
-            byte[] sniffed = new byte[_determinationBuffer.remaining()];
-            _determinationBuffer.get(sniffed);
-            makeDetermination(sniffed);
-            // Back to the first byte so the delegate sees the full handshake.
-            _determinationBuffer.rewind();
-
-            // TODO what if the selected transport has insufficient capacity?? Maybe use pour, and then try to finish pouring next time round.
-            _selectedTransportWrapper.tail().put(_determinationBuffer);
-            _selectedTransportWrapper.process();
-            return;
-        }
-
-        if (_tail_closed)
-        {
-            throw new TransportException("connection aborted");
-        }
-    }
-
-    @Override
-    public void close_tail()
-    {
-        try
-        {
-            if (isDeterminationMade())
-            {
-                _selectedTransportWrapper.close_tail();
-            }
-        }
-        finally
-        {
-            // Recorded even if the delegate throws.
-            _tail_closed = true;
-        }
-    }
-
-    @Override
-    public int pending()
-    {
-        if (_head_closed)
-        {
-            return Transport.END_OF_STREAM;
-        }
-        return isDeterminationMade() ? _selectedTransportWrapper.pending() : 0;
-    }
-
-    @Override
-    public ByteBuffer head()
-    {
-        return isDeterminationMade() ? _selectedTransportWrapper.head() : EMPTY;
-    }
-
-    @Override
-    public void pop(int bytes)
-    {
-        if (isDeterminationMade())
-        {
-            _selectedTransportWrapper.pop(bytes);
-            return;
-        }
-        if (bytes > 0)
-        {
-            throw new IllegalStateException("no bytes have been read");
-        }
-    }
-
-    @Override
-    public void close_head()
-    {
-        if (isDeterminationMade())
-        {
-            _selectedTransportWrapper.close_head();
-            return;
-        }
-        _head_closed = true;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
deleted file mode 100644
index a67785e..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
+++ /dev/null
@@ -1,514 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import java.util.EnumSet;
-import java.util.Map;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.amqp.transport.Source;
-import org.apache.qpid.proton.amqp.transport.Target;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Link;
-
-/**
- * Common state and behaviour shared by sender and receiver link endpoints:
- * the per-link list of deliveries (head / tail / current), credit and
- * queue counters, settle modes, and the local and remote attach details.
- */
-public abstract class LinkImpl extends EndpointImpl implements Link
-{
-
-    private final SessionImpl _session;
-
-    // Delivery list for this link; _current is the delivery being worked on.
-    DeliveryImpl _head;
-    DeliveryImpl _tail;
-    DeliveryImpl _current;
-    private String _name;
-    private Source _source;
-    private Source _remoteSource;
-    private Target _target;
-    private Target _remoteTarget;
-    private int _queued;
-    private int _credit;
-    private int _unsettled;
-    private int _drained;
-
-    private SenderSettleMode _senderSettleMode;
-    private SenderSettleMode _remoteSenderSettleMode;
-    private ReceiverSettleMode _receiverSettleMode;
-    private ReceiverSettleMode _remoteReceiverSettleMode;
-
-
-    private LinkNode<LinkImpl> _node;
-    private boolean _drain;
-    private boolean _detached;
-    private Map<Symbol, Object> _properties;
-    private Map<Symbol, Object> _remoteProperties;
-    private Symbol[] _offeredCapabilities;
-    private Symbol[] _remoteOfferedCapabilities;
-    private Symbol[] _desiredCapabilities;
-    private Symbol[] _remoteDesiredCapabilities;
-
-    /**
-     * Creates the link, takes a reference on its session, registers the link
-     * with the connection and emits LINK_INIT.
-     */
-    LinkImpl(SessionImpl session, String name)
-    {
-        _session = session;
-        _session.incref();
-        _name = name;
-        ConnectionImpl conn = session.getConnectionImpl();
-        _node = conn.addLinkEndpoint(this);
-        conn.put(Event.Type.LINK_INIT, this);
-    }
-
-
-    @Override
-    public String getName()
-    {
-        return _name;
-    }
-
-    @Override
-    public DeliveryImpl delivery(byte[] tag)
-    {
-        return delivery(tag, 0, tag.length);
-    }
-
-    /**
-     * Creates a delivery with the given tag, appends it to this link's
-     * delivery list and makes it current if there is no current delivery.
-     *
-     * @throws IllegalArgumentException if offset/length do not span the whole tag array
-     */
-    @Override
-    public DeliveryImpl delivery(byte[] tag, int offset, int length)
-    {
-        if (offset != 0 || length != tag.length)
-        {
-            throw new IllegalArgumentException("At present delivery tag must be the whole byte array");
-        }
-        incrementQueued();
-        // Any RuntimeException propagates to the caller unchanged; it is not
-        // additionally dumped to stderr here.
-        DeliveryImpl delivery = new DeliveryImpl(tag, this, _tail);
-        if (_tail == null)
-        {
-            _head = delivery;
-        }
-        _tail = delivery;
-        if (_current == null)
-        {
-            _current = delivery;
-        }
-        getConnectionImpl().workUpdate(delivery);
-        return delivery;
-    }
-
-    /** Emits LINK_FINAL and releases the reference taken on the session. */
-    @Override
-    void postFinal()
-    {
-        _session.getConnectionImpl().put(Event.Type.LINK_FINAL, this);
-        _session.decref();
-    }
-
-    /** Frees every delivery on this link and unregisters the link from the connection. */
-    @Override
-    void doFree()
-    {
-        DeliveryImpl dlv = _head;
-        while (dlv != null)
-        {
-            // Read the successor before freeing: freeing a delivery may
-            // unlink it (see remove), after which next() could no longer
-            // reach the rest of the list.
-            DeliveryImpl next = dlv.next();
-            dlv.free();
-            dlv = next;
-        }
-
-        _session.getConnectionImpl().removeLinkEndpoint(_node);
-        _node = null;
-    }
-
-    void modifyEndpoints()
-    {
-        modified();
-    }
-
-    /**
-     * Updates this link's head/tail references when the given delivery is
-     * leaving the list. The current-delivery reference is left untouched.
-     */
-    public void remove(DeliveryImpl delivery)
-    {
-        if (_head == delivery)
-        {
-            _head = delivery.getLinkNext();
-        }
-        if (_tail == delivery)
-        {
-            _tail = delivery.getLinkPrevious();
-        }
-        if (_current == delivery)
-        {
-            // TODO - what???
-        }
-    }
-
-    @Override
-    public DeliveryImpl current()
-    {
-        return _current;
-    }
-
-    /**
-     * Moves the current delivery to its successor, notifying the connection
-     * about both the old and (if any) the new current delivery.
-     *
-     * @return false if there was no current delivery to advance from
-     */
-    @Override
-    public boolean advance()
-    {
-        if (_current != null)
-        {
-            DeliveryImpl oldCurrent = _current;
-            _current = _current.getLinkNext();
-            getConnectionImpl().workUpdate(oldCurrent);
-
-            if (_current != null)
-            {
-                getConnectionImpl().workUpdate(_current);
-            }
-            return true;
-        }
-        else
-        {
-            return false;
-        }
-
-    }
-
-    @Override
-    protected ConnectionImpl getConnectionImpl()
-    {
-        return _session.getConnectionImpl();
-    }
-
-    @Override
-    public SessionImpl getSession()
-    {
-        return _session;
-    }
-
-    @Override
-    public Source getRemoteSource()
-    {
-        return _remoteSource;
-    }
-
-    void setRemoteSource(Source source)
-    {
-        _remoteSource = source;
-    }
-
-    @Override
-    public Target getRemoteTarget()
-    {
-        return _remoteTarget;
-    }
-
-    void setRemoteTarget(Target target)
-    {
-        _remoteTarget = target;
-    }
-
-    @Override
-    public Source getSource()
-    {
-        return _source;
-    }
-
-    @Override
-    public void setSource(Source source)
-    {
-        // TODO - should be an error if local state is ACTIVE
-        _source = source;
-    }
-
-    @Override
-    public Target getTarget()
-    {
-        return _target;
-    }
-
-    @Override
-    public void setTarget(Target target)
-    {
-        // TODO - should be an error if local state is ACTIVE
-        _target = target;
-    }
-
-    /**
-     * @return the next link after this one in the connection's endpoint list
-     *         that matches the given state sets, or null if there is none
-     */
-    @Override
-    public Link next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
-    {
-        LinkNode.Query<LinkImpl> query = new EndpointImplQuery<LinkImpl>(local, remote);
-
-        LinkNode<LinkImpl> linkNode = _node.next(query);
-
-        return linkNode == null ? null : linkNode.getValue();
-
-    }
-
-    abstract TransportLink getTransportLink();
-
-    @Override
-    public int getCredit()
-    {
-        return _credit;
-    }
-
-    public void addCredit(int credit)
-    {
-        _credit+=credit;
-    }
-
-    public void setCredit(int credit)
-    {
-        _credit = credit;
-    }
-
-    boolean hasCredit()
-    {
-        return _credit > 0;
-    }
-
-    void incrementCredit()
-    {
-        _credit++;
-    }
-
-    void decrementCredit()
-    {
-        _credit--;
-    }
-
-    @Override
-    public int getQueued()
-    {
-        return _queued;
-    }
-
-    void incrementQueued()
-    {
-        _queued++;
-    }
-
-    void decrementQueued()
-    {
-        _queued--;
-    }
-
-    @Override
-    public int getUnsettled()
-    {
-        return _unsettled;
-    }
-
-    void incrementUnsettled()
-    {
-        _unsettled++;
-    }
-
-    void decrementUnsettled()
-    {
-        _unsettled--;
-    }
-
-    void setDrain(boolean drain)
-    {
-        _drain = drain;
-    }
-
-    @Override
-    public boolean getDrain()
-    {
-        return _drain;
-    }
-
-    @Override
-    public SenderSettleMode getSenderSettleMode()
-    {
-        return _senderSettleMode;
-    }
-
-    @Override
-    public void setSenderSettleMode(SenderSettleMode senderSettleMode)
-    {
-        _senderSettleMode = senderSettleMode;
-    }
-
-    @Override
-    public SenderSettleMode getRemoteSenderSettleMode()
-    {
-        return _remoteSenderSettleMode;
-    }
-
-    @Override
-    public void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode)
-    {
-        _remoteSenderSettleMode = remoteSenderSettleMode;
-    }
-
-    @Override
-    public ReceiverSettleMode getReceiverSettleMode()
-    {
-        return _receiverSettleMode;
-    }
-
-    @Override
-    public void setReceiverSettleMode(ReceiverSettleMode receiverSettleMode)
-    {
-        _receiverSettleMode = receiverSettleMode;
-    }
-
-    @Override
-    public ReceiverSettleMode getRemoteReceiverSettleMode()
-    {
-        return _remoteReceiverSettleMode;
-    }
-
-    void setRemoteReceiverSettleMode(ReceiverSettleMode remoteReceiverSettleMode)
-    {
-        _remoteReceiverSettleMode = remoteReceiverSettleMode;
-    }
-
-    @Override
-    public Map<Symbol, Object> getProperties()
-    {
-        return _properties;
-    }
-
-    @Override
-    public void setProperties(Map<Symbol, Object> properties)
-    {
-        _properties = properties;
-    }
-
-    @Override
-    public Map<Symbol, Object> getRemoteProperties()
-    {
-        return _remoteProperties;
-    }
-
-    void setRemoteProperties(Map<Symbol, Object> remoteProperties)
-    {
-        _remoteProperties = remoteProperties;
-    }
-
-    @Override
-    public Symbol[] getDesiredCapabilities()
-    {
-        return _desiredCapabilities;
-    }
-
-    @Override
-    public void setDesiredCapabilities(Symbol[] desiredCapabilities)
-    {
-        _desiredCapabilities = desiredCapabilities;
-    }
-
-    @Override
-    public Symbol[] getRemoteDesiredCapabilities()
-    {
-        return _remoteDesiredCapabilities;
-    }
-
-    void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities)
-    {
-        _remoteDesiredCapabilities = remoteDesiredCapabilities;
-    }
-
-    @Override
-    public Symbol[] getOfferedCapabilities()
-    {
-        return _offeredCapabilities;
-    }
-
-    @Override
-    public void setOfferedCapabilities(Symbol[] offeredCapabilities)
-    {
-        _offeredCapabilities = offeredCapabilities;
-    }
-
-    @Override
-    public Symbol[] getRemoteOfferedCapabilities()
-    {
-        return _remoteOfferedCapabilities;
-    }
-
-    void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities)
-    {
-        _remoteOfferedCapabilities = remoteOfferedCapabilities;
-    }
-
-    /**
-     * On a sender that is draining and still has credit: gives up all
-     * remaining credit, records it as drained, marks the link modified and
-     * returns the amount (0 otherwise). On any other link: returns the
-     * recorded drained count and resets it to zero.
-     */
-    @Override
-    public int drained()
-    {
-        int drained = 0;
-
-        if (this instanceof SenderImpl)
-        {
-            if (getDrain() && hasCredit())
-            {
-                _drained = getCredit();
-                setCredit(0);
-                modified();
-                drained = _drained;
-            }
-        }
-        else
-        {
-            drained = _drained;
-            _drained = 0;
-        }
-
-        return drained;
-    }
-
-    int getDrained()
-    {
-        return _drained;
-    }
-
-    void setDrained(int value)
-    {
-        _drained = value;
-    }
-
-    @Override
-    public DeliveryImpl head()
-    {
-        return _head;
-    }
-
-    @Override
-    void localOpen()
-    {
-        getConnectionImpl().put(Event.Type.LINK_LOCAL_OPEN, this);
-    }
-
-    @Override
-    void localClose()
-    {
-        getConnectionImpl().put(Event.Type.LINK_LOCAL_CLOSE, this);
-    }
-
-    /** Flags the link as detached, emits LINK_LOCAL_DETACH and marks the link modified. */
-    @Override
-    public void detach()
-    {
-        _detached = true;
-        getConnectionImpl().put(Event.Type.LINK_LOCAL_DETACH, this);
-        modified();
-    }
-
-    public boolean detached()
-    {
-        return _detached;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkNode.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkNode.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkNode.java
deleted file mode 100644
index e3a8f78..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkNode.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package org.apache.qpid.proton.engine.impl;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-
-
-/**
- * Node of a minimal doubly linked list. A list is started with
- * {@link #newList(Object)} and extended with {@link #addAtTail(Object)}.
- */
-class LinkNode<E>
-{
-    /** Predicate used by {@link LinkNode#next(Query)} to select nodes. */
-    public interface Query<T>
-    {
-        public boolean matches(LinkNode<T> node);
-    }
-
-
-    private E _value;
-    private LinkNode<E> _prev;
-    private LinkNode<E> _next;
-
-    private LinkNode(E value)
-    {
-        _value = value;
-    }
-
-    public E getValue()
-    {
-        return _value;
-    }
-
-    public LinkNode<E> getPrev()
-    {
-        return _prev;
-    }
-
-    public LinkNode<E> getNext()
-    {
-        return _next;
-    }
-
-    /**
-     * Searches forward, starting with the node after this one.
-     *
-     * @return the first following node accepted by the query, or null if none is
-     */
-    public LinkNode<E> next(Query<E> query)
-    {
-        LinkNode<E> next = _next;
-        while(next != null && !query.matches(next))
-        {
-            next = next.getNext();
-        }
-        return next;
-    }
-
-    /**
-     * Unlinks this node from its neighbours and clears its own links.
-     *
-     * @return the node that followed this one before removal, or null
-     */
-    public LinkNode<E> remove()
-    {
-        LinkNode<E> prev = _prev;
-        LinkNode<E> next = _next;
-        if(prev != null)
-        {
-            prev._next = next;
-        }
-        if(next != null)
-        {
-            next._prev = prev;
-        }
-        _next = _prev = null;
-        return next;
-    }
-
-    /**
-     * Appends a new node holding value after the last node reachable from
-     * this one.
-     *
-     * Walks the list with a loop rather than recursing once per node, so the
-     * call stack depth does not grow with the length of the list.
-     *
-     * @return the newly appended node
-     */
-    public LinkNode<E> addAtTail(E value)
-    {
-        LinkNode<E> last = this;
-        while(last._next != null)
-        {
-            last = last._next;
-        }
-        LinkNode<E> appended = new LinkNode<E>(value);
-        appended._prev = last;
-        last._next = appended;
-        return appended;
-    }
-
-    /** Creates a single-node list holding value. */
-    public static <T> LinkNode<T> newList(T value)
-    {
-        return new LinkNode<T>(value);
-    }
-
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/PlainTransportWrapper.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/PlainTransportWrapper.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/PlainTransportWrapper.java
deleted file mode 100644
index 62b75b3..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/PlainTransportWrapper.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.proton.engine.TransportException;
-
-/**
- * TransportWrapper that adds no behaviour of its own: every input-side call
- * (capacity, position, tail, process, close_tail) goes to the TransportInput
- * and every output-side call (pending, head, pop, close_head) goes to the
- * TransportOutput supplied at construction.
- */
-public class PlainTransportWrapper implements TransportWrapper
-{
-    private final TransportOutput _outputProcessor;
-    private final TransportInput _inputProcessor;
-
-    /**
-     * @param outputProcessor receiver of all output-side calls
-     * @param inputProcessor  receiver of all input-side calls
-     */
-    public PlainTransportWrapper(TransportOutput outputProcessor,
-                                 TransportInput inputProcessor)
-    {
-        _outputProcessor = outputProcessor;
-        _inputProcessor = inputProcessor;
-    }
-
-    // ---- input side: delegated to the TransportInput ----
-
-    @Override
-    public int capacity()
-    {
-        return _inputProcessor.capacity();
-    }
-
-    @Override
-    public int position()
-    {
-        return _inputProcessor.position();
-    }
-
-    @Override
-    public ByteBuffer tail()
-    {
-        return _inputProcessor.tail();
-    }
-
-    @Override
-    public void process() throws TransportException
-    {
-        _inputProcessor.process();
-    }
-
-    @Override
-    public void close_tail()
-    {
-        _inputProcessor.close_tail();
-    }
-
-    // ---- output side: delegated to the TransportOutput ----
-
-    @Override
-    public int pending()
-    {
-        return _outputProcessor.pending();
-    }
-
-    @Override
-    public ByteBuffer head()
-    {
-        return _outputProcessor.head();
-    }
-
-    @Override
-    public void pop(int bytes)
-    {
-        _outputProcessor.pop(bytes);
-    }
-
-    @Override
-    public void close_head()
-    {
-        _outputProcessor.close_head();
-    }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java
deleted file mode 100644
index 92ad598..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.framing.TransportFrame;
-
-/**
- * Callback interface for observing transport frames as they are received
- * and sent.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public interface ProtocolTracer
-{
-    /** Called with a frame that has been received. */
-    void receivedFrame(TransportFrame transportFrame);
-
-    /** Called with a frame that is being sent. */
-    void sentFrame(TransportFrame transportFrame);
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
deleted file mode 100644
index 6f86700..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.engine.Receiver;
-
-/**
- * Receiving end of a link: issues credit, reads delivery bytes and keeps the
- * session's incoming byte and delivery counters up to date as the
- * application consumes data.
- */
-public class ReceiverImpl extends LinkImpl implements Receiver
-{
-    private boolean _drainFlagMode = true;
-    private TransportReceiver _transportReceiver;
-    private int _unsentCredits;
-
-    ReceiverImpl(SessionImpl session, String name)
-    {
-        super(session, name);
-    }
-
-    /**
-     * Marks the current delivery as done and moves on to the next one. When
-     * the link actually advanced, one queued delivery and one credit are
-     * consumed and the session's incoming counters are reduced by the
-     * delivery's still-pending bytes and by one delivery.
-     */
-    @Override
-    public boolean advance()
-    {
-        DeliveryImpl finished = current();
-        if (finished != null)
-        {
-            finished.setDone();
-        }
-
-        final boolean advanced = super.advance();
-        if (!advanced)
-        {
-            return advanced;
-        }
-
-        decrementQueued();
-        decrementCredit();
-        getSession().incrementIncomingBytes(-finished.pending());
-        getSession().incrementIncomingDeliveries(-1);
-        markModifiedIfIncomingWindowIsZero();
-        return advanced;
-    }
-
-    /**
-     * Grants additional credit and remembers it as not yet sent. Unless the
-     * drain flag was set explicitly through setDrain since the last drain
-     * request, the drain flag is cleared.
-     */
-    @Override
-    public void flow(final int credits)
-    {
-        addCredit(credits);
-        _unsentCredits += credits;
-        modified();
-        if (_drainFlagMode)
-        {
-            return;
-        }
-        setDrain(false);
-        // setDrain switches the mode flag on; put it back.
-        _drainFlagMode = false;
-    }
-
-    /** @return the credit granted since the previous call, resetting the tally to zero */
-    int clearUnsentCredits()
-    {
-        final int unsent = _unsentCredits;
-        _unsentCredits = 0;
-        return unsent;
-    }
-
-    /**
-     * Reads bytes of the current delivery into the given array region.
-     *
-     * @throws IllegalStateException if there is no current delivery
-     */
-    @Override
-    public int recv(final byte[] bytes, int offset, int size)
-    {
-        if (_current == null)
-        {
-            throw new IllegalStateException("no current delivery");
-        }
-        return accountForConsumed(_current.recv(bytes, offset, size));
-    }
-
-    /**
-     * Reads bytes of the current delivery into the given buffer.
-     *
-     * @throws IllegalStateException if there is no current delivery
-     */
-    @Override
-    public int recv(final WritableBuffer buffer)
-    {
-        if (_current == null)
-        {
-            throw new IllegalStateException("no current delivery");
-        }
-        return accountForConsumed(_current.recv(buffer));
-    }
-
-    /**
-     * When bytes were consumed, subtracts them from the session's incoming
-     * byte count and runs the incoming-window check; returns the argument.
-     */
-    private int accountForConsumed(int consumed)
-    {
-        if (consumed > 0)
-        {
-            getSession().incrementIncomingBytes(-consumed);
-            markModifiedIfIncomingWindowIsZero();
-        }
-        return consumed;
-    }
-
-    /** Marks this link modified when the transport session reports an incoming window of zero. */
-    private void markModifiedIfIncomingWindowIsZero()
-    {
-        if (getSession().getTransportSession().getIncomingWindowSize().equals(UnsignedInteger.ZERO))
-        {
-            modified();
-        }
-    }
-
-    @Override
-    void doFree()
-    {
-        getSession().freeReceiver(this);
-        super.doFree();
-    }
-
-    boolean hasIncoming()
-    {
-        return false; //TODO - Implement
-    }
-
-    void setTransportLink(TransportReceiver transportReceiver)
-    {
-        _transportReceiver = transportReceiver;
-    }
-
-    @Override
-    TransportReceiver getTransportLink()
-    {
-        return _transportReceiver;
-    }
-
-    /** Sets the drain flag, grants the given credit, then leaves drain-flag mode off. */
-    @Override
-    public void drain(int credit)
-    {
-        setDrain(true);
-        flow(credit);
-        _drainFlagMode = false;
-    }
-
-    /** @return true while the drain flag is set and credit exceeds the queued count */
-    @Override
-    public boolean draining()
-    {
-        if (!getDrain())
-        {
-            return false;
-        }
-        return getCredit() > getQueued();
-    }
-
-    /** Sets the drain flag, marks the link modified and switches drain-flag mode on. */
-    @Override
-    public void setDrain(boolean drain)
-    {
-        super.setDrain(drain);
-        modified();
-        _drainFlagMode = true;
-    }
-
-    @Override
-    public int getRemoteCredit()
-    {
-        // Credit is only decremented once advance is called on a received message,
-        // so we also need to consider the queued count.
-        final int credit = getCredit();
-        return credit - getQueued();
-    }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/RecordImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/RecordImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/RecordImpl.java
deleted file mode 100644
index 85408cb..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/RecordImpl.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.engine.Record;
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * Map-backed implementation of {@link Record}: an untyped key/value store whose
- * reads are cast to the type requested by the caller.
- */
-public class RecordImpl implements Record
-{
-
-    /** Backing storage; keys and values are kept exactly as supplied. */
-    private final Map<Object,Object> entries = new HashMap<Object,Object>();
-
-    /**
-     * Stores {@code value} under {@code key}, replacing any previous entry.
-     * The {@code klass} argument is not consulted when storing.
-     */
-    public <T> void set(Object key, Class<T> klass, T value) {
-        this.entries.put(key, value);
-    }
-
-    /**
-     * Looks up {@code key} and casts the stored value with {@code klass}.
-     *
-     * @return the stored value, or {@code null} when nothing is stored under {@code key}
-     * @throws ClassCastException if the stored value is not an instance of {@code klass}
-     */
-    public <T> T get(Object key, Class<T> klass) {
-        final Object stored = this.entries.get(key);
-        return klass.cast(stored);
-    }
-
-    /** Removes every entry. */
-    public void clear() {
-        this.entries.clear();
-    }
-
-    /** Copies all entries of {@code src} into this record, overwriting equal keys. */
-    void copy(RecordImpl src) {
-        this.entries.putAll(src.entries);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java
deleted file mode 100644
index 01e3a35..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-
-/**
- * Minimal mutable holder for a single reference of type {@code T}.
- *
- * @param <T> type of the held value
- */
-class Ref<T>
-{
-
-    /** The held value; may be {@code null}. */
-    T value;
-
-    /**
-     * @param initial the value held until {@link #set(Object)} is called
-     */
-    public Ref(T initial) {
-        this.value = initial;
-    }
-
-    /** @return the value currently held */
-    public T get() {
-        return this.value;
-    }
-
-    /** Replaces the held value with {@code value}. */
-    public void set(T value) {
-        this.value = value;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameHandler.java
deleted file mode 100644
index efc12dd..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameHandler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.security.SaslFrameBody;
-
-/**
- * Used by {@link SaslFrameParser} to handle the frames it parses
- */
-interface SaslFrameHandler
-{
- /**
-  * Receives one decoded SASL frame.
-  *
-  * @param frameBody the decoded frame body
-  * @param payload the bytes that followed the frame body inside the frame;
-  *        {@link SaslFrameParser} passes {@code null} when there are none
-  */
- void handle(SaslFrameBody frameBody, Binary payload);
-
- /**
-  * Reports whether SASL handling has finished. {@link SaslFrameParser} stops
-  * consuming input as soon as this returns {@code true}.
-  */
- boolean isDone();
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
deleted file mode 100644
index 8becc72..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.engine.impl;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.security.SaslFrameBody;
-import org.apache.qpid.proton.codec.ByteBufferDecoder;
-import org.apache.qpid.proton.codec.DecodeException;
-import org.apache.qpid.proton.engine.TransportException;
-
-/**
- * Incremental parser for SASL frames.
- * <p>
- * Input may arrive in arbitrary slices; the parser remembers how far it got
- * ({@link State}, the frame size, and a buffer for a partially received frame)
- * between calls to {@link #input(ByteBuffer)}. Every complete frame is decoded
- * and passed to the {@link SaslFrameHandler} supplied at construction.
- */
-class SaslFrameParser
-{
-    private final SaslFrameHandler _sasl;
-
-    /**
-     * Parser position. {@code SIZE_n} means n bytes of the 4-byte frame size have
-     * been read; {@code PRE_PARSE} means the size is known but not yet validated;
-     * {@code BUFFERING} means a partial frame is being accumulated;
-     * {@code PARSING} means a complete frame is available; {@code ERROR} is terminal.
-     */
-    enum State
-    {
-        SIZE_0,
-        SIZE_1,
-        SIZE_2,
-        SIZE_3,
-        PRE_PARSE,
-        BUFFERING,
-        PARSING,
-        ERROR
-    }
-
-    private State _state = State.SIZE_0;
-    private int _size;
-
-    /** Holds a frame that did not arrive in one piece; {@code null} otherwise. */
-    private ByteBuffer _buffer;
-
-    /** Number of leading header bytes that still have to be skipped. */
-    private int _ignore = 8;
-    private final ByteBufferDecoder _decoder;
-
-
-    SaslFrameParser(SaslFrameHandler sasl, ByteBufferDecoder decoder)
-    {
-        _sasl = sasl;
-        _decoder = decoder;
-    }
-
-    /**
-     * Parse the provided SASL input and call my SASL frame handler with the result
-     * <p>
-     * Consumes bytes from {@code input} until it is exhausted, the handler reports
-     * that it is done, or a malformed frame is met.
-     *
-     * @param input bytes to parse; its position is advanced past everything consumed
-     * @throws TransportException if a frame is malformed, or if a previous call
-     *         already left the parser in the {@code ERROR} state
-     */
-    public void input(ByteBuffer input) throws TransportException
-    {
-        TransportException frameParsingError = null;
-        int size = _size;
-        State state = _state;
-        ByteBuffer oldIn = null;
-
-        // Note that we simply skip over the header rather than parsing it.
-        if(_ignore != 0)
-        {
-            int bytesToEat = Math.min(_ignore, input.remaining());
-            input.position(input.position() + bytesToEat);
-            _ignore -= bytesToEat;
-        }
-
-        while(input.hasRemaining() && state != State.ERROR && !_sasl.isDone())
-        {
-            // The cases below deliberately fall through: each stage continues
-            // into the next one whenever enough input is available.
-            switch(state)
-            {
-                case SIZE_0:
-                    if(input.remaining() >= 4)
-                    {
-                        size = input.getInt();
-                        state = State.PRE_PARSE;
-                        break;
-                    }
-                    else
-                    {
-                        size = (input.get() << 24) & 0xFF000000;
-                        if(!input.hasRemaining())
-                        {
-                            state = State.SIZE_1;
-                            break;
-                        }
-                    }
-                case SIZE_1:
-                    size |= (input.get() << 16) & 0xFF0000;
-                    if(!input.hasRemaining())
-                    {
-                        state = State.SIZE_2;
-                        break;
-                    }
-                case SIZE_2:
-                    size |= (input.get() << 8) & 0xFF00;
-                    if(!input.hasRemaining())
-                    {
-                        state = State.SIZE_3;
-                        break;
-                    }
-                case SIZE_3:
-                    size |= input.get() & 0xFF;
-                    state = State.PRE_PARSE;
-
-                case PRE_PARSE:
-                    if(size < 8)
-                    {
-                        // Report the size that was just parsed; the _size field still
-                        // holds the value saved by an earlier call.
-                        frameParsingError = new TransportException("specified frame size %d smaller than minimum frame header "
-                                                                   + "size %d",
-                                                                   size, 8);
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    // NOTE(review): size comes straight from the wire and is not checked
-                    // against any maximum before the allocation below - confirm whether
-                    // callers bound it elsewhere.
-                    if(input.remaining() < size-4)
-                    {
-                        _buffer = ByteBuffer.allocate(size-4);
-                        _buffer.put(input);
-                        state = State.BUFFERING;
-                        break;
-                    }
-                case BUFFERING:
-                    if(_buffer != null)
-                    {
-                        if(input.remaining() < _buffer.remaining())
-                        {
-                            _buffer.put(input);
-                            break;
-                        }
-                        else
-                        {
-                            // Top up the buffer with exactly the missing bytes, then
-                            // parse from the buffer instead of the caller's input.
-                            ByteBuffer dup = input.duplicate();
-                            dup.limit(dup.position()+_buffer.remaining());
-                            input.position(input.position()+_buffer.remaining());
-                            _buffer.put(dup);
-                            oldIn = input;
-                            _buffer.flip();
-                            input = _buffer;
-                            state = State.PARSING;
-                        }
-                    }
-
-                case PARSING:
-
-                    int dataOffset = (input.get() << 2) & 0x3FF;
-
-                    if(dataOffset < 8)
-                    {
-                        frameParsingError = new TransportException("specified frame data offset %d smaller than minimum frame header size %d", dataOffset, 8);
-                        state = State.ERROR;
-                        break;
-                    }
-                    else if(dataOffset > size)
-                    {
-                        // As above: report the local size, not the stale _size field.
-                        frameParsingError = new TransportException("specified frame data offset %d larger than the frame size %d", dataOffset, size);
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    // type
-
-                    int type = input.get() & 0xFF;
-                    // SASL frame has no type-specific content in the frame header, so we skip next two bytes
-                    input.get();
-                    input.get();
-
-                    if(type != SaslImpl.SASL_FRAME_TYPE)
-                    {
-                        frameParsingError = new TransportException("unknown frame type: %d", type);
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    if(dataOffset!=8)
-                    {
-                        input.position(input.position()+dataOffset-8);
-                    }
-
-                    // oldIn null iff not working on duplicated buffer
-                    if(oldIn == null)
-                    {
-                        oldIn = input;
-                        input = input.duplicate();
-                        final int endPos = input.position() + size - dataOffset;
-                        input.limit(endPos);
-                        oldIn.position(endPos);
-
-                    }
-
-                    try
-                    {
-                        _decoder.setByteBuffer(input);
-                        Object val = _decoder.readObject();
-
-                        Binary payload;
-
-                        if(input.hasRemaining())
-                        {
-                            byte[] payloadBytes = new byte[input.remaining()];
-                            input.get(payloadBytes);
-                            payload = new Binary(payloadBytes);
-                        }
-                        else
-                        {
-                            payload = null;
-                        }
-
-                        if(val instanceof SaslFrameBody)
-                        {
-                            SaslFrameBody frameBody = (SaslFrameBody) val;
-                            _sasl.handle(frameBody, payload);
-
-                            reset();
-                            input = oldIn;
-                            oldIn = null;
-                            _buffer = null;
-                            state = State.SIZE_0;
-                        }
-                        else
-                        {
-                            state = State.ERROR;
-                            frameParsingError = new TransportException("Unexpected frame type encountered."
-                                                                       + " Found a %s which does not implement %s",
-                                                                       val == null ? "null" : val.getClass(), SaslFrameBody.class);
-                        }
-                    }
-                    catch (DecodeException ex)
-                    {
-                        state = State.ERROR;
-                        frameParsingError = new TransportException(ex);
-                    }
-                    break;
-                case ERROR:
-                    // do nothing
-            }
-
-        }
-
-        // Persist progress so that the next call resumes where this one stopped.
-        _state = state;
-        _size = size;
-
-        if(_state == State.ERROR)
-        {
-            if(frameParsingError != null)
-            {
-                throw frameParsingError;
-            }
-            else
-            {
-                throw new TransportException("Unable to parse, probably because of a previous error");
-            }
-        }
-    }
-
-    /** Returns the persisted size and state to their initial values. */
-    private void reset()
-    {
-        _size = 0;
-        _state = State.SIZE_0;
-    }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
deleted file mode 100644
index 56567fe..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
+++ /dev/null
@@ -1,738 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-
-package org.apache.qpid.proton.engine.impl;
-
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newWriteableBuffer;
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourAll;
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourBufferToArray;
-
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.security.SaslChallenge;
-import org.apache.qpid.proton.amqp.security.SaslCode;
-import org.apache.qpid.proton.amqp.security.SaslFrameBody;
-import org.apache.qpid.proton.amqp.security.SaslInit;
-import org.apache.qpid.proton.amqp.security.SaslMechanisms;
-import org.apache.qpid.proton.amqp.security.SaslResponse;
-import org.apache.qpid.proton.codec.AMQPDefinedTypes;
-import org.apache.qpid.proton.codec.DecoderImpl;
-import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.TransportException;
-
-/**
- * SASL layer of the transport. It acts as client or server (see {@link Role}),
- * turns received SASL frames into state changes, writes the outgoing SASL frames,
- * and - through the wrapper returned by {@link #wrap} - hands input and output over
- * to the underlying transport once the SASL exchange is finished.
- */
-public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>, SaslFrameHandler, TransportLayer
-{
-    private static final Logger _logger = Logger.getLogger(SaslImpl.class.getName());
-
-    public static final byte SASL_FRAME_TYPE = (byte) 1;
-
-    /** Charset for PLAIN credentials; RFC 4616 requires UTF-8 rather than the platform default. */
-    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");
-
-    private final DecoderImpl _decoder = new DecoderImpl();
-    private final EncoderImpl _encoder = new EncoderImpl(_decoder);
-
-    private final TransportImpl _transport;
-
-    private boolean _tail_closed = false;
-    private final ByteBuffer _inputBuffer;
-    private boolean _head_closed = false;
-    private final ByteBuffer _outputBuffer;
-    private final FrameWriter _frameWriter;
-
-    /** Bytes received from the peer (initial response, response or challenge) not yet read via recv. */
-    private ByteBuffer _pending;
-
-    private boolean _headerWritten;
-    /** Bytes queued by send/plain for the next init, response or challenge frame. */
-    private Binary _challengeResponse;
-    private SaslFrameParser _frameParser;
-    private boolean _initReceived;
-    private boolean _mechanismsSent;
-    private boolean _initSent;
-
-    enum Role { CLIENT, SERVER };
-
-    private SaslOutcome _outcome = SaslOutcome.PN_SASL_NONE;
-    private SaslState _state = SaslState.PN_SASL_IDLE;
-
-    private String _hostname;
-    private boolean _done;
-    private Symbol[] _mechanisms;
-
-    private Symbol _chosenMechanism;
-
-    private Role _role;
-    private boolean _allowSkip = true;
-
-    /**
-     * @param transport the transport this layer belongs to; also passed to the frame writer
-     * @param maxFrameSize the size of the input and output buffers
-     * exposed through {@code SaslTransportWrapper.tail()} and
-     * {@code SaslTransportWrapper.head()}.
-     */
-    SaslImpl(TransportImpl transport, int maxFrameSize)
-    {
-        _transport = transport;
-        _inputBuffer = newWriteableBuffer(maxFrameSize);
-        _outputBuffer = newWriteableBuffer(maxFrameSize);
-
-        AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
-        _frameParser = new SaslFrameParser(this, _decoder);
-        _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null, _transport);
-    }
-
-    /**
-     * Marks the exchange as finished with outcome {@code PN_SASL_SYS}. An unset
-     * role defaults to client; the init-sent / init-received flag for the role is
-     * raised so that the layer counts as done.
-     */
-    void fail() {
-        if (_role == null || _role == Role.CLIENT) {
-            _role = Role.CLIENT;
-            _initSent = true;
-        } else {
-            _initReceived = true;
-
-        }
-        _done = true;
-        _outcome = SaslOutcome.PN_SASL_SYS;
-    }
-
-    /**
-     * @return {@code true} once the exchange is done; a non-client role additionally
-     *         needs the init frame to have been received
-     */
-    @Override
-    public boolean isDone()
-    {
-        return _done && (_role==Role.CLIENT || _initReceived);
-    }
-
-    /** Produces any due SASL frames and moves the written bytes into the output buffer. */
-    private void writeSaslOutput()
-    {
-        process();
-        _frameWriter.readBytes(_outputBuffer);
-
-        if(_logger.isLoggable(Level.FINER))
-        {
-            _logger.log(Level.FINER, "Finished writing SASL output. Output Buffer : " + _outputBuffer);
-        }
-    }
-
-    /**
-     * Writes the header (once) and then whichever frames are due for the current role:
-     * mechanisms, challenge and outcome as server; init and response as client.
-     */
-    private void process()
-    {
-        processHeader();
-
-        if(_role == Role.SERVER)
-        {
-            if(!_mechanismsSent && _mechanisms != null)
-            {
-                SaslMechanisms mechanisms = new SaslMechanisms();
-
-                mechanisms.setSaslServerMechanisms(_mechanisms);
-                writeFrame(mechanisms);
-                _mechanismsSent = true;
-                _state = SaslState.PN_SASL_STEP;
-            }
-
-            if(getState() == SaslState.PN_SASL_STEP && getChallengeResponse() != null)
-            {
-                SaslChallenge challenge = new SaslChallenge();
-                challenge.setChallenge(getChallengeResponse());
-                writeFrame(challenge);
-                setChallengeResponse(null);
-            }
-
-            if(_done)
-            {
-                org.apache.qpid.proton.amqp.security.SaslOutcome outcome =
-                    new org.apache.qpid.proton.amqp.security.SaslOutcome();
-                outcome.setCode(SaslCode.values()[_outcome.getCode()]);
-                writeFrame(outcome);
-            }
-        }
-        else if(_role == Role.CLIENT)
-        {
-            if(getState() == SaslState.PN_SASL_IDLE && _chosenMechanism != null)
-            {
-                processInit();
-                _state = SaslState.PN_SASL_STEP;
-
-                //HACK: if we received an outcome before
-                //we sent our init, change the state now
-                if(_outcome != SaslOutcome.PN_SASL_NONE)
-                {
-                    _state = classifyStateFromOutcome(_outcome);
-                }
-            }
-
-            if(getState() == SaslState.PN_SASL_STEP && getChallengeResponse() != null)
-            {
-                processResponse();
-            }
-        }
-    }
-
-    private void writeFrame(SaslFrameBody frameBody)
-    {
-        _frameWriter.writeFrame(frameBody);
-    }
-
-    /**
-     * Copies pending bytes received from the peer into {@code bytes}.
-     *
-     * @return the number of bytes copied, or {@code -1} when nothing is pending
-     */
-    @Override
-    final public int recv(byte[] bytes, int offset, int size)
-    {
-        if(_pending == null)
-        {
-            return -1;
-        }
-        final int written = pourBufferToArray(_pending, bytes, offset, size);
-        if(!_pending.hasRemaining())
-        {
-            _pending = null;
-        }
-        return written;
-    }
-
-    /**
-     * Queues a copy of {@code size} bytes from {@code bytes} as the next
-     * challenge/response to be written.
-     *
-     * @return {@code size}
-     */
-    @Override
-    final public int send(byte[] bytes, int offset, int size)
-    {
-        byte[] data = new byte[size];
-        System.arraycopy(bytes, offset, data, 0, size);
-        setChallengeResponse(new Binary(data));
-        return size;
-    }
-
-    /**
-     * Writes the SASL header the first time it is called.
-     *
-     * @return the header length when it was written by this call, otherwise {@code 0}
-     */
-    final int processHeader()
-    {
-        if(!_headerWritten)
-        {
-            _frameWriter.writeHeader(AmqpHeader.SASL_HEADER);
-
-            _headerWritten = true;
-            return AmqpHeader.SASL_HEADER.length;
-        }
-        else
-        {
-            return 0;
-        }
-    }
-
-    /** @return the number of received bytes waiting to be read via recv */
-    @Override
-    public int pending()
-    {
-        return _pending == null ? 0 : _pending.remaining();
-    }
-
-    void setPending(ByteBuffer pending)
-    {
-        _pending = pending;
-    }
-
-    @Override
-    public SaslState getState()
-    {
-        return _state;
-    }
-
-    final Binary getChallengeResponse()
-    {
-        return _challengeResponse;
-    }
-
-    final void setChallengeResponse(Binary challengeResponse)
-    {
-        _challengeResponse = challengeResponse;
-    }
-
-    /**
-     * Records the given mechanisms. In the client role the first entry also becomes
-     * the chosen mechanism, and exactly one entry is expected (checked by assertion only).
-     */
-    @Override
-    public void setMechanisms(String... mechanisms)
-    {
-        if(mechanisms != null)
-        {
-            _mechanisms = new Symbol[mechanisms.length];
-            for(int i = 0; i < mechanisms.length; i++)
-            {
-                _mechanisms[i] = Symbol.valueOf(mechanisms[i]);
-            }
-        }
-
-        if(_role == Role.CLIENT)
-        {
-            assert mechanisms != null;
-            assert mechanisms.length == 1;
-
-            _chosenMechanism = Symbol.valueOf(mechanisms[0]);
-        }
-    }
-
-    /**
-     * @return as server, the mechanism chosen by the client (or an empty array);
-     *         as client, the recorded mechanism list (or an empty array)
-     * @throws IllegalStateException if no role has been set yet
-     */
-    @Override
-    public String[] getRemoteMechanisms()
-    {
-        if(_role == Role.SERVER)
-        {
-            return _chosenMechanism == null ? new String[0] : new String[] { _chosenMechanism.toString() };
-        }
-        else if(_role == Role.CLIENT)
-        {
-            if(_mechanisms == null)
-            {
-                return new String[0];
-            }
-            else
-            {
-                String[] remoteMechanisms = new String[_mechanisms.length];
-                for(int i = 0; i < _mechanisms.length; i++)
-                {
-                    remoteMechanisms[i] = _mechanisms[i].toString();
-                }
-                return remoteMechanisms;
-            }
-        }
-        else
-        {
-            throw new IllegalStateException();
-        }
-    }
-
-    public void setMechanism(Symbol mechanism)
-    {
-        _chosenMechanism = mechanism;
-    }
-
-    public Symbol getChosenMechanism()
-    {
-        return _chosenMechanism;
-    }
-
-    public void setResponse(Binary initialResponse)
-    {
-        setPending(initialResponse.asByteBuffer());
-    }
-
-    /** Dispatches a parsed frame to the matching {@code handleXxx} method of this class. */
-    @Override
-    public void handle(SaslFrameBody frameBody, Binary payload)
-    {
-        frameBody.invoke(this, payload, null);
-    }
-
-    /**
-     * Handles the client's init frame: adopts the server role if none is set,
-     * records hostname and mechanism, and exposes any initial response via recv.
-     */
-    @Override
-    public void handleInit(SaslInit saslInit, Binary payload, Void context)
-    {
-        if(_role == null)
-        {
-            server();
-        }
-        checkRole(Role.SERVER);
-        _hostname = saslInit.getHostname();
-        _chosenMechanism = saslInit.getMechanism();
-        _initReceived = true;
-        if(saslInit.getInitialResponse() != null)
-        {
-            setPending(saslInit.getInitialResponse().asByteBuffer());
-        }
-    }
-
-    @Override
-    public void handleResponse(SaslResponse saslResponse, Binary payload, Void context)
-    {
-        checkRole(Role.SERVER);
-        setPending(saslResponse.getResponse() == null ? null : saslResponse.getResponse().asByteBuffer());
-    }
-
-    /**
-     * Server-side completion: records the outcome and derives the final state from it.
-     *
-     * @throws IllegalStateException if the role is not server
-     */
-    @Override
-    public void done(SaslOutcome outcome)
-    {
-        checkRole(Role.SERVER);
-        _outcome = outcome;
-        _done = true;
-        _state = classifyStateFromOutcome(outcome);
-        // Guarded like the other log sites so the message is only built when needed.
-        if(_logger.isLoggable(Level.FINE))
-        {
-            _logger.fine("SASL negotiation done: " + this);
-        }
-    }
-
-    private void checkRole(Role role)
-    {
-        if(role != _role)
-        {
-            throw new IllegalStateException("Role is " + _role + " but should be " + role);
-        }
-    }
-
-    /** Handles the server's mechanisms frame; adopts the client role if none is set. */
-    @Override
-    public void handleMechanisms(SaslMechanisms saslMechanisms, Binary payload, Void context)
-    {
-        if(_role == null)
-        {
-            client();
-        }
-        checkRole(Role.CLIENT);
-        _mechanisms = saslMechanisms.getSaslServerMechanisms();
-    }
-
-    @Override
-    public void handleChallenge(SaslChallenge saslChallenge, Binary payload, Void context)
-    {
-        checkRole(Role.CLIENT);
-        setPending(saslChallenge.getChallenge() == null ? null : saslChallenge.getChallenge().asByteBuffer());
-    }
-
-    /**
-     * Handles the server's outcome frame. The state is only updated when the init
-     * has already been sent (state not idle); otherwise {@link #process()} applies
-     * the stored outcome right after writing the init.
-     */
-    @Override
-    public void handleOutcome(org.apache.qpid.proton.amqp.security.SaslOutcome saslOutcome,
-                              Binary payload,
-                              Void context)
-    {
-        checkRole(Role.CLIENT);
-        for(SaslOutcome outcome : SaslOutcome.values())
-        {
-            if(outcome.getCode() == saslOutcome.getCode().ordinal())
-            {
-                _outcome = outcome;
-                if (_state != SaslState.PN_SASL_IDLE)
-                {
-                    _state = classifyStateFromOutcome(outcome);
-                }
-                break;
-            }
-        }
-        _done = true;
-
-        if(_logger.isLoggable(Level.FINE))
-        {
-            _logger.fine("Handled outcome: " + this);
-        }
-    }
-
-    private SaslState classifyStateFromOutcome(SaslOutcome outcome)
-    {
-        return outcome == SaslOutcome.PN_SASL_OK ? SaslState.PN_SASL_PASS : SaslState.PN_SASL_FAIL;
-    }
-
-    private void processResponse()
-    {
-        SaslResponse response = new SaslResponse();
-        response.setResponse(getChallengeResponse());
-        setChallengeResponse(null);
-        writeFrame(response);
-    }
-
-    private void processInit()
-    {
-        SaslInit init = new SaslInit();
-        init.setHostname(_hostname);
-        init.setMechanism(_chosenMechanism);
-        if(getChallengeResponse() != null)
-        {
-            init.setInitialResponse(getChallengeResponse());
-            setChallengeResponse(null);
-        }
-        _initSent = true;
-        writeFrame(init);
-    }
-
-    /**
-     * Configures this layer as a client using the PLAIN mechanism and queues the
-     * initial response: a zero byte, the username, a zero byte, and the password
-     * (the first segment is left empty). Both strings are encoded as UTF-8.
-     */
-    @Override
-    public void plain(String username, String password)
-    {
-        client();
-        _chosenMechanism = Symbol.valueOf("PLAIN");
-        byte[] usernameBytes = username.getBytes(UTF_8);
-        byte[] passwordBytes = password.getBytes(UTF_8);
-        // The two extra bytes stay zero and act as the separators.
-        byte[] data = new byte[usernameBytes.length+passwordBytes.length+2];
-        System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length);
-        System.arraycopy(passwordBytes, 0, data, 2+usernameBytes.length, passwordBytes.length);
-
-        setChallengeResponse(new Binary(data));
-    }
-
-    @Override
-    public SaslOutcome getOutcome()
-    {
-        return _outcome;
-    }
-
-    /**
-     * Selects the client role. If mechanisms were set beforehand, the first one
-     * becomes the chosen mechanism (exactly one is expected, checked by assertion only).
-     */
-    @Override
-    public void client()
-    {
-        _role = Role.CLIENT;
-        if(_mechanisms != null)
-        {
-            assert _mechanisms.length == 1;
-
-            _chosenMechanism = _mechanisms[0];
-        }
-    }
-
-    @Override
-    public void server()
-    {
-        _role = Role.SERVER;
-    }
-
-    @Override
-    public void allowSkip(boolean allowSkip)
-    {
-        _allowSkip = allowSkip;
-    }
-
-    /**
-     * Wraps the given transport input/output. Header sniffing (and so the option
-     * of bypassing SASL) is only used for a server with skipping allowed; in every
-     * other case the SASL wrapper is selected unconditionally.
-     */
-    public TransportWrapper wrap(final TransportInput input, final TransportOutput output)
-    {
-        return new SaslSniffer(new SaslTransportWrapper(input, output),
-                               new PlainTransportWrapper(output, input)) {
-            @Override
-            protected boolean isDeterminationMade() {
-                if (_role == Role.SERVER && _allowSkip) {
-                    return super.isDeterminationMade();
-                } else {
-                    _selectedTransportWrapper = _wrapper1;
-                    return true;
-                }
-            }
-        };
-    }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder builder = new StringBuilder();
-        builder
-            .append("SaslImpl [_outcome=").append(_outcome)
-            .append(", state=").append(_state)
-            .append(", done=").append(_done)
-            .append(", role=").append(_role)
-            .append("]");
-        return builder.toString();
-    }
-
-    /**
-     * Transport wrapper that routes input and output through the SASL buffers while
-     * the exchange is in progress and delegates to the underlying transport afterwards.
-     */
-    private class SaslTransportWrapper implements TransportWrapper
-    {
-        private final TransportInput _underlyingInput;
-        private final TransportOutput _underlyingOutput;
-        private boolean _outputComplete;
-        /** Read-only view of the output buffer handed out by {@link #head()}. */
-        private final ByteBuffer _head;
-
-        private SaslTransportWrapper(TransportInput input, TransportOutput output)
-        {
-            _underlyingInput = input;
-            _underlyingOutput = output;
-            _head = _outputBuffer.asReadOnlyBuffer();
-            _head.limit(0);
-        }
-
-        private void fillOutputBuffer()
-        {
-            if(isOutputInSaslMode())
-            {
-                SaslImpl.this.writeSaslOutput();
-                if(_done)
-                {
-                    _outputComplete = true;
-                }
-            }
-        }
-
-        /**
-         * TODO rationalise this method with respect to the other similar checks of _role/_initReceived etc
-         * @see SaslImpl#isDone()
-         */
-        private boolean isInputInSaslMode()
-        {
-            return _role == null || (_role == Role.CLIENT && !_done) ||(_role == Role.SERVER && (!_initReceived || !_done));
-        }
-
-        private boolean isOutputInSaslMode()
-        {
-            return _role == null || (_role == Role.CLIENT && (!_done || !_initSent)) || (_role == Role.SERVER && !_outputComplete);
-        }
-
-        @Override
-        public int capacity()
-        {
-            if (_tail_closed) return Transport.END_OF_STREAM;
-            if (isInputInSaslMode())
-            {
-                return _inputBuffer.remaining();
-            }
-            else
-            {
-                return _underlyingInput.capacity();
-            }
-        }
-
-        @Override
-        public int position()
-        {
-            if (_tail_closed) return Transport.END_OF_STREAM;
-            if (isInputInSaslMode())
-            {
-                return _inputBuffer.position();
-            }
-            else
-            {
-                return _underlyingInput.position();
-            }
-        }
-
-        @Override
-        public ByteBuffer tail()
-        {
-            if (!isInputInSaslMode())
-            {
-                return _underlyingInput.tail();
-            }
-
-            return _inputBuffer;
-        }
-
-        /** Flips the input buffer for reading, processes it, and always compacts it again. */
-        @Override
-        public void process() throws TransportException
-        {
-            _inputBuffer.flip();
-
-            try
-            {
-                reallyProcessInput();
-            }
-            finally
-            {
-                _inputBuffer.compact();
-            }
-        }
-
-        /**
-         * Closes the input side. When this happens while input is still in SASL
-         * mode, the output side is flagged as closed as well.
-         */
-        @Override
-        public void close_tail()
-        {
-            _tail_closed = true;
-            if (isInputInSaslMode()) {
-                _head_closed = true;
-            }
-            // The underlying input is closed in either case.
-            _underlyingInput.close_tail();
-        }
-
-        private void reallyProcessInput() throws TransportException
-        {
-            if(isInputInSaslMode())
-            {
-                if(_logger.isLoggable(Level.FINER))
-                {
-                    _logger.log(Level.FINER, SaslImpl.this + " about to call input.");
-                }
-
-                _frameParser.input(_inputBuffer);
-            }
-
-            // Not an else: parsing above may have just completed the exchange, in
-            // which case the left-over bytes belong to the underlying input.
-            if(!isInputInSaslMode())
-            {
-                if(_logger.isLoggable(Level.FINER))
-                {
-                    _logger.log(Level.FINER, SaslImpl.this + " about to call plain input");
-                }
-
-                if (_inputBuffer.hasRemaining())
-                {
-                    int bytes = pourAll(_inputBuffer, _underlyingInput);
-                    if (bytes == Transport.END_OF_STREAM)
-                    {
-                        _tail_closed = true;
-                    }
-
-                    _underlyingInput.process();
-                }
-                else
-                {
-                    _underlyingInput.process();
-                }
-            }
-        }
-
-        @Override
-        public int pending()
-        {
-            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
-            {
-                fillOutputBuffer();
-                _head.limit(_outputBuffer.position());
-
-                if (_head_closed && _outputBuffer.position() == 0)
-                {
-                    return Transport.END_OF_STREAM;
-                }
-                else
-                {
-                    return _outputBuffer.position();
-                }
-            }
-            else
-            {
-                return _underlyingOutput.pending();
-            }
-        }
-
-        @Override
-        public ByteBuffer head()
-        {
-            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
-            {
-                pending();
-                return _head;
-            }
-            else
-            {
-                return _underlyingOutput.head();
-            }
-        }
-
-        /** Discards the first {@code bytes} bytes of output and re-aligns the head view. */
-        @Override
-        public void pop(int bytes)
-        {
-            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
-            {
-                _outputBuffer.flip();
-                _outputBuffer.position(bytes);
-                _outputBuffer.compact();
-                _head.position(0);
-                _head.limit(_outputBuffer.position());
-            }
-            else
-            {
-                _underlyingOutput.pop(bytes);
-            }
-        }
-
-        @Override
-        public void close_head()
-        {
-            _underlyingOutput.close_head();
-        }
-    }
-
-    /**
-     * @return the hostname taken from the client's init frame
-     * @throws IllegalStateException if a role is set and it is not server
-     */
-    @Override
-    public String getHostname()
-    {
-        if(_role != null)
-        {
-            checkRole(Role.SERVER);
-        }
-
-        return _hostname;
-    }
-
-    /**
-     * Sets the hostname that the client sends in its init frame.
-     *
-     * @throws IllegalStateException if a role is set and it is not client
-     */
-    @Override
-    public void setRemoteHostname(String hostname)
-    {
-        if(_role != null)
-        {
-            checkRole(Role.CLIENT);
-        }
-
-        _hostname = hostname;
-    }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java
deleted file mode 100644
index 2d92496..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-
-/**
- * Sniffing wrapper that inspects the first bytes of a connection: when they equal
- * the SASL header the first wrapper is selected, otherwise the second one.
- */
-class SaslSniffer extends HandshakeSniffingTransportWrapper<TransportWrapper, TransportWrapper>
-{
-
-    /**
-     * @param sasl wrapper selected when the SASL header is seen
-     * @param other wrapper selected for anything else
-     */
-    SaslSniffer(TransportWrapper sasl, TransportWrapper other) {
-        super(sasl, other);
-    }
-
-    /** @return the number of bytes needed for the decision: the SASL header length */
-    protected int bufferSize() {
-        return AmqpHeader.SASL_HEADER.length;
-    }
-
-    /**
-     * Compares {@code bytes} with the SASL header and selects the wrapper accordingly.
-     *
-     * @throws IllegalArgumentException if fewer than {@link #bufferSize()} bytes are given
-     */
-    protected void makeDetermination(byte[] bytes) {
-        if (bytes.length < bufferSize()) {
-            throw new IllegalArgumentException("insufficient bytes");
-        }
-
-        // Scan until the first mismatch or until the whole header has matched.
-        boolean headerMatches = true;
-        int idx = 0;
-        while (headerMatches && idx < AmqpHeader.SASL_HEADER.length) {
-            headerMatches = (bytes[idx] == AmqpHeader.SASL_HEADER[idx]);
-            idx++;
-        }
-
-        if (headerMatches) {
-            _selectedTransportWrapper = _wrapper1;
-        } else {
-            _selectedTransportWrapper = _wrapper2;
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
deleted file mode 100644
index f418655..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.codec.ReadableBuffer;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Sender;
-
-public class SenderImpl extends LinkImpl implements Sender
-{
- private int _offered;
- private TransportSender _transportLink;
-
- SenderImpl(SessionImpl session, String name)
- {
- super(session, name);
- }
-
- @Override
- public void offer(final int credits)
- {
- _offered = credits;
- }
-
- @Override
- public int send(final byte[] bytes, int offset, int length)
- {
- if (getLocalState() == EndpointState.CLOSED)
- {
- throw new IllegalStateException("send not allowed after the sender is closed.");
- }
- DeliveryImpl current = current();
- if (current == null || current.getLink() != this)
- {
- throw new IllegalArgumentException();//TODO.
- }
- int sent = current.send(bytes, offset, length);
- if (sent > 0) {
- getSession().incrementOutgoingBytes(sent);
- }
- return sent;
- }
-
- @Override
- public int send(final ReadableBuffer buffer)
- {
- if (getLocalState() == EndpointState.CLOSED)
- {
- throw new IllegalStateException("send not allowed after the sender is closed.");
- }
- DeliveryImpl current = current();
- if (current == null || current.getLink() != this)
- {
- throw new IllegalArgumentException();
- }
- int sent = current.send(buffer);
- if (sent > 0) {
- getSession().incrementOutgoingBytes(sent);
- }
- return sent;
- }
-
- @Override
- public void abort()
- {
- //TODO.
- }
-
- @Override
- void doFree()
- {
- getSession().freeSender(this);
- super.doFree();
- }
-
- @Override
- public boolean advance()
- {
- DeliveryImpl delivery = current();
- if (delivery != null) {
- delivery.setComplete();
- }
-
- boolean advance = super.advance();
- if(advance && _offered > 0)
- {
- _offered--;
- }
- if(advance)
- {
- decrementCredit();
- delivery.addToTransportWorkList();
- getSession().incrementOutgoingDeliveries(1);
- }
-
- return advance;
- }
-
- boolean hasOfferedCredits()
- {
- return _offered > 0;
- }
-
- @Override
- TransportSender getTransportLink()
- {
- return _transportLink;
- }
-
- void setTransportLink(TransportSender transportLink)
- {
- _transportLink = transportLink;
- }
-
-
- @Override
- public void setCredit(int credit)
- {
- super.setCredit(credit);
- /* while(getQueued()>0 && getCredit()>0)
- {
- advance();
- }*/
- }
-
- @Override
- public int getRemoteCredit()
- {
- // Credit is decremented as soon as advance is called on a send,
- // so we need only consider the credit count, not the queued count.
- return getCredit();
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org