You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/01/09 15:24:51 UTC

[15/34] qpid-proton git commit: PROTON-1385: remove proton-j from the existing repo, it now has its own repo at: https://git-wip-us.apache.org/repos/asf/qpid-proton-j.git

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
deleted file mode 100644
index fb1b06a..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.transport.EmptyFrame;
-import org.apache.qpid.proton.amqp.transport.FrameBody;
-import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.framing.TransportFrame;
-
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-
-/**
- * FrameWriter
- *
- */
-
-class FrameWriter
-{
-
-    static final byte AMQP_FRAME_TYPE = 0;
-    static final byte SASL_FRAME_TYPE = (byte) 1;
-
-    private EncoderImpl _encoder;
-    private ByteBuffer _bbuf;
-    private WritableBuffer _buffer;
-    private int _maxFrameSize;
-    private byte _frameType;
-    final private Ref<ProtocolTracer> _protocolTracer;
-    private TransportImpl _transport;
-
-    private int _frameStart = 0;
-    private int _payloadStart;
-    private int _performativeSize;
-    private long _framesOutput = 0;
-
-    FrameWriter(EncoderImpl encoder, int maxFrameSize, byte frameType,
-                Ref<ProtocolTracer> protocolTracer, TransportImpl transport)
-    {
-        _encoder = encoder;
-        _bbuf = ByteBuffer.allocate(1024);
-        _buffer = new WritableBuffer.ByteBufferWrapper(_bbuf);
-        _encoder.setByteBuffer(_buffer);
-        _maxFrameSize = maxFrameSize;
-        _frameType = frameType;
-        _protocolTracer = protocolTracer;
-        _transport = transport;
-    }
-
-    void setMaxFrameSize(int maxFrameSize)
-    {
-        _maxFrameSize = maxFrameSize;
-    }
-
-    private void grow()
-    {
-        ByteBuffer old = _bbuf;
-        _bbuf = ByteBuffer.allocate(_bbuf.capacity() * 2);
-        _buffer = new WritableBuffer.ByteBufferWrapper(_bbuf);
-        old.flip();
-        _bbuf.put(old);
-        _encoder.setByteBuffer(_buffer);
-    }
-
-    void writeHeader(byte[] header)
-    {
-        _buffer.put(header, 0, header.length);
-    }
-
-    private void startFrame()
-    {
-        _frameStart = _buffer.position();
-    }
-
-    private void writePerformative(Object frameBody)
-    {
-        while (_buffer.remaining() < 8) {
-            grow();
-        }
-
-        while (true)
-        {
-            try
-            {
-                _buffer.position(_frameStart + 8);
-                if (frameBody != null) _encoder.writeObject(frameBody);
-                break;
-            }
-            catch (BufferOverflowException e)
-            {
-                grow();
-            }
-        }
-
-        _payloadStart = _buffer.position();
-        _performativeSize = _payloadStart - _frameStart;
-    }
-
-    private void endFrame(int channel)
-    {
-        int frameSize = _buffer.position() - _frameStart;
-        int limit = _buffer.position();
-        _buffer.position(_frameStart);
-        _buffer.putInt(frameSize);
-        _buffer.put((byte) 2);
-        _buffer.put(_frameType);
-        _buffer.putShort((short) channel);
-        _buffer.position(limit);
-    }
-
-    void writeFrame(int channel, Object frameBody, ByteBuffer payload,
-                    Runnable onPayloadTooLarge)
-    {
-        startFrame();
-
-        writePerformative(frameBody);
-
-        if(_maxFrameSize > 0 && payload != null && (payload.remaining() + _performativeSize) > _maxFrameSize)
-        {
-            if(onPayloadTooLarge != null)
-            {
-                onPayloadTooLarge.run();
-            }
-            writePerformative(frameBody);
-        }
-
-        int capacity;
-        if (_maxFrameSize > 0) {
-            capacity = _maxFrameSize - _performativeSize;
-        } else {
-            capacity = Integer.MAX_VALUE;
-        }
-        int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), capacity);
-
-        ProtocolTracer tracer = _protocolTracer == null ? null : _protocolTracer.get();
-        if( tracer != null || _transport.isTraceFramesEnabled())
-        {
-            // XXX: this is a bit of a hack but it eliminates duplicate
-            // code, further refactor will fix this
-            if (_frameType == AMQP_FRAME_TYPE)
-            {
-                ByteBuffer originalPayload = null;
-                if( payload!=null )
-                {
-                    originalPayload = payload.duplicate();
-                    originalPayload.limit(payload.position() + payloadSize);
-                }
-
-                Binary payloadBin = Binary.create(originalPayload);
-                FrameBody body = null;
-                if (frameBody == null)
-                {
-                    body = new EmptyFrame();
-                }
-                else
-                {
-                    body = (FrameBody) frameBody;
-                }
-
-                TransportFrame frame = new TransportFrame(channel, body, payloadBin);
-
-                _transport.log(TransportImpl.OUTGOING, frame);
-
-                if(tracer != null)
-                {
-                    tracer.sentFrame(frame);
-                }
-            }
-        }
-
-        if(payloadSize > 0)
-        {
-            while (_buffer.remaining() < payloadSize) {
-                grow();
-            }
-
-            int oldLimit = payload.limit();
-            payload.limit(payload.position() + payloadSize);
-            _buffer.put(payload);
-            payload.limit(oldLimit);
-        }
-
-        endFrame(channel);
-
-        _framesOutput += 1;
-    }
-
-    void writeFrame(Object frameBody)
-    {
-        writeFrame(0, frameBody, null, null);
-    }
-
-    boolean isFull() {
-        // XXX: this should probably be tunable
-        return _bbuf.position() > 64*1024;
-    }
-
-    int readBytes(ByteBuffer dst)
-    {
-        ByteBuffer src = _bbuf.duplicate();
-        src.flip();
-
-        int size = Math.min(src.remaining(), dst.remaining());
-        int limit = src.limit();
-        src.limit(size);
-        dst.put(src);
-        src.limit(limit);
-        _bbuf.rewind();
-        _bbuf.put(src);
-
-        return size;
-    }
-
-    long getFramesOutput()
-    {
-        return _framesOutput;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
deleted file mode 100644
index 6a5aac5..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/HandshakeSniffingTransportWrapper.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.TransportException;
-import org.apache.qpid.proton.engine.impl.TransportWrapper;
-
-public abstract class HandshakeSniffingTransportWrapper<T1 extends TransportWrapper, T2 extends TransportWrapper>
-    implements TransportWrapper
-{
-
-    protected final T1 _wrapper1;
-    protected final T2 _wrapper2;
-
-    private boolean _tail_closed = false;
-    private boolean _head_closed = false;
-    protected TransportWrapper _selectedTransportWrapper;
-
-    private final ByteBuffer _determinationBuffer;
-
-    protected HandshakeSniffingTransportWrapper
-        (T1 wrapper1,
-         T2 wrapper2)
-    {
-        _wrapper1 = wrapper1;
-        _wrapper2 = wrapper2;
-        _determinationBuffer = ByteBuffer.allocate(bufferSize());
-    }
-
-    @Override
-    public int capacity()
-    {
-        if (isDeterminationMade())
-        {
-            return _selectedTransportWrapper.capacity();
-        }
-        else
-        {
-            if (_tail_closed) { return Transport.END_OF_STREAM; }
-            return _determinationBuffer.remaining();
-        }
-    }
-
-    @Override
-    public int position()
-    {
-        if (isDeterminationMade())
-        {
-            return _selectedTransportWrapper.position();
-        }
-        else
-        {
-            if (_tail_closed) { return Transport.END_OF_STREAM; }
-            return _determinationBuffer.position();
-        }
-    }
-
-    @Override
-    public ByteBuffer tail()
-    {
-        if (isDeterminationMade())
-        {
-            return _selectedTransportWrapper.tail();
-        }
-        else
-        {
-            return _determinationBuffer;
-        }
-    }
-
-    protected abstract int bufferSize();
-
-    protected abstract void makeDetermination(byte[] bytes);
-
-    @Override
-    public void process() throws TransportException
-    {
-        if (isDeterminationMade())
-        {
-            _selectedTransportWrapper.process();
-        }
-        else if (_determinationBuffer.remaining() == 0)
-        {
-            _determinationBuffer.flip();
-            byte[] bytesInput = new byte[_determinationBuffer.remaining()];
-            _determinationBuffer.get(bytesInput);
-            makeDetermination(bytesInput);
-            _determinationBuffer.rewind();
-
-            // TODO what if the selected transport has insufficient capacity?? Maybe use pour, and then try to finish pouring next time round.
-            _selectedTransportWrapper.tail().put(_determinationBuffer);
-            _selectedTransportWrapper.process();
-        } else if (_tail_closed) {
-            throw new TransportException("connection aborted");
-        }
-    }
-
-    @Override
-    public void close_tail()
-    {
-        try {
-            if (isDeterminationMade())
-            {
-                _selectedTransportWrapper.close_tail();
-            }
-        } finally {
-            _tail_closed = true;
-        }
-    }
-
-    @Override
-    public int pending()
-    {
-        if (_head_closed) { return Transport.END_OF_STREAM; }
-
-        if (isDeterminationMade()) {
-            return _selectedTransportWrapper.pending();
-        } else {
-            return 0;
-        }
-
-    }
-
-    private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
-
-    @Override
-    public ByteBuffer head()
-    {
-        if (isDeterminationMade()) {
-            return _selectedTransportWrapper.head();
-        } else {
-            return EMPTY;
-        }
-    }
-
-    @Override
-    public void pop(int bytes)
-    {
-        if (isDeterminationMade()) {
-            _selectedTransportWrapper.pop(bytes);
-        } else if (bytes > 0) {
-            throw new IllegalStateException("no bytes have been read");
-        }
-    }
-
-    @Override
-    public void close_head()
-    {
-        if (isDeterminationMade()) {
-            _selectedTransportWrapper.close_head();
-        } else {
-            _head_closed = true;
-        }
-    }
-
-    protected boolean isDeterminationMade()
-    {
-        return _selectedTransportWrapper != null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
deleted file mode 100644
index a67785e..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
+++ /dev/null
@@ -1,514 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import java.util.EnumSet;
-import java.util.Map;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
-import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
-import org.apache.qpid.proton.amqp.transport.Source;
-import org.apache.qpid.proton.amqp.transport.Target;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Event;
-import org.apache.qpid.proton.engine.Link;
-
-public abstract class LinkImpl extends EndpointImpl implements Link
-{
-
-    private final SessionImpl _session;
-
-    DeliveryImpl _head;
-    DeliveryImpl _tail;
-    DeliveryImpl _current;
-    private String _name;
-    private Source _source;
-    private Source _remoteSource;
-    private Target _target;
-    private Target _remoteTarget;
-    private int _queued;
-    private int _credit;
-    private int _unsettled;
-    private int _drained;
-
-    private SenderSettleMode _senderSettleMode;
-    private SenderSettleMode _remoteSenderSettleMode;
-    private ReceiverSettleMode _receiverSettleMode;
-    private ReceiverSettleMode _remoteReceiverSettleMode;
-
-
-    private LinkNode<LinkImpl> _node;
-    private boolean _drain;
-    private boolean _detached;
-    private Map<Symbol, Object> _properties;
-    private Map<Symbol, Object> _remoteProperties;
-    private Symbol[] _offeredCapabilities;
-    private Symbol[] _remoteOfferedCapabilities;
-    private Symbol[] _desiredCapabilities;
-    private Symbol[] _remoteDesiredCapabilities;
-
-    LinkImpl(SessionImpl session, String name)
-    {
-        _session = session;
-        _session.incref();
-        _name = name;
-        ConnectionImpl conn = session.getConnectionImpl();
-        _node = conn.addLinkEndpoint(this);
-        conn.put(Event.Type.LINK_INIT, this);
-    }
-
-
-    @Override
-    public String getName()
-    {
-        return _name;
-    }
-
-    @Override
-    public DeliveryImpl delivery(byte[] tag)
-    {
-        return delivery(tag, 0, tag.length);
-    }
-
-    @Override
-    public DeliveryImpl delivery(byte[] tag, int offset, int length)
-    {
-        if (offset != 0 || length != tag.length)
-        {
-            throw new IllegalArgumentException("At present delivery tag must be the whole byte array");
-        }
-        incrementQueued();
-        try
-        {
-            DeliveryImpl delivery = new DeliveryImpl(tag, this, _tail);
-            if (_tail == null)
-            {
-                _head = delivery;
-            }
-            _tail = delivery;
-            if (_current == null)
-            {
-                _current = delivery;
-            }
-            getConnectionImpl().workUpdate(delivery);
-            return delivery;
-        }
-        catch (RuntimeException e)
-        {
-            e.printStackTrace();
-            throw e;
-        }
-    }
-
-    @Override
-    void postFinal() {
-        _session.getConnectionImpl().put(Event.Type.LINK_FINAL, this);
-        _session.decref();
-    }
-
-    @Override
-    void doFree()
-    {
-        DeliveryImpl dlv = _head;
-        while (dlv != null) {
-            dlv.free();
-            dlv = dlv.next();
-        }
-
-        _session.getConnectionImpl().removeLinkEndpoint(_node);
-        _node = null;
-    }
-
-    void modifyEndpoints() {
-        modified();
-    }
-
-    public void remove(DeliveryImpl delivery)
-    {
-        if(_head == delivery)
-        {
-            _head = delivery.getLinkNext();
-        }
-        if(_tail == delivery)
-        {
-            _tail = delivery.getLinkPrevious();
-        }
-        if(_current == delivery)
-        {
-            // TODO - what???
-        }
-    }
-
-    @Override
-    public DeliveryImpl current()
-    {
-        return _current;
-    }
-
-    @Override
-    public boolean advance()
-    {
-        if(_current != null )
-        {
-            DeliveryImpl oldCurrent = _current;
-            _current = _current.getLinkNext();
-            getConnectionImpl().workUpdate(oldCurrent);
-
-            if(_current != null)
-            {
-                getConnectionImpl().workUpdate(_current);
-            }
-            return true;
-        }
-        else
-        {
-            return false;
-        }
-
-    }
-
-    @Override
-    protected ConnectionImpl getConnectionImpl()
-    {
-        return _session.getConnectionImpl();
-    }
-
-    @Override
-    public SessionImpl getSession()
-    {
-        return _session;
-    }
-
-    @Override
-    public Source getRemoteSource()
-    {
-        return _remoteSource;
-    }
-
-    void setRemoteSource(Source source)
-    {
-        _remoteSource = source;
-    }
-
-    @Override
-    public Target getRemoteTarget()
-    {
-        return _remoteTarget;
-    }
-
-    void setRemoteTarget(Target target)
-    {
-        _remoteTarget = target;
-    }
-
-    @Override
-    public Source getSource()
-    {
-        return _source;
-    }
-
-    @Override
-    public void setSource(Source source)
-    {
-        // TODO - should be an error if local state is ACTIVE
-        _source = source;
-    }
-
-    @Override
-    public Target getTarget()
-    {
-        return _target;
-    }
-
-    @Override
-    public void setTarget(Target target)
-    {
-        // TODO - should be an error if local state is ACTIVE
-        _target = target;
-    }
-
-    @Override
-    public Link next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
-    {
-        LinkNode.Query<LinkImpl> query = new EndpointImplQuery<LinkImpl>(local, remote);
-
-        LinkNode<LinkImpl> linkNode = _node.next(query);
-
-        return linkNode == null ? null : linkNode.getValue();
-
-    }
-
-    abstract TransportLink getTransportLink();
-
-    @Override
-    public int getCredit()
-    {
-        return _credit;
-    }
-
-    public void addCredit(int credit)
-    {
-        _credit+=credit;
-    }
-
-    public void setCredit(int credit)
-    {
-        _credit = credit;
-    }
-
-    boolean hasCredit()
-    {
-        return _credit > 0;
-    }
-
-    void incrementCredit()
-    {
-        _credit++;
-    }
-
-    void decrementCredit()
-    {
-        _credit--;
-    }
-
-    @Override
-    public int getQueued()
-    {
-        return _queued;
-    }
-
-    void incrementQueued()
-    {
-        _queued++;
-    }
-
-    void decrementQueued()
-    {
-        _queued--;
-    }
-
-    @Override
-    public int getUnsettled()
-    {
-        return _unsettled;
-    }
-
-    void incrementUnsettled()
-    {
-        _unsettled++;
-    }
-
-    void decrementUnsettled()
-    {
-        _unsettled--;
-    }
-
-    void setDrain(boolean drain)
-    {
-        _drain = drain;
-    }
-
-    @Override
-    public boolean getDrain()
-    {
-        return _drain;
-    }
-
-    @Override
-    public SenderSettleMode getSenderSettleMode()
-    {
-        return _senderSettleMode;
-    }
-
-    @Override
-    public void setSenderSettleMode(SenderSettleMode senderSettleMode)
-    {
-        _senderSettleMode = senderSettleMode;
-    }
-
-    @Override
-    public SenderSettleMode getRemoteSenderSettleMode()
-    {
-        return _remoteSenderSettleMode;
-    }
-
-    @Override
-    public void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode)
-    {
-        _remoteSenderSettleMode = remoteSenderSettleMode;
-    }
-
-    @Override
-    public ReceiverSettleMode getReceiverSettleMode()
-    {
-        return _receiverSettleMode;
-    }
-
-    @Override
-    public void setReceiverSettleMode(ReceiverSettleMode receiverSettleMode)
-    {
-        _receiverSettleMode = receiverSettleMode;
-    }
-
-    @Override
-    public ReceiverSettleMode getRemoteReceiverSettleMode()
-    {
-        return _remoteReceiverSettleMode;
-    }
-
-    void setRemoteReceiverSettleMode(ReceiverSettleMode remoteReceiverSettleMode)
-    {
-        _remoteReceiverSettleMode = remoteReceiverSettleMode;
-    }
-
-    @Override
-    public Map<Symbol, Object> getProperties()
-    {
-        return _properties;
-    }
-
-    @Override
-    public void setProperties(Map<Symbol, Object> properties)
-    {
-        _properties = properties;
-    }
-
-    @Override
-    public Map<Symbol, Object> getRemoteProperties()
-    {
-        return _remoteProperties;
-    }
-
-    void setRemoteProperties(Map<Symbol, Object> remoteProperties)
-    {
-        _remoteProperties = remoteProperties;
-    }
-
-    @Override
-    public Symbol[] getDesiredCapabilities()
-    {
-        return _desiredCapabilities;
-    }
-
-    @Override
-    public void setDesiredCapabilities(Symbol[] desiredCapabilities)
-    {
-        _desiredCapabilities = desiredCapabilities;
-    }
-
-    @Override
-    public Symbol[] getRemoteDesiredCapabilities()
-    {
-        return _remoteDesiredCapabilities;
-    }
-
-    void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities)
-    {
-        _remoteDesiredCapabilities = remoteDesiredCapabilities;
-    }
-
-    @Override
-    public Symbol[] getOfferedCapabilities()
-    {
-        return _offeredCapabilities;
-    }
-
-    @Override
-    public void setOfferedCapabilities(Symbol[] offeredCapabilities)
-    {
-        _offeredCapabilities = offeredCapabilities;
-    }
-
-    @Override
-    public Symbol[] getRemoteOfferedCapabilities()
-    {
-        return _remoteOfferedCapabilities;
-    }
-
-    void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities)
-    {
-        _remoteOfferedCapabilities = remoteOfferedCapabilities;
-    }
-
-    @Override
-    public int drained()
-    {
-        int drained = 0;
-
-        if (this instanceof SenderImpl) {
-            if(getDrain() && hasCredit())
-            {
-                _drained = getCredit();
-                setCredit(0);
-                modified();
-                drained = _drained;
-            }
-        } else {
-            drained = _drained;
-            _drained = 0;
-        }
-
-        return drained;
-    }
-
-    int getDrained()
-    {
-        return _drained;
-    }
-
-    void setDrained(int value)
-    {
-        _drained = value;
-    }
-
-    @Override
-    public DeliveryImpl head()
-    {
-        return _head;
-    }
-
-    @Override
-    void localOpen()
-    {
-        getConnectionImpl().put(Event.Type.LINK_LOCAL_OPEN, this);
-    }
-
-    @Override
-    void localClose()
-    {
-        getConnectionImpl().put(Event.Type.LINK_LOCAL_CLOSE, this);
-    }
-
-    @Override
-    public void detach()
-    {
-        _detached = true;
-        getConnectionImpl().put(Event.Type.LINK_LOCAL_DETACH, this);
-        modified();
-    }
-
-    public boolean detached()
-    {
-        return _detached;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkNode.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkNode.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkNode.java
deleted file mode 100644
index e3a8f78..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkNode.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package org.apache.qpid.proton.engine.impl;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-
-
-class LinkNode<E>
-{
-    public interface Query<T>
-    {
-        public boolean matches(LinkNode<T> node);
-    }
-
-
-    private E _value;
-    private LinkNode<E> _prev;
-    private LinkNode<E> _next;
-
-    private LinkNode(E value)
-    {
-        _value = value;
-    }
-
-    public E getValue()
-    {
-        return _value;
-    }
-
-    public LinkNode<E> getPrev()
-    {
-        return _prev;
-    }
-
-    public LinkNode<E> getNext()
-    {
-        return _next;
-    }
-
-    public LinkNode<E> next(Query<E> query)
-    {
-        LinkNode<E> next = _next;
-        while(next != null && !query.matches(next))
-        {
-            next = next.getNext();
-        }
-        return next;
-    }
-
-    public LinkNode<E> remove()
-    {
-        LinkNode<E> prev = _prev;
-        LinkNode<E> next = _next;
-        if(prev != null)
-        {
-            prev._next = next;
-        }
-        if(next != null)
-        {
-            next._prev = prev;
-        }
-        _next = _prev = null;
-        return next;
-    }
-
-    public LinkNode<E> addAtTail(E value)
-    {
-        if(_next == null)
-        {
-            _next = new LinkNode<E>(value);
-            _next._prev = this;
-            return _next;
-        }
-        else
-        {
-            return _next.addAtTail(value);
-        }
-    }
-
-    public static <T> LinkNode<T> newList(T value)
-    {
-        return new LinkNode<T>(value);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/PlainTransportWrapper.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/PlainTransportWrapper.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/PlainTransportWrapper.java
deleted file mode 100644
index 62b75b3..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/PlainTransportWrapper.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.proton.engine.TransportException;
-
-public class PlainTransportWrapper implements TransportWrapper
-{
-    private final TransportOutput _outputProcessor;
-    private final TransportInput _inputProcessor;
-
-    public PlainTransportWrapper(TransportOutput outputProcessor,
-            TransportInput inputProcessor)
-    {
-        _outputProcessor = outputProcessor;
-        _inputProcessor = inputProcessor;
-    }
-
-    @Override
-    public int capacity()
-    {
-        return _inputProcessor.capacity();
-    }
-
-    @Override
-    public int position()
-    {
-        return _inputProcessor.position();
-    }
-
-    @Override
-    public ByteBuffer tail()
-    {
-        return _inputProcessor.tail();
-    }
-
-    @Override
-    public void process() throws TransportException
-    {
-        _inputProcessor.process();
-    }
-
-    @Override
-    public void close_tail()
-    {
-        _inputProcessor.close_tail();
-    }
-
-    @Override
-    public int pending()
-    {
-        return _outputProcessor.pending();
-    }
-
-    @Override
-    public ByteBuffer head()
-    {
-        return _outputProcessor.head();
-    }
-
-    @Override
-    public void pop(int bytes)
-    {
-        _outputProcessor.pop(bytes);
-    }
-
-    @Override
-    public void close_head()
-    {
-        _outputProcessor.close_head();
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java
deleted file mode 100644
index 92ad598..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.framing.TransportFrame;
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-public interface ProtocolTracer
-{
-    public void receivedFrame(TransportFrame transportFrame);
-    public void sentFrame(TransportFrame transportFrame);
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
deleted file mode 100644
index 6f86700..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.engine.Receiver;
-
-public class ReceiverImpl extends LinkImpl implements Receiver
-{
-    private boolean _drainFlagMode = true;
-
-    @Override
-    public boolean advance()
-    {
-        DeliveryImpl current = current();
-        if(current != null)
-        {
-            current.setDone();
-        }
-        final boolean advance = super.advance();
-        if(advance)
-        {
-            decrementQueued();
-            decrementCredit();
-            getSession().incrementIncomingBytes(-current.pending());
-            getSession().incrementIncomingDeliveries(-1);
-            if (getSession().getTransportSession().getIncomingWindowSize().equals(UnsignedInteger.ZERO)) {
-                modified();
-            }
-        }
-        return advance;
-    }
-
-    private TransportReceiver _transportReceiver;
-    private int _unsentCredits;
-
-
-    ReceiverImpl(SessionImpl session, String name)
-    {
-        super(session, name);
-    }
-
-    @Override
-    public void flow(final int credits)
-    {
-        addCredit(credits);
-        _unsentCredits += credits;
-        modified();
-        if (!_drainFlagMode)
-        {
-            setDrain(false);
-            _drainFlagMode = false;
-        }
-    }
-
-    int clearUnsentCredits()
-    {
-        int credits = _unsentCredits;
-        _unsentCredits = 0;
-        return credits;
-    }
-
-    @Override
-    public int recv(final byte[] bytes, int offset, int size)
-    {
-        if (_current == null) {
-            throw new IllegalStateException("no current delivery");
-        }
-
-        int consumed = _current.recv(bytes, offset, size);
-        if (consumed > 0) {
-            getSession().incrementIncomingBytes(-consumed);
-            if (getSession().getTransportSession().getIncomingWindowSize().equals(UnsignedInteger.ZERO)) {
-                modified();
-            }
-        }
-        return consumed;
-    }
-
-    @Override
-    public int recv(final WritableBuffer buffer)
-    {
-        if (_current == null) {
-            throw new IllegalStateException("no current delivery");
-        }
-
-        int consumed = _current.recv(buffer);
-        if (consumed > 0) {
-            getSession().incrementIncomingBytes(-consumed);
-            if (getSession().getTransportSession().getIncomingWindowSize().equals(UnsignedInteger.ZERO)) {
-                modified();
-            }
-        }
-        return consumed;
-    }
-
-    @Override
-    void doFree()
-    {
-        getSession().freeReceiver(this);
-        super.doFree();
-    }
-
-    boolean hasIncoming()
-    {
-        return false;  //TODO - Implement
-    }
-
-    void setTransportLink(TransportReceiver transportReceiver)
-    {
-        _transportReceiver = transportReceiver;
-    }
-
-    @Override
-    TransportReceiver getTransportLink()
-    {
-        return _transportReceiver;
-    }
-
-    @Override
-    public void drain(int credit)
-    {
-        setDrain(true);
-        flow(credit);
-        _drainFlagMode = false;
-    }
-
-    @Override
-    public boolean draining()
-    {
-        return getDrain() && (getCredit() > getQueued());
-    }
-
-    @Override
-    public void setDrain(boolean drain)
-    {
-        super.setDrain(drain);
-        modified();
-        _drainFlagMode = true;
-    }
-
-    @Override
-    public int getRemoteCredit()
-    {
-        // Credit is only decremented once advance is called on a received message,
-        // so we also need to consider the queued count.
-        return getCredit() - getQueued();
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/RecordImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/RecordImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/RecordImpl.java
deleted file mode 100644
index 85408cb..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/RecordImpl.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.engine.Record;
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * RecordImpl
- *
- */
-
-public class RecordImpl implements Record
-{
-
-    private Map<Object,Object> values = new HashMap<Object,Object>();
-
-    public <T> void set(Object key, Class<T> klass, T value) {
-        values.put(key, value);
-    }
-
-    public <T> T get(Object key, Class<T> klass) {
-        return klass.cast(values.get(key));
-    }
-
-    public void clear() {
-        values.clear();
-    }
-
-    void copy(RecordImpl src) {
-        values.putAll(src.values);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java
deleted file mode 100644
index 01e3a35..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/Ref.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-
-/**
- * Ref
- *
- */
-
-class Ref<T>
-{
-
-    T value;
-
-    public Ref(T initial) {
-        value = initial;
-    }
-
-    public T get() {
-        return value;
-    }
-
-    public void set(T value) {
-        this.value = value;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameHandler.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameHandler.java
deleted file mode 100644
index efc12dd..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameHandler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.security.SaslFrameBody;
-
-/**
- * Used by {@link SaslFrameParser} to handle the frames it parses
- */
-interface SaslFrameHandler
-{
-    void handle(SaslFrameBody frameBody, Binary payload);
-
-    boolean isDone();
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
deleted file mode 100644
index 8becc72..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.proton.engine.impl;
-
-import java.nio.ByteBuffer;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.security.SaslFrameBody;
-import org.apache.qpid.proton.codec.ByteBufferDecoder;
-import org.apache.qpid.proton.codec.DecodeException;
-import org.apache.qpid.proton.engine.TransportException;
-
-class SaslFrameParser
-{
-    private SaslFrameHandler _sasl;
-
-    enum State
-    {
-        SIZE_0,
-        SIZE_1,
-        SIZE_2,
-        SIZE_3,
-        PRE_PARSE,
-        BUFFERING,
-        PARSING,
-        ERROR
-    }
-
-    private State _state = State.SIZE_0;
-    private int _size;
-
-    private ByteBuffer _buffer;
-
-    private int _ignore = 8;
-    private final ByteBufferDecoder _decoder;
-
-
-    SaslFrameParser(SaslFrameHandler sasl, ByteBufferDecoder decoder)
-    {
-        _sasl = sasl;
-        _decoder = decoder;
-    }
-
-    /**
-     * Parse the provided SASL input and call my SASL frame handler with the result
-     */
-    public void input(ByteBuffer input) throws TransportException
-    {
-        TransportException frameParsingError = null;
-        int size = _size;
-        State state = _state;
-        ByteBuffer oldIn = null;
-
-        // Note that we simply skip over the header rather than parsing it.
-        if(_ignore != 0)
-        {
-            int bytesToEat = Math.min(_ignore, input.remaining());
-            input.position(input.position() + bytesToEat);
-            _ignore -= bytesToEat;
-        }
-
-        while(input.hasRemaining() && state != State.ERROR && !_sasl.isDone())
-        {
-            switch(state)
-            {
-                case SIZE_0:
-                    if(input.remaining() >= 4)
-                    {
-                        size = input.getInt();
-                        state = State.PRE_PARSE;
-                        break;
-                    }
-                    else
-                    {
-                        size = (input.get() << 24) & 0xFF000000;
-                        if(!input.hasRemaining())
-                        {
-                            state = State.SIZE_1;
-                            break;
-                        }
-                    }
-                case SIZE_1:
-                    size |= (input.get() << 16) & 0xFF0000;
-                    if(!input.hasRemaining())
-                    {
-                        state = State.SIZE_2;
-                        break;
-                    }
-                case SIZE_2:
-                    size |= (input.get() << 8) & 0xFF00;
-                    if(!input.hasRemaining())
-                    {
-                        state = State.SIZE_3;
-                        break;
-                    }
-                case SIZE_3:
-                    size |= input.get() & 0xFF;
-                    state = State.PRE_PARSE;
-
-                case PRE_PARSE:
-                    if(size < 8)
-                    {
-                        frameParsingError = new TransportException("specified frame size %d smaller than minimum frame header "
-                                                                   + "size %d",
-                                                                   _size, 8);
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    if(input.remaining() < size-4)
-                    {
-                        _buffer = ByteBuffer.allocate(size-4);
-                        _buffer.put(input);
-                        state = State.BUFFERING;
-                        break;
-                    }
-                case BUFFERING:
-                    if(_buffer != null)
-                    {
-                        if(input.remaining() < _buffer.remaining())
-                        {
-                            _buffer.put(input);
-                            break;
-                        }
-                        else
-                        {
-                            ByteBuffer dup = input.duplicate();
-                            dup.limit(dup.position()+_buffer.remaining());
-                            input.position(input.position()+_buffer.remaining());
-                            _buffer.put(dup);
-                            oldIn = input;
-                            _buffer.flip();
-                            input = _buffer;
-                            state = State.PARSING;
-                        }
-                    }
-
-                case PARSING:
-
-                    int dataOffset = (input.get() << 2) & 0x3FF;
-
-                    if(dataOffset < 8)
-                    {
-                        frameParsingError = new TransportException("specified frame data offset %d smaller than minimum frame header size %d", dataOffset, 8);
-                        state = State.ERROR;
-                        break;
-                    }
-                    else if(dataOffset > size)
-                    {
-                        frameParsingError = new TransportException("specified frame data offset %d larger than the frame size %d", dataOffset, _size);
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    // type
-
-                    int type = input.get() & 0xFF;
-                    // SASL frame has no type-specific content in the frame header, so we skip next two bytes
-                    input.get();
-                    input.get();
-
-                    if(type != SaslImpl.SASL_FRAME_TYPE)
-                    {
-                        frameParsingError = new TransportException("unknown frame type: %d", type);
-                        state = State.ERROR;
-                        break;
-                    }
-
-                    if(dataOffset!=8)
-                    {
-                        input.position(input.position()+dataOffset-8);
-                    }
-
-                    // oldIn null iff not working on duplicated buffer
-                    if(oldIn == null)
-                    {
-                        oldIn = input;
-                        input = input.duplicate();
-                        final int endPos = input.position() + size - dataOffset;
-                        input.limit(endPos);
-                        oldIn.position(endPos);
-
-                    }
-
-                    try
-                    {
-                        _decoder.setByteBuffer(input);
-                        Object val = _decoder.readObject();
-
-                        Binary payload;
-
-                        if(input.hasRemaining())
-                        {
-                            byte[] payloadBytes = new byte[input.remaining()];
-                            input.get(payloadBytes);
-                            payload = new Binary(payloadBytes);
-                        }
-                        else
-                        {
-                            payload = null;
-                        }
-
-                        if(val instanceof SaslFrameBody)
-                        {
-                            SaslFrameBody frameBody = (SaslFrameBody) val;
-                            _sasl.handle(frameBody, payload);
-
-                            reset();
-                            input = oldIn;
-                            oldIn = null;
-                            _buffer = null;
-                            state = State.SIZE_0;
-                        }
-                        else
-                        {
-                            state = State.ERROR;
-                            frameParsingError = new TransportException("Unexpected frame type encountered."
-                                                                       + " Found a %s which does not implement %s",
-                                                                       val == null ? "null" : val.getClass(), SaslFrameBody.class);
-                        }
-                    }
-                    catch (DecodeException ex)
-                    {
-                        state = State.ERROR;
-                        frameParsingError = new TransportException(ex);
-                    }
-                    break;
-                case ERROR:
-                    // do nothing
-            }
-
-        }
-
-        _state = state;
-        _size = size;
-
-        if(_state == State.ERROR)
-        {
-            if(frameParsingError != null)
-            {
-                throw frameParsingError;
-            }
-            else
-            {
-                throw new TransportException("Unable to parse, probably because of a previous error");
-            }
-        }
-    }
-
-    private void reset()
-    {
-        _size = 0;
-        _state = State.SIZE_0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
deleted file mode 100644
index 56567fe..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
+++ /dev/null
@@ -1,738 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-
-package org.apache.qpid.proton.engine.impl;
-
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newWriteableBuffer;
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourAll;
-import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourBufferToArray;
-
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.security.SaslChallenge;
-import org.apache.qpid.proton.amqp.security.SaslCode;
-import org.apache.qpid.proton.amqp.security.SaslFrameBody;
-import org.apache.qpid.proton.amqp.security.SaslInit;
-import org.apache.qpid.proton.amqp.security.SaslMechanisms;
-import org.apache.qpid.proton.amqp.security.SaslResponse;
-import org.apache.qpid.proton.codec.AMQPDefinedTypes;
-import org.apache.qpid.proton.codec.DecoderImpl;
-import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.engine.TransportException;
-
-public class SaslImpl implements Sasl, SaslFrameBody.SaslFrameBodyHandler<Void>, SaslFrameHandler, TransportLayer
-{
-    private static final Logger _logger = Logger.getLogger(SaslImpl.class.getName());
-
-    public static final byte SASL_FRAME_TYPE = (byte) 1;
-
-    private final DecoderImpl _decoder = new DecoderImpl();
-    private final EncoderImpl _encoder = new EncoderImpl(_decoder);
-
-    private final TransportImpl _transport;
-
-    private boolean _tail_closed = false;
-    private final ByteBuffer _inputBuffer;
-    private boolean _head_closed = false;
-    private final ByteBuffer _outputBuffer;
-    private final FrameWriter _frameWriter;
-
-    private ByteBuffer _pending;
-
-    private boolean _headerWritten;
-    private Binary _challengeResponse;
-    private SaslFrameParser _frameParser;
-    private boolean _initReceived;
-    private boolean _mechanismsSent;
-    private boolean _initSent;
-
-    enum Role { CLIENT, SERVER };
-
-    private SaslOutcome _outcome = SaslOutcome.PN_SASL_NONE;
-    private SaslState _state = SaslState.PN_SASL_IDLE;
-
-    private String _hostname;
-    private boolean _done;
-    private Symbol[] _mechanisms;
-
-    private Symbol _chosenMechanism;
-
-    private Role _role;
-    private boolean _allowSkip = true;
-
-    /**
-     * @param maxFrameSize the size of the input and output buffers
-     * returned by {@link SaslTransportWrapper#getInputBuffer()} and
-     * {@link SaslTransportWrapper#getOutputBuffer()}.
-     */
-    SaslImpl(TransportImpl transport, int maxFrameSize)
-    {
-        _transport = transport;
-        _inputBuffer = newWriteableBuffer(maxFrameSize);
-        _outputBuffer = newWriteableBuffer(maxFrameSize);
-
-        AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
-        _frameParser = new SaslFrameParser(this, _decoder);
-        _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, null, _transport);
-    }
-
-    void fail() {
-        if (_role == null || _role == Role.CLIENT) {
-            _role = Role.CLIENT;
-            _initSent = true;
-        } else {
-            _initReceived = true;
-
-        }
-        _done = true;
-        _outcome = SaslOutcome.PN_SASL_SYS;
-    }
-
-    @Override
-    public boolean isDone()
-    {
-        return _done && (_role==Role.CLIENT || _initReceived);
-    }
-
-    private void writeSaslOutput()
-    {
-        process();
-        _frameWriter.readBytes(_outputBuffer);
-
-        if(_logger.isLoggable(Level.FINER))
-        {
-            _logger.log(Level.FINER, "Finished writing SASL output. Output Buffer : " + _outputBuffer);
-        }
-    }
-
-    private void process()
-    {
-        processHeader();
-
-        if(_role == Role.SERVER)
-        {
-            if(!_mechanismsSent && _mechanisms != null)
-            {
-                SaslMechanisms mechanisms = new SaslMechanisms();
-
-                mechanisms.setSaslServerMechanisms(_mechanisms);
-                writeFrame(mechanisms);
-                _mechanismsSent = true;
-                _state = SaslState.PN_SASL_STEP;
-            }
-
-            if(getState() == SaslState.PN_SASL_STEP && getChallengeResponse() != null)
-            {
-                SaslChallenge challenge = new SaslChallenge();
-                challenge.setChallenge(getChallengeResponse());
-                writeFrame(challenge);
-                setChallengeResponse(null);
-            }
-
-            if(_done)
-            {
-                org.apache.qpid.proton.amqp.security.SaslOutcome outcome =
-                        new org.apache.qpid.proton.amqp.security.SaslOutcome();
-                outcome.setCode(SaslCode.values()[_outcome.getCode()]);
-                writeFrame(outcome);
-            }
-        }
-        else if(_role == Role.CLIENT)
-        {
-            if(getState() == SaslState.PN_SASL_IDLE && _chosenMechanism != null)
-            {
-                processInit();
-                _state = SaslState.PN_SASL_STEP;
-
-                //HACK: if we received an outcome before
-                //we sent our init, change the state now
-                if(_outcome != SaslOutcome.PN_SASL_NONE)
-                {
-                    _state = classifyStateFromOutcome(_outcome);
-                }
-            }
-
-            if(getState() == SaslState.PN_SASL_STEP && getChallengeResponse() != null)
-            {
-                processResponse();
-            }
-        }
-    }
-
-    private void writeFrame(SaslFrameBody frameBody)
-    {
-        _frameWriter.writeFrame(frameBody);
-    }
-
-    @Override
-    final public int recv(byte[] bytes, int offset, int size)
-    {
-        if(_pending == null)
-        {
-            return -1;
-        }
-        final int written = pourBufferToArray(_pending, bytes, offset, size);
-        if(!_pending.hasRemaining())
-        {
-            _pending = null;
-        }
-        return written;
-    }
-
-    @Override
-    final public int send(byte[] bytes, int offset, int size)
-    {
-        byte[] data = new byte[size];
-        System.arraycopy(bytes, offset, data, 0, size);
-        setChallengeResponse(new Binary(data));
-        return size;
-    }
-
-    final int processHeader()
-    {
-        if(!_headerWritten)
-        {
-            _frameWriter.writeHeader(AmqpHeader.SASL_HEADER);
-
-            _headerWritten = true;
-            return AmqpHeader.SASL_HEADER.length;
-        }
-        else
-        {
-            return 0;
-        }
-    }
-
-    @Override
-    public int pending()
-    {
-        return _pending == null ? 0 : _pending.remaining();
-    }
-
-    void setPending(ByteBuffer pending)
-    {
-        _pending = pending;
-    }
-
-    @Override
-    public SaslState getState()
-    {
-        return _state;
-    }
-
-    final Binary getChallengeResponse()
-    {
-        return _challengeResponse;
-    }
-
-    final void setChallengeResponse(Binary challengeResponse)
-    {
-        _challengeResponse = challengeResponse;
-    }
-
-    @Override
-    public void setMechanisms(String... mechanisms)
-    {
-        if(mechanisms != null)
-        {
-            _mechanisms = new Symbol[mechanisms.length];
-            for(int i = 0; i < mechanisms.length; i++)
-            {
-                _mechanisms[i] = Symbol.valueOf(mechanisms[i]);
-            }
-        }
-
-        if(_role == Role.CLIENT)
-        {
-            assert mechanisms != null;
-            assert mechanisms.length == 1;
-
-            _chosenMechanism = Symbol.valueOf(mechanisms[0]);
-        }
-    }
-
-    @Override
-    public String[] getRemoteMechanisms()
-    {
-        if(_role == Role.SERVER)
-        {
-            return _chosenMechanism == null ? new String[0] : new String[] { _chosenMechanism.toString() };
-        }
-        else if(_role == Role.CLIENT)
-        {
-            if(_mechanisms == null)
-            {
-                return new String[0];
-            }
-            else
-            {
-                String[] remoteMechanisms = new String[_mechanisms.length];
-                for(int i = 0; i < _mechanisms.length; i++)
-                {
-                    remoteMechanisms[i] = _mechanisms[i].toString();
-                }
-                return remoteMechanisms;
-            }
-        }
-        else
-        {
-            throw new IllegalStateException();
-        }
-    }
-
-    public void setMechanism(Symbol mechanism)
-    {
-        _chosenMechanism = mechanism;
-    }
-
-    public Symbol getChosenMechanism()
-    {
-        return _chosenMechanism;
-    }
-
-    public void setResponse(Binary initialResponse)
-    {
-        setPending(initialResponse.asByteBuffer());
-    }
-
-    @Override
-    public void handle(SaslFrameBody frameBody, Binary payload)
-    {
-        frameBody.invoke(this, payload, null);
-    }
-
-    @Override
-    public void handleInit(SaslInit saslInit, Binary payload, Void context)
-    {
-        if(_role == null)
-        {
-            server();
-        }
-        checkRole(Role.SERVER);
-        _hostname = saslInit.getHostname();
-        _chosenMechanism = saslInit.getMechanism();
-        _initReceived = true;
-        if(saslInit.getInitialResponse() != null)
-        {
-            setPending(saslInit.getInitialResponse().asByteBuffer());
-        }
-    }
-
-    @Override
-    public void handleResponse(SaslResponse saslResponse, Binary payload, Void context)
-    {
-        checkRole(Role.SERVER);
-        setPending(saslResponse.getResponse()  == null ? null : saslResponse.getResponse().asByteBuffer());
-    }
-
-    @Override
-    public void done(SaslOutcome outcome)
-    {
-        checkRole(Role.SERVER);
-        _outcome = outcome;
-        _done = true;
-        _state = classifyStateFromOutcome(outcome);
-        _logger.fine("SASL negotiation done: " + this);
-    }
-
-    private void checkRole(Role role)
-    {
-        if(role != _role)
-        {
-            throw new IllegalStateException("Role is " + _role + " but should be " + role);
-        }
-    }
-
-    @Override
-    public void handleMechanisms(SaslMechanisms saslMechanisms, Binary payload, Void context)
-    {
-        if(_role == null)
-        {
-            client();
-        }
-        checkRole(Role.CLIENT);
-        _mechanisms = saslMechanisms.getSaslServerMechanisms();
-    }
-
-    @Override
-    public void handleChallenge(SaslChallenge saslChallenge, Binary payload, Void context)
-    {
-        checkRole(Role.CLIENT);
-        setPending(saslChallenge.getChallenge()  == null ? null : saslChallenge.getChallenge().asByteBuffer());
-    }
-
-    @Override
-    public void handleOutcome(org.apache.qpid.proton.amqp.security.SaslOutcome saslOutcome,
-                              Binary payload,
-                              Void context)
-    {
-        checkRole(Role.CLIENT);
-        for(SaslOutcome outcome : SaslOutcome.values())
-        {
-            if(outcome.getCode() == saslOutcome.getCode().ordinal())
-            {
-                _outcome = outcome;
-                if (_state != SaslState.PN_SASL_IDLE)
-                {
-                    _state = classifyStateFromOutcome(outcome);
-                }
-                break;
-            }
-        }
-        _done = true;
-
-        if(_logger.isLoggable(Level.FINE))
-        {
-            _logger.fine("Handled outcome: " + this);
-        }
-    }
-
-    private SaslState classifyStateFromOutcome(SaslOutcome outcome)
-    {
-        return outcome == SaslOutcome.PN_SASL_OK ? SaslState.PN_SASL_PASS : SaslState.PN_SASL_FAIL;
-    }
-
-    private void processResponse()
-    {
-        SaslResponse response = new SaslResponse();
-        response.setResponse(getChallengeResponse());
-        setChallengeResponse(null);
-        writeFrame(response);
-    }
-
-    private void processInit()
-    {
-        SaslInit init = new SaslInit();
-        init.setHostname(_hostname);
-        init.setMechanism(_chosenMechanism);
-        if(getChallengeResponse() != null)
-        {
-            init.setInitialResponse(getChallengeResponse());
-            setChallengeResponse(null);
-        }
-        _initSent = true;
-        writeFrame(init);
-    }
-
-    @Override
-    public void plain(String username, String password)
-    {
-        client();
-        _chosenMechanism = Symbol.valueOf("PLAIN");
-        byte[] usernameBytes = username.getBytes();
-        byte[] passwordBytes = password.getBytes();
-        byte[] data = new byte[usernameBytes.length+passwordBytes.length+2];
-        System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length);
-        System.arraycopy(passwordBytes, 0, data, 2+usernameBytes.length, passwordBytes.length);
-
-        setChallengeResponse(new Binary(data));
-    }
-
-    @Override
-    public SaslOutcome getOutcome()
-    {
-        return _outcome;
-    }
-
-    @Override
-    public void client()
-    {
-        _role = Role.CLIENT;
-        if(_mechanisms != null)
-        {
-            assert _mechanisms.length == 1;
-
-            _chosenMechanism = _mechanisms[0];
-        }
-    }
-
-    @Override
-    public void server()
-    {
-        _role = Role.SERVER;
-    }
-
-    @Override
-    public void allowSkip(boolean allowSkip)
-    {
-        _allowSkip = allowSkip;
-    }
-
-    public TransportWrapper wrap(final TransportInput input, final TransportOutput output)
-    {
-        return new SaslSniffer(new SaslTransportWrapper(input, output),
-                               new PlainTransportWrapper(output, input)) {
-            protected boolean isDeterminationMade() {
-                if (_role == Role.SERVER && _allowSkip) {
-                    return super.isDeterminationMade();
-                } else {
-                    _selectedTransportWrapper = _wrapper1;
-                    return true;
-                }
-            }
-        };
-    }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder builder = new StringBuilder();
-        builder
-            .append("SaslImpl [_outcome=").append(_outcome)
-            .append(", state=").append(_state)
-            .append(", done=").append(_done)
-            .append(", role=").append(_role)
-            .append("]");
-        return builder.toString();
-    }
-
-    private class SaslTransportWrapper implements TransportWrapper
-    {
-        private final TransportInput _underlyingInput;
-        private final TransportOutput _underlyingOutput;
-        private boolean _outputComplete;
-        private final ByteBuffer _head;
-
-        private SaslTransportWrapper(TransportInput input, TransportOutput output)
-        {
-            _underlyingInput = input;
-            _underlyingOutput = output;
-            _head = _outputBuffer.asReadOnlyBuffer();
-            _head.limit(0);
-        }
-
-        private void fillOutputBuffer()
-        {
-            if(isOutputInSaslMode())
-            {
-                SaslImpl.this.writeSaslOutput();
-                if(_done)
-                {
-                    _outputComplete = true;
-                }
-            }
-        }
-
-        /**
-         * TODO rationalise this method with respect to the other similar checks of _role/_initReceived etc
-         * @see SaslImpl#isDone()
-         */
-        private boolean isInputInSaslMode()
-        {
-            return _role == null || (_role == Role.CLIENT && !_done) ||(_role == Role.SERVER && (!_initReceived || !_done));
-        }
-
-        private boolean isOutputInSaslMode()
-        {
-            return _role == null || (_role == Role.CLIENT && (!_done || !_initSent)) || (_role == Role.SERVER && !_outputComplete);
-        }
-
-        @Override
-        public int capacity()
-        {
-            if (_tail_closed) return Transport.END_OF_STREAM;
-            if (isInputInSaslMode())
-            {
-                return _inputBuffer.remaining();
-            }
-            else
-            {
-                return _underlyingInput.capacity();
-            }
-        }
-
-        @Override
-        public int position()
-        {
-            if (_tail_closed) return Transport.END_OF_STREAM;
-            if (isInputInSaslMode())
-            {
-                return _inputBuffer.position();
-            }
-            else
-            {
-                return _underlyingInput.position();
-            }
-        }
-
-        @Override
-        public ByteBuffer tail()
-        {
-            if (!isInputInSaslMode())
-            {
-                return _underlyingInput.tail();
-            }
-
-            return _inputBuffer;
-        }
-
-        @Override
-        public void process() throws TransportException
-        {
-            _inputBuffer.flip();
-
-            try
-            {
-                reallyProcessInput();
-            }
-            finally
-            {
-                _inputBuffer.compact();
-            }
-        }
-
-        @Override
-        public void close_tail()
-        {
-            _tail_closed = true;
-            if (isInputInSaslMode()) {
-                _head_closed = true;
-                _underlyingInput.close_tail();
-            } else {
-                _underlyingInput.close_tail();
-            }
-        }
-
-        private void reallyProcessInput() throws TransportException
-        {
-            if(isInputInSaslMode())
-            {
-                if(_logger.isLoggable(Level.FINER))
-                {
-                    _logger.log(Level.FINER, SaslImpl.this + " about to call input.");
-                }
-
-                _frameParser.input(_inputBuffer);
-            }
-
-            if(!isInputInSaslMode())
-            {
-                if(_logger.isLoggable(Level.FINER))
-                {
-                    _logger.log(Level.FINER, SaslImpl.this + " about to call plain input");
-                }
-
-                if (_inputBuffer.hasRemaining())
-                {
-                    int bytes = pourAll(_inputBuffer, _underlyingInput);
-                    if (bytes == Transport.END_OF_STREAM)
-                    {
-                        _tail_closed = true;
-                    }
-
-                    _underlyingInput.process();
-                }
-                else
-                {
-                    _underlyingInput.process();
-                }
-            }
-        }
-
-        @Override
-        public int pending()
-        {
-            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
-            {
-                fillOutputBuffer();
-                _head.limit(_outputBuffer.position());
-
-                if (_head_closed && _outputBuffer.position() == 0)
-                {
-                    return Transport.END_OF_STREAM;
-                }
-                else
-                {
-                    return _outputBuffer.position();
-                }
-            }
-            else
-            {
-                return _underlyingOutput.pending();
-            }
-        }
-
-        @Override
-        public ByteBuffer head()
-        {
-            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
-            {
-                pending();
-                return _head;
-            }
-            else
-            {
-                return _underlyingOutput.head();
-            }
-        }
-
-        @Override
-        public void pop(int bytes)
-        {
-            if (isOutputInSaslMode() || _outputBuffer.position() != 0)
-            {
-                _outputBuffer.flip();
-                _outputBuffer.position(bytes);
-                _outputBuffer.compact();
-                _head.position(0);
-                _head.limit(_outputBuffer.position());
-            }
-            else
-            {
-                _underlyingOutput.pop(bytes);
-            }
-        }
-
-        @Override
-        public void close_head()
-        {
-            _underlyingOutput.close_head();
-        }
-    }
-
-    @Override
-    public String getHostname()
-    {
-        if(_role != null)
-        {
-            checkRole(Role.SERVER);
-        }
-
-        return _hostname;
-    }
-
-    @Override
-    public void setRemoteHostname(String hostname)
-    {
-        if(_role != null)
-        {
-            checkRole(Role.CLIENT);
-        }
-
-        _hostname = hostname;
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java
deleted file mode 100644
index 2d92496..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslSniffer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-
-/**
- * SaslSniffer
- *
- */
-
-class SaslSniffer extends HandshakeSniffingTransportWrapper<TransportWrapper, TransportWrapper>
-{
-
-    SaslSniffer(TransportWrapper sasl, TransportWrapper other) {
-        super(sasl, other);
-    }
-
-    protected int bufferSize() { return AmqpHeader.SASL_HEADER.length; }
-
-    protected void makeDetermination(byte[] bytes) {
-        if (bytes.length < bufferSize()) {
-            throw new IllegalArgumentException("insufficient bytes");
-        }
-
-        for (int i = 0; i < AmqpHeader.SASL_HEADER.length; i++) {
-            if (bytes[i] != AmqpHeader.SASL_HEADER[i]) {
-                _selectedTransportWrapper = _wrapper2;
-                return;
-            }
-        }
-
-        _selectedTransportWrapper = _wrapper1;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
deleted file mode 100644
index f418655..0000000
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.proton.engine.impl;
-
-import org.apache.qpid.proton.codec.ReadableBuffer;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Sender;
-
-public class SenderImpl  extends LinkImpl implements Sender
-{
-    private int _offered;
-    private TransportSender _transportLink;
-
-    SenderImpl(SessionImpl session, String name)
-    {
-        super(session, name);
-    }
-
-    @Override
-    public void offer(final int credits)
-    {
-        _offered = credits;
-    }
-
-    @Override
-    public int send(final byte[] bytes, int offset, int length)
-    {
-        if (getLocalState() == EndpointState.CLOSED)
-        {
-            throw new IllegalStateException("send not allowed after the sender is closed.");
-        }
-        DeliveryImpl current = current();
-        if (current == null || current.getLink() != this)
-        {
-            throw new IllegalArgumentException();//TODO.
-        }
-        int sent = current.send(bytes, offset, length);
-        if (sent > 0) {
-            getSession().incrementOutgoingBytes(sent);
-        }
-        return sent;
-    }
-
-    @Override
-    public int send(final ReadableBuffer buffer)
-    {
-        if (getLocalState() == EndpointState.CLOSED)
-        {
-            throw new IllegalStateException("send not allowed after the sender is closed.");
-        }
-        DeliveryImpl current = current();
-        if (current == null || current.getLink() != this)
-        {
-            throw new IllegalArgumentException();
-        }
-        int sent = current.send(buffer);
-        if (sent > 0) {
-            getSession().incrementOutgoingBytes(sent);
-        }
-        return sent;
-    }
-
-    @Override
-    public void abort()
-    {
-        //TODO.
-    }
-
-    @Override
-    void doFree()
-    {
-        getSession().freeSender(this);
-        super.doFree();
-    }
-
-    @Override
-    public boolean advance()
-    {
-        DeliveryImpl delivery = current();
-        if (delivery != null) {
-            delivery.setComplete();
-        }
-
-        boolean advance = super.advance();
-        if(advance && _offered > 0)
-        {
-            _offered--;
-        }
-        if(advance)
-        {
-            decrementCredit();
-            delivery.addToTransportWorkList();
-            getSession().incrementOutgoingDeliveries(1);
-        }
-
-        return advance;
-    }
-
-    boolean hasOfferedCredits()
-    {
-        return _offered > 0;
-    }
-
-    @Override
-    TransportSender getTransportLink()
-    {
-        return _transportLink;
-    }
-
-    void setTransportLink(TransportSender transportLink)
-    {
-        _transportLink = transportLink;
-    }
-
-
-    @Override
-    public void setCredit(int credit)
-    {
-        super.setCredit(credit);
-       /* while(getQueued()>0 && getCredit()>0)
-        {
-            advance();
-        }*/
-    }
-
-    @Override
-    public int getRemoteCredit()
-    {
-        // Credit is decremented as soon as advance is called on a send,
-        // so we need only consider the credit count, not the queued count.
-        return getCredit();
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org